diff options
5 files changed, 80 insertions, 19 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java index e4e3cb7..e3bd390 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java @@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import java.util.Map; import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.PathName; public class AttributesMessage extends RemoteGossipGirlMessage { private PathName path; private AttributesMap attributes; private long receiverGossipId; + private ValueDuration offset; - public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) { + public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.ATTRIBUTES); this.path = path; this.attributes = attributes; this.receiverGossipId = receiverGossipId; + this.offset = offset; } private AttributesMessage() {} @@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java index e457c21..77a2068 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -1,18 +1,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueQuery; public class QueryMessage extends RemoteGossipGirlMessage { private Attribute name; private ValueQuery query; private long receiverGossipId; + private ValueDuration offset; - public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { + public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.QUERY); this.name = name; this.query = query; this.receiverGossipId = receiverGossipId; + this.offset = offset; } public QueryMessage() {} @@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 440df33..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -159,14 +159,14 @@ public class GossipGirl extends Module { private void sendInfo(GossipGirlState state) throws InterruptedException { System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); @@ -183,7 +183,7 @@ public class GossipGirl extends Module { if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } - UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes())); System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { @@ -199,7 +199,7 @@ public class GossipGirl extends Module { if (state != null) { System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); - state.gotQuery(message.getName()); + state.gotQuery(message); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); queries.put( message.getName(), diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index e9bc02a..251d8b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,21 @@ public class GossipGirlState { public ValueTime hejkaSendTimestamp; public ValueTime hejkaReceiveTimestamp; public ValueTime noCoTamSendTimestamp; - public ValueTime noCoTamSendReceiveTimestamp; + public ValueTime noCoTamReceiveTimestamp; + public ValueDuration offset; private Map<PathName, ValueTime> theirZoneTimestamps; private Map<Attribute, ValueTime> theirQueryTimestamps; private List<PathName> zonesToSend; private List<Attribute> queriesToSend; private Set<PathName> waitingForZones; private Set<Attribute> waitingForQueries; + private boolean initiating; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + this.initiating = initiating; System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; @@ -139,6 +146,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + noCoTamSendTimestamp = message.getSentTimestamp(); + noCoTamReceiveTimestamp = message.getReceivedTimestamp(); + computeOffset(); System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); @@ -152,6 +162,23 @@ public class GossipGirlState { } } + public void computeOffset() { + ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) + .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); + offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) + .subtract(noCoTamReceiveTimestamp); + System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); + } + + public AttributesMap modifyAttributes(AttributesMap attributes) { + ValueDuration delta = offset; + if (!initiating) { + delta = delta.negate(); + } + attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); + return attributes; + } + private void setWaitingFor() { setWaitingForZones(); setWaitingForQueries(); @@ -305,6 +332,7 @@ public class GossipGirlState { setZonesToSend(); setQueriesToSend(); setWaitingFor(); + offset = message.getOffset(); state = State.SEND_INFO; if (!waitingForZones.remove(message.getPath())) { @@ -321,10 +349,26 @@ public class GossipGirlState { } } - public void gotQuery(Attribute name) { + public void gotQuery(QueryMessage message) { switch (state) { + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + offset = message.getOffset(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForQueries.remove(message.getName())) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; case WAIT_FOR_INFO: - if (!waitingForQueries.remove(name)) { + if (!waitingForQueries.remove(message.getName())) { System.out.println("DEBUG: got query we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index d443fad..dbacfe5 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -34,6 +34,7 @@ import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.TestUtil; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueString; @@ -57,6 +58,7 @@ public class GossipGirlTest { private AttributesMessage attributesMessage2; private QueryMessage queryMessage1; private QueryMessage queryMessage2; + private ValueDuration offset; private HejkaMessage hejkaMessage; @@ -84,7 +86,10 @@ public class GossipGirlTest { Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps(); Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps(); - noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 22)); + noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 30)); + noCoTamMessage.setSentTimestamp(ValueUtils.addToTime(testTime, 35)); + noCoTamMessage.setReceivedTimestamp(ValueUtils.addToTime(testTime, 255)); + offset = new ValueDuration(-100l); attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1()); attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2()); @@ -99,7 +104,7 @@ public class GossipGirlTest { public QueryMessage makeQueryMessage(String name, String query) throws Exception { - return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0); + return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0, offset); } public AttributesMap makeAttributes1() { @@ -121,12 +126,12 @@ public class GossipGirlTest { } public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) { - return new AttributesMessage("", 0, new PathName(path), attributes, 0); + return new AttributesMessage("", 0, new PathName(path), attributes, 0, offset); } public Map<PathName, ValueTime> makeOtherZoneTimestamps() { Map<PathName, ValueTime> zoneTimestamps = new HashMap(); - addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100); + addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -120); addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0); addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300); @@ -272,8 +277,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, 100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -289,7 +293,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -385,8 +388,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, -100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -402,7 +404,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -462,6 +463,7 @@ public class GossipGirlTest { QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent(); assertEquals(new Attribute(name), queryMessage.getName()); assertEquals(new ValueQuery(query), queryMessage.getQuery()); + assertEquals(new ValueDuration(-100l), queryMessage.getOffset()); } private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception { @@ -472,6 +474,7 @@ public class GossipGirlTest { ); AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent(); assertEquals(new PathName(zonePath), attributesMessage.getPath()); + assertEquals(new ValueDuration(-100l), attributesMessage.getOffset()); } private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception { |