diff options
author | Martin <marcin.j.chrzanowski@gmail.com> | 2020-01-12 21:00:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-12 21:00:34 +0100 |
commit | 32bfe8f7efc1f4fb99ddf827a19ab466724dac06 (patch) | |
tree | 70b2bdcbd091cfe32a208198d0490f8b7a1c7ecd /src/main/java/pl/edu/mimuw/cloudatlas/agent/modules | |
parent | 48367af9a7c2e46de51c29cd9ad84e5fdae5c2df (diff) | |
parent | 84b686eb2e4e2eccde13f7cee1987b4211660729 (diff) |
Merge pull request #114 from m-chrzan/gjetep
Use GTP time protocol for gossiping
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules')
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 8 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java | 50 |
2 files changed, 51 insertions, 7 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 440df33..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -159,14 +159,14 @@ public class GossipGirl extends Module { private void sendInfo(GossipGirlState state) throws InterruptedException { System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); @@ -183,7 +183,7 @@ public class GossipGirl extends Module { if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } - UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes())); System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { @@ -199,7 +199,7 @@ public class GossipGirl extends Module { if (state != null) { System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); - state.gotQuery(message.getName()); + state.gotQuery(message); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); queries.put( message.getName(), diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index e9bc02a..251d8b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,21 @@ public class GossipGirlState { public ValueTime hejkaSendTimestamp; public ValueTime hejkaReceiveTimestamp; public ValueTime noCoTamSendTimestamp; - public ValueTime noCoTamSendReceiveTimestamp; + public ValueTime noCoTamReceiveTimestamp; + public ValueDuration offset; private Map<PathName, ValueTime> theirZoneTimestamps; private Map<Attribute, ValueTime> theirQueryTimestamps; private List<PathName> zonesToSend; private List<Attribute> queriesToSend; private Set<PathName> waitingForZones; private Set<Attribute> waitingForQueries; + private boolean initiating; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + this.initiating = initiating; System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; @@ -139,6 +146,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + noCoTamSendTimestamp = message.getSentTimestamp(); + noCoTamReceiveTimestamp = message.getReceivedTimestamp(); + computeOffset(); System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); @@ -152,6 +162,23 @@ public class GossipGirlState { } } + public void computeOffset() { + ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) + .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); + offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) + .subtract(noCoTamReceiveTimestamp); + System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); + } + + public AttributesMap modifyAttributes(AttributesMap attributes) { + ValueDuration delta = offset; + if (!initiating) { + delta = delta.negate(); + } + attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); + return attributes; + } + private void setWaitingFor() { setWaitingForZones(); setWaitingForQueries(); @@ -305,6 +332,7 @@ public class GossipGirlState { setZonesToSend(); setQueriesToSend(); setWaitingFor(); + offset = message.getOffset(); state = State.SEND_INFO; if (!waitingForZones.remove(message.getPath())) { @@ -321,10 +349,26 @@ public class GossipGirlState { } } - public void gotQuery(Attribute name) { + public void gotQuery(QueryMessage message) { switch (state) { + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + offset = message.getOffset(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForQueries.remove(message.getName())) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; case WAIT_FOR_INFO: - if (!waitingForQueries.remove(name)) { + if (!waitingForQueries.remove(message.getName())) { System.out.println("DEBUG: got query we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { |