diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 42 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java | 114 |
2 files changed, 139 insertions, 17 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 73aef8b..e7bd227 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -1,5 +1,6 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -14,6 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -38,6 +41,12 @@ public class GossipGirl extends Module { case NO_CO_TAM: handleNoCoTam((NoCoTamMessage) message); break; + case ATTRIBUTES: + handleAttributes((AttributesMessage) message); + break; + case QUERY: + handleQuery((QueryMessage) message); + break; default: throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); } @@ -103,4 +112,37 @@ public class GossipGirl extends Module { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } + + private void handleAttributes(AttributesMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); + if (state != null) { + state.gotAttributesFor(message.getPath()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + sendMessage(updateMessage); + if (state.state == GossipGirlState.State.FINISHED) { + gossipStates.remove(message.getReceiverGossipId()); + } + } else { + System.out.println("ERROR: GossipGirl got attributes for a nonexistent gossip"); + } + } + + private void handleQuery(QueryMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); + if (state != null) { + state.gotQuery(message.getName()); + Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); + queries.put( + message.getName(), + new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName())) + ); + UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries); + sendMessage(updateMessage); + if (state.state == GossipGirlState.State.FINISHED) { + gossipStates.remove(message.getReceiverGossipId()); + } + } else { + System.out.println("ERROR: GossipGirl got query for a nonexistent gossip"); + } + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index df17957..8b0711e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -2,11 +2,13 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -26,6 +28,7 @@ public class GossipGirlState { WAIT_FOR_NO_CO_TAM, WAIT_FOR_FIRST_INFO, WAIT_FOR_INFO, + FINISHED, ERROR } public PathName ourPath; @@ -42,6 +45,10 @@ public class GossipGirlState { public ValueTime noCoTamSendReceiveTimestamp; private Map<PathName, ValueTime> theirZoneTimestamps; private Map<Attribute, ValueTime> theirQueryTimestamps; + private List<PathName> zonesToSend; + private List<Attribute> queriesToSend; + private Set<PathName> waitingForZones; + private Set<Attribute> waitingForQueries; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; @@ -91,6 +98,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); state = State.SEND_INFO; break; default: @@ -99,6 +109,25 @@ public class GossipGirlState { } } + private void setWaitingFor() { + setWaitingForZones(); + setWaitingForQueries(); + } + + private void setWaitingForZones() { + waitingForZones = new HashSet(theirZoneTimestamps.keySet()); + for (PathName path : zonesToSend) { + waitingForZones.remove(path); + } + } + + private void setWaitingForQueries() { + waitingForQueries = new HashSet(theirQueryTimestamps.keySet()); + for (Attribute name : queriesToSend) { + waitingForQueries.remove(name); + } + } + public Map<PathName, ValueTime> getZoneTimestampsToSend() { Map<PathName, ValueTime> timestamps = new HashMap(); collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); @@ -114,35 +143,48 @@ public class GossipGirlState { return queryTimestamps; } - public List<ZMI> getZMIsToSend() { - List<ZMI> zmis = new LinkedList(); + public void setZonesToSend() { + zonesToSend = new LinkedList(); for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { - System.out.println("going to send " + timestampedPath.getKey().toString()); - try { - zmis.add(hierarchy.findDescendant(timestampedPath.getKey())); - } catch (ZMI.NoSuchZoneException e) { - System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); - } + zonesToSend.add(timestampedPath.getKey()); } } - return zmis; } - public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { - List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); + public void setQueriesToSend() { + queriesToSend = new LinkedList(); for (Entry<Attribute, ValueTime> timestampedQuery : getQueryTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) { - queryList.add( - new SimpleImmutableEntry( - timestampedQuery.getKey(), - queries.get(timestampedQuery.getKey()).getKey() - ) - ); + queriesToSend.add(timestampedQuery.getKey()); } } + } + + public List<ZMI> getZMIsToSend() { + List<ZMI> zmis = new LinkedList(); + for (PathName path : zonesToSend) { + try { + zmis.add(hierarchy.findDescendant(path)); + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); + } + } + return zmis; + } + + public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { + List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); + for (Attribute name : queriesToSend) { + queryList.add( + new SimpleImmutableEntry( + name, + queries.get(name).getKey() + ) + ); + } return queryList; } @@ -194,4 +236,42 @@ public class GossipGirlState { state = State.ERROR; } } + + public void gotAttributesFor(PathName path) { + switch (state) { + case WAIT_FOR_INFO: + if (!waitingForZones.remove(path)) { + System.out.println("DEBUG: got zone attributes we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.FINISHED; + } + break; + default: + System.out.println("ERROR: got attributes when not expected"); + state = State.ERROR; + } + } + + public void gotQuery(Attribute name) { + switch (state) { + case WAIT_FOR_INFO: + if (!waitingForQueries.remove(name)) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.FINISHED; + } + break; + default: + System.out.println("ERROR: got query when not expected"); + state = State.ERROR; + } + } + + public ValueTime getTheirQueryTimestamp(Attribute name) { + return theirQueryTimestamps.get(name); + } } |