diff options
Diffstat (limited to 'src/main')
4 files changed, 67 insertions, 9 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java index e4e3cb7..e3bd390 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java @@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages;  import java.util.Map;  import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.ValueDuration;  import pl.edu.mimuw.cloudatlas.model.PathName;  public class AttributesMessage extends RemoteGossipGirlMessage {      private PathName path;      private AttributesMap attributes;      private long receiverGossipId; +    private ValueDuration offset; -    public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) { +    public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) {          super(messageId, timestamp, Type.ATTRIBUTES);          this.path = path;          this.attributes = attributes;          this.receiverGossipId = receiverGossipId; +        this.offset = offset;      }      private AttributesMessage() {} @@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage {      public long getReceiverGossipId() {          return receiverGossipId;      } + +    public ValueDuration getOffset() { +        return offset; +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java index e457c21..77a2068 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -1,18 +1,21 @@  package pl.edu.mimuw.cloudatlas.agent.messages;  import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueDuration;  import pl.edu.mimuw.cloudatlas.model.ValueQuery;  public class QueryMessage extends RemoteGossipGirlMessage {      private Attribute name;      private ValueQuery query;      private long receiverGossipId; +    private ValueDuration offset; -    public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { +    public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) {          super(messageId, timestamp, Type.QUERY);          this.name = name;          this.query = query;          this.receiverGossipId = receiverGossipId; +        this.offset = offset;      }      public QueryMessage() {} @@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage {      public long getReceiverGossipId() {          return receiverGossipId;      } + +    public ValueDuration getOffset() { +        return offset; +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 440df33..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -159,14 +159,14 @@ public class GossipGirl extends Module {      private void sendInfo(GossipGirlState state) throws InterruptedException {          System.out.println("DEBUG: about to send info");          for (ZMI zmi : state.getZMIsToSend()) { -            AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); +            AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset);              UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);              System.out.println("INFO: GossipGirl sending AttributesMessage");              sendMessage(udupMessage);          }          for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { -            QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); +            QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset);              UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);              System.out.println("INFO: GossipGirl sending QueryMessage");              sendMessage(udupMessage); @@ -183,7 +183,7 @@ public class GossipGirl extends Module {              if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {                  sendInfo(state);              } -            UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); +            UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes()));              System.out.println("INFO: GossipGirl sending UpdateAttributesMessage");              sendMessage(updateMessage);              if (state.state == GossipGirlState.State.FINISHED) { @@ -199,7 +199,7 @@ public class GossipGirl extends Module {          if (state != null) {              System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));              state.setLastAction(); -            state.gotQuery(message.getName()); +            state.gotQuery(message);              Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();              queries.put(                  message.getName(), diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index e9bc02a..251d8b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set;  import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;  import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap;  import pl.edu.mimuw.cloudatlas.model.PathName;  import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt;  import pl.edu.mimuw.cloudatlas.model.ValueQuery;  import pl.edu.mimuw.cloudatlas.model.ValueTime;  import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,21 @@ public class GossipGirlState {      public ValueTime hejkaSendTimestamp;      public ValueTime hejkaReceiveTimestamp;      public ValueTime noCoTamSendTimestamp; -    public ValueTime noCoTamSendReceiveTimestamp; +    public ValueTime noCoTamReceiveTimestamp; +    public ValueDuration offset;      private Map<PathName, ValueTime> theirZoneTimestamps;      private Map<Attribute, ValueTime> theirQueryTimestamps;      private List<PathName> zonesToSend;      private List<Attribute> queriesToSend;      private Set<PathName> waitingForZones;      private Set<Attribute> waitingForQueries; +    private boolean initiating;      public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {          this.gossipId = gossipId;          this.ourPath = ourPath;          this.theirContact = theirContact; +        this.initiating = initiating;          System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString());          if (initiating) {              state = State.WAIT_FOR_STATE_INITIALIZER; @@ -139,6 +146,9 @@ public class GossipGirlState {                  theirQueryTimestamps = message.getQueryTimestamps();                  hejkaSendTimestamp = message.getHejkaSendTimestamp();                  hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); +                noCoTamSendTimestamp = message.getSentTimestamp(); +                noCoTamReceiveTimestamp = message.getReceivedTimestamp(); +                computeOffset();                  System.out.println("DEBUG: set basic stuff");                  setZonesToSend();                  setQueriesToSend(); @@ -152,6 +162,23 @@ public class GossipGirlState {          }      } +    public void computeOffset() { +        ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) +                .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); +        offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) +                .subtract(noCoTamReceiveTimestamp); +        System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); +    } + +    public AttributesMap modifyAttributes(AttributesMap attributes) { +        ValueDuration delta = offset; +        if (!initiating) { +            delta = delta.negate(); +        } +        attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); +        return attributes; +    } +      private void setWaitingFor() {          setWaitingForZones();          setWaitingForQueries(); @@ -305,6 +332,7 @@ public class GossipGirlState {                  setZonesToSend();                  setQueriesToSend();                  setWaitingFor(); +                offset = message.getOffset();                  state = State.SEND_INFO;                  if (!waitingForZones.remove(message.getPath())) { @@ -321,10 +349,26 @@ public class GossipGirlState {          }      } -    public void gotQuery(Attribute name) { +    public void gotQuery(QueryMessage message) {          switch (state) { +            case WAIT_FOR_FIRST_INFO: +                // TODO: use offset to setup GTP +                offset = message.getOffset(); +                setZonesToSend(); +                setQueriesToSend(); +                setWaitingFor(); +                state = State.SEND_INFO; + +                if (!waitingForQueries.remove(message.getName())) { +                    System.out.println("DEBUG: got query we weren't expecting"); +                } +                if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { +                    System.out.println("INFO: done waiting for info"); +                    state = state.SEND_INFO_AND_FINISH; +                } +                break;              case WAIT_FOR_INFO: -                if (!waitingForQueries.remove(name)) { +                if (!waitingForQueries.remove(message.getName())) {                      System.out.println("DEBUG: got query we weren't expecting");                  }                  if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { |