m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin <marcin.j.chrzanowski@gmail.com>2020-01-12 21:00:34 +0100
committerGitHub <noreply@github.com>2020-01-12 21:00:34 +0100
commit32bfe8f7efc1f4fb99ddf827a19ab466724dac06 (patch)
tree70b2bdcbd091cfe32a208198d0490f8b7a1c7ecd
parent48367af9a7c2e46de51c29cd9ad84e5fdae5c2df (diff)
parent84b686eb2e4e2eccde13f7cee1987b4211660729 (diff)
Merge pull request #114 from m-chrzan/gjetep
Use GTP time protocol for gossiping
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java8
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java50
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java23
5 files changed, 80 insertions, 19 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
index e4e3cb7..e3bd390 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
@@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import java.util.Map;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.PathName;
public class AttributesMessage extends RemoteGossipGirlMessage {
private PathName path;
private AttributesMap attributes;
private long receiverGossipId;
+ private ValueDuration offset;
- public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) {
+ public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.ATTRIBUTES);
this.path = path;
this.attributes = attributes;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
private AttributesMessage() {}
@@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
index e457c21..77a2068 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
@@ -1,18 +1,21 @@
package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
public class QueryMessage extends RemoteGossipGirlMessage {
private Attribute name;
private ValueQuery query;
private long receiverGossipId;
+ private ValueDuration offset;
- public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) {
+ public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.QUERY);
this.name = name;
this.query = query;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
public QueryMessage() {}
@@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index 440df33..5199e82 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -159,14 +159,14 @@ public class GossipGirl extends Module {
private void sendInfo(GossipGirlState state) throws InterruptedException {
System.out.println("DEBUG: about to send info");
for (ZMI zmi : state.getZMIsToSend()) {
- AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+ AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
System.out.println("INFO: GossipGirl sending AttributesMessage");
sendMessage(udupMessage);
}
for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
- QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+ QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
System.out.println("INFO: GossipGirl sending QueryMessage");
sendMessage(udupMessage);
@@ -183,7 +183,7 @@ public class GossipGirl extends Module {
if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {
sendInfo(state);
}
- UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
+ UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes()));
System.out.println("INFO: GossipGirl sending UpdateAttributesMessage");
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
@@ -199,7 +199,7 @@ public class GossipGirl extends Module {
if (state != null) {
System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
- state.gotQuery(message.getName());
+ state.gotQuery(message);
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
queries.put(
message.getName(),
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index e9bc02a..251d8b3 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -13,9 +13,13 @@ import java.util.Set;
import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
+import pl.edu.mimuw.cloudatlas.model.ValueInt;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -48,18 +52,21 @@ public class GossipGirlState {
public ValueTime hejkaSendTimestamp;
public ValueTime hejkaReceiveTimestamp;
public ValueTime noCoTamSendTimestamp;
- public ValueTime noCoTamSendReceiveTimestamp;
+ public ValueTime noCoTamReceiveTimestamp;
+ public ValueDuration offset;
private Map<PathName, ValueTime> theirZoneTimestamps;
private Map<Attribute, ValueTime> theirQueryTimestamps;
private List<PathName> zonesToSend;
private List<Attribute> queriesToSend;
private Set<PathName> waitingForZones;
private Set<Attribute> waitingForQueries;
+ private boolean initiating;
public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {
this.gossipId = gossipId;
this.ourPath = ourPath;
this.theirContact = theirContact;
+ this.initiating = initiating;
System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString());
if (initiating) {
state = State.WAIT_FOR_STATE_INITIALIZER;
@@ -139,6 +146,9 @@ public class GossipGirlState {
theirQueryTimestamps = message.getQueryTimestamps();
hejkaSendTimestamp = message.getHejkaSendTimestamp();
hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp();
+ noCoTamSendTimestamp = message.getSentTimestamp();
+ noCoTamReceiveTimestamp = message.getReceivedTimestamp();
+ computeOffset();
System.out.println("DEBUG: set basic stuff");
setZonesToSend();
setQueriesToSend();
@@ -152,6 +162,23 @@ public class GossipGirlState {
}
}
+ public void computeOffset() {
+ ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp))
+ .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp));
+ offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l))))
+ .subtract(noCoTamReceiveTimestamp);
+ System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString());
+ }
+
+ public AttributesMap modifyAttributes(AttributesMap attributes) {
+ ValueDuration delta = offset;
+ if (!initiating) {
+ delta = delta.negate();
+ }
+ attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta));
+ return attributes;
+ }
+
private void setWaitingFor() {
setWaitingForZones();
setWaitingForQueries();
@@ -305,6 +332,7 @@ public class GossipGirlState {
setZonesToSend();
setQueriesToSend();
setWaitingFor();
+ offset = message.getOffset();
state = State.SEND_INFO;
if (!waitingForZones.remove(message.getPath())) {
@@ -321,10 +349,26 @@ public class GossipGirlState {
}
}
- public void gotQuery(Attribute name) {
+ public void gotQuery(QueryMessage message) {
switch (state) {
+ case WAIT_FOR_FIRST_INFO:
+ // TODO: use offset to setup GTP
+ offset = message.getOffset();
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
+ state = State.SEND_INFO;
+
+ if (!waitingForQueries.remove(message.getName())) {
+ System.out.println("DEBUG: got query we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.SEND_INFO_AND_FINISH;
+ }
+ break;
case WAIT_FOR_INFO:
- if (!waitingForQueries.remove(name)) {
+ if (!waitingForQueries.remove(message.getName())) {
System.out.println("DEBUG: got query we weren't expecting");
}
if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
index d443fad..dbacfe5 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
@@ -34,6 +34,7 @@ import pl.edu.mimuw.cloudatlas.model.Attribute;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.TestUtil;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.ValueInt;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueString;
@@ -57,6 +58,7 @@ public class GossipGirlTest {
private AttributesMessage attributesMessage2;
private QueryMessage queryMessage1;
private QueryMessage queryMessage2;
+ private ValueDuration offset;
private HejkaMessage hejkaMessage;
@@ -84,7 +86,10 @@ public class GossipGirlTest {
Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps();
Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps();
- noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 22));
+ noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 30));
+ noCoTamMessage.setSentTimestamp(ValueUtils.addToTime(testTime, 35));
+ noCoTamMessage.setReceivedTimestamp(ValueUtils.addToTime(testTime, 255));
+ offset = new ValueDuration(-100l);
attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1());
attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2());
@@ -99,7 +104,7 @@ public class GossipGirlTest {
public QueryMessage makeQueryMessage(String name, String query) throws Exception {
- return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0);
+ return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0, offset);
}
public AttributesMap makeAttributes1() {
@@ -121,12 +126,12 @@ public class GossipGirlTest {
}
public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) {
- return new AttributesMessage("", 0, new PathName(path), attributes, 0);
+ return new AttributesMessage("", 0, new PathName(path), attributes, 0, offset);
}
public Map<PathName, ValueTime> makeOtherZoneTimestamps() {
Map<PathName, ValueTime> zoneTimestamps = new HashMap();
- addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100);
+ addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -120);
addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0);
addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300);
@@ -272,8 +277,7 @@ public class GossipGirlTest {
assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
assertEquals("/son/bro", updateMessage1.getPathName());
- // TODO: this should be modified by GTP
- assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(ValueUtils.addToTime(testTime, 100), updateMessage1.getAttributes().getOrNull("timestamp"));
assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
@@ -289,7 +293,6 @@ public class GossipGirlTest {
assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
new SimpleImmutableEntry(
new ValueQuery("SELECT 3 AS one"),
- // TODO: this should be modified by GTP
ValueUtils.addToTime(testTime, 10)
)
);
@@ -385,8 +388,7 @@ public class GossipGirlTest {
assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
assertEquals("/son/bro", updateMessage1.getPathName());
- // TODO: this should be modified by GTP
- assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(ValueUtils.addToTime(testTime, -100), updateMessage1.getAttributes().getOrNull("timestamp"));
assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
@@ -402,7 +404,6 @@ public class GossipGirlTest {
assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
new SimpleImmutableEntry(
new ValueQuery("SELECT 3 AS one"),
- // TODO: this should be modified by GTP
ValueUtils.addToTime(testTime, 10)
)
);
@@ -462,6 +463,7 @@ public class GossipGirlTest {
QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent();
assertEquals(new Attribute(name), queryMessage.getName());
assertEquals(new ValueQuery(query), queryMessage.getQuery());
+ assertEquals(new ValueDuration(-100l), queryMessage.getOffset());
}
private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception {
@@ -472,6 +474,7 @@ public class GossipGirlTest {
);
AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent();
assertEquals(new PathName(zonePath), attributesMessage.getPath());
+ assertEquals(new ValueDuration(-100l), attributesMessage.getOffset());
}
private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception {