From 84b686eb2e4e2eccde13f7cee1987b4211660729 Mon Sep 17 00:00:00 2001
From: Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com>
Date: Sun, 12 Jan 2020 21:00:58 +0100
Subject: Use GTP time protocol for gossiping

---
 .../agent/messages/AttributesMessage.java          |  9 +++-
 .../cloudatlas/agent/messages/QueryMessage.java    |  9 +++-
 .../mimuw/cloudatlas/agent/modules/GossipGirl.java |  8 ++--
 .../cloudatlas/agent/modules/GossipGirlState.java  | 50 ++++++++++++++++++++--
 4 files changed, 67 insertions(+), 9 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
index e4e3cb7..e3bd390 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
@@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
 import java.util.Map;
 
 import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
 import pl.edu.mimuw.cloudatlas.model.PathName;
 
 public class AttributesMessage extends RemoteGossipGirlMessage {
     private PathName path;
     private AttributesMap attributes;
     private long receiverGossipId;
+    private ValueDuration offset;
 
-    public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) {
+    public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) {
         super(messageId, timestamp, Type.ATTRIBUTES);
         this.path = path;
         this.attributes = attributes;
         this.receiverGossipId = receiverGossipId;
+        this.offset = offset;
     }
 
     private AttributesMessage() {}
@@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage {
     public long getReceiverGossipId() {
         return receiverGossipId;
     }
+
+    public ValueDuration getOffset() {
+        return offset;
+    }
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
index e457c21..77a2068 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
@@ -1,18 +1,21 @@
 package pl.edu.mimuw.cloudatlas.agent.messages;
 
 import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
 import pl.edu.mimuw.cloudatlas.model.ValueQuery;
 
 public class QueryMessage extends RemoteGossipGirlMessage {
     private Attribute name;
     private ValueQuery query;
     private long receiverGossipId;
+    private ValueDuration offset;
 
-    public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) {
+    public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) {
         super(messageId, timestamp, Type.QUERY);
         this.name = name;
         this.query = query;
         this.receiverGossipId = receiverGossipId;
+        this.offset = offset;
     }
 
     public QueryMessage() {}
@@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage {
     public long getReceiverGossipId() {
         return receiverGossipId;
     }
+
+    public ValueDuration getOffset() {
+        return offset;
+    }
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index 440df33..5199e82 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -159,14 +159,14 @@ public class GossipGirl extends Module {
     private void sendInfo(GossipGirlState state) throws InterruptedException {
         System.out.println("DEBUG: about to send info");
         for (ZMI zmi : state.getZMIsToSend()) {
-            AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+            AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset);
             UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
             System.out.println("INFO: GossipGirl sending AttributesMessage");
             sendMessage(udupMessage);
         }
 
         for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
-            QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+            QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset);
             UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
             System.out.println("INFO: GossipGirl sending QueryMessage");
             sendMessage(udupMessage);
@@ -183,7 +183,7 @@ public class GossipGirl extends Module {
             if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {
                 sendInfo(state);
             }
-            UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
+            UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes()));
             System.out.println("INFO: GossipGirl sending UpdateAttributesMessage");
             sendMessage(updateMessage);
             if (state.state == GossipGirlState.State.FINISHED) {
@@ -199,7 +199,7 @@ public class GossipGirl extends Module {
         if (state != null) {
             System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));
             state.setLastAction();
-            state.gotQuery(message.getName());
+            state.gotQuery(message);
             Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
             queries.put(
                 message.getName(),
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index e9bc02a..251d8b3 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -13,9 +13,13 @@ import java.util.Set;
 import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;
 import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
 import pl.edu.mimuw.cloudatlas.model.PathName;
 import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
+import pl.edu.mimuw.cloudatlas.model.ValueInt;
 import pl.edu.mimuw.cloudatlas.model.ValueQuery;
 import pl.edu.mimuw.cloudatlas.model.ValueTime;
 import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -48,18 +52,21 @@ public class GossipGirlState {
     public ValueTime hejkaSendTimestamp;
     public ValueTime hejkaReceiveTimestamp;
     public ValueTime noCoTamSendTimestamp;
-    public ValueTime noCoTamSendReceiveTimestamp;
+    public ValueTime noCoTamReceiveTimestamp;
+    public ValueDuration offset;
     private Map<PathName, ValueTime> theirZoneTimestamps;
     private Map<Attribute, ValueTime> theirQueryTimestamps;
     private List<PathName> zonesToSend;
     private List<Attribute> queriesToSend;
     private Set<PathName> waitingForZones;
     private Set<Attribute> waitingForQueries;
+    private boolean initiating;
 
     public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {
         this.gossipId = gossipId;
         this.ourPath = ourPath;
         this.theirContact = theirContact;
+        this.initiating = initiating;
         System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString());
         if (initiating) {
             state = State.WAIT_FOR_STATE_INITIALIZER;
@@ -139,6 +146,9 @@ public class GossipGirlState {
                 theirQueryTimestamps = message.getQueryTimestamps();
                 hejkaSendTimestamp = message.getHejkaSendTimestamp();
                 hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp();
+                noCoTamSendTimestamp = message.getSentTimestamp();
+                noCoTamReceiveTimestamp = message.getReceivedTimestamp();
+                computeOffset();
                 System.out.println("DEBUG: set basic stuff");
                 setZonesToSend();
                 setQueriesToSend();
@@ -152,6 +162,23 @@ public class GossipGirlState {
         }
     }
 
+    public void computeOffset() {
+        ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp))
+                .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp));
+        offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l))))
+                .subtract(noCoTamReceiveTimestamp);
+        System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString());
+    }
+
+    public AttributesMap modifyAttributes(AttributesMap attributes) {
+        ValueDuration delta = offset;
+        if (!initiating) {
+            delta = delta.negate();
+        }
+        attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta));
+        return attributes;
+    }
+
     private void setWaitingFor() {
         setWaitingForZones();
         setWaitingForQueries();
@@ -305,6 +332,7 @@ public class GossipGirlState {
                 setZonesToSend();
                 setQueriesToSend();
                 setWaitingFor();
+                offset = message.getOffset();
                 state = State.SEND_INFO;
 
                 if (!waitingForZones.remove(message.getPath())) {
@@ -321,10 +349,26 @@ public class GossipGirlState {
         }
     }
 
-    public void gotQuery(Attribute name) {
+    public void gotQuery(QueryMessage message) {
         switch (state) {
+            case WAIT_FOR_FIRST_INFO:
+                // TODO: use offset to setup GTP
+                offset = message.getOffset();
+                setZonesToSend();
+                setQueriesToSend();
+                setWaitingFor();
+                state = State.SEND_INFO;
+
+                if (!waitingForQueries.remove(message.getName())) {
+                    System.out.println("DEBUG: got query we weren't expecting");
+                }
+                if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+                    System.out.println("INFO: done waiting for info");
+                    state = state.SEND_INFO_AND_FINISH;
+                }
+                break;
             case WAIT_FOR_INFO:
-                if (!waitingForQueries.remove(name)) {
+                if (!waitingForQueries.remove(message.getName())) {
                     System.out.println("DEBUG: got query we weren't expecting");
                 }
                 if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
-- 
cgit v1.2.3