From 84b686eb2e4e2eccde13f7cee1987b4211660729 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 21:00:58 +0100 Subject: Use GTP time protocol for gossiping --- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 8 ++-- .../cloudatlas/agent/modules/GossipGirlState.java | 50 ++++++++++++++++++++-- 2 files changed, 51 insertions(+), 7 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 440df33..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -159,14 +159,14 @@ public class GossipGirl extends Module { private void sendInfo(GossipGirlState state) throws InterruptedException { System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); @@ -183,7 +183,7 @@ public class GossipGirl extends Module { if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } - UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes())); System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { @@ -199,7 +199,7 @@ public class GossipGirl extends Module { if (state != null) { System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); - state.gotQuery(message.getName()); + state.gotQuery(message); Map> queries = new HashMap(); queries.put( message.getName(), diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index e9bc02a..251d8b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,21 @@ public class GossipGirlState { public ValueTime hejkaSendTimestamp; public ValueTime hejkaReceiveTimestamp; public ValueTime noCoTamSendTimestamp; - public ValueTime noCoTamSendReceiveTimestamp; + public ValueTime noCoTamReceiveTimestamp; + public ValueDuration offset; private Map theirZoneTimestamps; private Map theirQueryTimestamps; private List zonesToSend; private List queriesToSend; private Set waitingForZones; private Set waitingForQueries; + private boolean initiating; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + this.initiating = initiating; System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; @@ -139,6 +146,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + noCoTamSendTimestamp = message.getSentTimestamp(); + noCoTamReceiveTimestamp = message.getReceivedTimestamp(); + computeOffset(); System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); @@ -152,6 +162,23 @@ public class GossipGirlState { } } + public void computeOffset() { + ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) + .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); + offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) + .subtract(noCoTamReceiveTimestamp); + System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); + } + + public AttributesMap modifyAttributes(AttributesMap attributes) { + ValueDuration delta = offset; + if (!initiating) { + delta = delta.negate(); + } + attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); + return attributes; + } + private void setWaitingFor() { setWaitingForZones(); setWaitingForQueries(); @@ -305,6 +332,7 @@ public class GossipGirlState { setZonesToSend(); setQueriesToSend(); setWaitingFor(); + offset = message.getOffset(); state = State.SEND_INFO; if (!waitingForZones.remove(message.getPath())) { @@ -321,10 +349,26 @@ public class GossipGirlState { } } - public void gotQuery(Attribute name) { + public void gotQuery(QueryMessage message) { switch (state) { + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + offset = message.getOffset(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForQueries.remove(message.getName())) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; case WAIT_FOR_INFO: - if (!waitingForQueries.remove(name)) { + if (!waitingForQueries.remove(message.getName())) { System.out.println("DEBUG: got query we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { -- cgit v1.2.3