diff options
Diffstat (limited to 'src/main')
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 42 | ||||
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java | 114 | 
2 files changed, 139 insertions, 17 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 73aef8b..e7bd227 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -1,5 +1,6 @@  package pl.edu.mimuw.cloudatlas.agent.modules; +import java.util.AbstractMap.SimpleImmutableEntry;  import java.util.HashMap;  import java.util.Map;  import java.util.Map.Entry; @@ -14,6 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage;  import pl.edu.mimuw.cloudatlas.model.Attribute;  import pl.edu.mimuw.cloudatlas.model.AttributesMap;  import pl.edu.mimuw.cloudatlas.model.PathName; @@ -38,6 +41,12 @@ public class GossipGirl extends Module {              case NO_CO_TAM:                  handleNoCoTam((NoCoTamMessage) message);                  break; +            case ATTRIBUTES: +                handleAttributes((AttributesMessage) message); +                break; +            case QUERY: +                handleQuery((QueryMessage) message); +                break;              default:                  throw new InvalidMessageType("This type of message cannot be handled by GossipGirl");          } @@ -103,4 +112,37 @@ public class GossipGirl extends Module {              System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");          }      } + +    private void handleAttributes(AttributesMessage message) throws InterruptedException { +        GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); +        if (state != null) { +            state.gotAttributesFor(message.getPath()); +            UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); +            sendMessage(updateMessage); +            if (state.state == GossipGirlState.State.FINISHED) { +                gossipStates.remove(message.getReceiverGossipId()); +            } +        } else { +            System.out.println("ERROR: GossipGirl got attributes for a nonexistent gossip"); +        } +    } + +    private void handleQuery(QueryMessage message) throws InterruptedException { +        GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); +        if (state != null) { +            state.gotQuery(message.getName()); +            Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); +            queries.put( +                message.getName(), +                new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName())) +            ); +            UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries); +            sendMessage(updateMessage); +            if (state.state == GossipGirlState.State.FINISHED) { +                gossipStates.remove(message.getReceiverGossipId()); +            } +        } else { +            System.out.println("ERROR: GossipGirl got query for a nonexistent gossip"); +        } +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index df17957..8b0711e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -2,11 +2,13 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import java.util.AbstractMap.SimpleImmutableEntry;  import java.util.HashMap; +import java.util.HashSet;  import java.util.Iterator;  import java.util.LinkedList;  import java.util.List;  import java.util.Map;  import java.util.Map.Entry; +import java.util.Set;  import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;  import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -26,6 +28,7 @@ public class GossipGirlState {          WAIT_FOR_NO_CO_TAM,          WAIT_FOR_FIRST_INFO,          WAIT_FOR_INFO, +        FINISHED,          ERROR      }      public PathName ourPath; @@ -42,6 +45,10 @@ public class GossipGirlState {      public ValueTime noCoTamSendReceiveTimestamp;      private Map<PathName, ValueTime> theirZoneTimestamps;      private Map<Attribute, ValueTime> theirQueryTimestamps; +    private List<PathName> zonesToSend; +    private List<Attribute> queriesToSend; +    private Set<PathName> waitingForZones; +    private Set<Attribute> waitingForQueries;      public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {          this.gossipId = gossipId; @@ -91,6 +98,9 @@ public class GossipGirlState {                  theirQueryTimestamps = message.getQueryTimestamps();                  hejkaSendTimestamp = message.getHejkaSendTimestamp();                  hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); +                setZonesToSend(); +                setQueriesToSend(); +                setWaitingFor();                  state = State.SEND_INFO;                  break;              default: @@ -99,6 +109,25 @@ public class GossipGirlState {          }      } +    private void setWaitingFor() { +        setWaitingForZones(); +        setWaitingForQueries(); +    } + +    private void setWaitingForZones() { +        waitingForZones = new HashSet(theirZoneTimestamps.keySet()); +        for (PathName path : zonesToSend) { +            waitingForZones.remove(path); +        } +    } + +    private void setWaitingForQueries() { +        waitingForQueries = new HashSet(theirQueryTimestamps.keySet()); +        for (Attribute name : queriesToSend) { +            waitingForQueries.remove(name); +        } +    } +      public Map<PathName, ValueTime> getZoneTimestampsToSend() {          Map<PathName, ValueTime> timestamps = new HashMap();          collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); @@ -114,35 +143,48 @@ public class GossipGirlState {          return queryTimestamps;      } -    public List<ZMI> getZMIsToSend() { -        List<ZMI> zmis = new LinkedList(); +    public void setZonesToSend() { +        zonesToSend = new LinkedList();          for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) {              ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey());              if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { -                System.out.println("going to send " + timestampedPath.getKey().toString()); -                try { -                    zmis.add(hierarchy.findDescendant(timestampedPath.getKey())); -                } catch (ZMI.NoSuchZoneException e) { -                    System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); -                } +                zonesToSend.add(timestampedPath.getKey());              }          } -        return zmis;      } -    public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { -        List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); +    public void setQueriesToSend() { +        queriesToSend = new LinkedList();          for (Entry<Attribute, ValueTime> timestampedQuery : getQueryTimestampsToSend().entrySet()) {              ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey());              if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) { -                queryList.add( -                    new SimpleImmutableEntry( -                        timestampedQuery.getKey(), -                        queries.get(timestampedQuery.getKey()).getKey() -                    ) -                ); +                queriesToSend.add(timestampedQuery.getKey());              }          } +    } + +    public List<ZMI> getZMIsToSend() { +        List<ZMI> zmis = new LinkedList(); +        for (PathName path : zonesToSend) { +            try { +                zmis.add(hierarchy.findDescendant(path)); +            } catch (ZMI.NoSuchZoneException e) { +                System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); +            } +        } +        return zmis; +    } + +    public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { +        List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); +        for (Attribute name : queriesToSend) { +            queryList.add( +                new SimpleImmutableEntry( +                    name, +                    queries.get(name).getKey() +                ) +            ); +        }          return queryList;      } @@ -194,4 +236,42 @@ public class GossipGirlState {                  state = State.ERROR;          }      } + +    public void gotAttributesFor(PathName path) { +        switch (state) { +            case WAIT_FOR_INFO: +                if (!waitingForZones.remove(path)) { +                    System.out.println("DEBUG: got zone attributes we weren't expecting"); +                } +                if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { +                    System.out.println("INFO: done waiting for info"); +                    state = state.FINISHED; +                } +                break; +            default: +                System.out.println("ERROR: got attributes when not expected"); +                state = State.ERROR; +        } +    } + +    public void gotQuery(Attribute name) { +        switch (state) { +            case WAIT_FOR_INFO: +                if (!waitingForQueries.remove(name)) { +                    System.out.println("DEBUG: got query we weren't expecting"); +                } +                if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { +                    System.out.println("INFO: done waiting for info"); +                    state = state.FINISHED; +                } +                break; +            default: +                System.out.println("ERROR: got query when not expected"); +                state = State.ERROR; +        } +    } + +    public ValueTime getTheirQueryTimestamp(Attribute name) { +        return theirQueryTimestamps.get(name); +    }  } |