From 7956fb8b67e6f10760431cbe77db2fcf33d5e9e0 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Fri, 10 Jan 2020 20:14:43 +0100 Subject: Send state updates after gossip --- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 42 ++++++++ .../cloudatlas/agent/modules/GossipGirlState.java | 114 ++++++++++++++++++--- .../cloudatlas/agent/modules/GossipGirlTest.java | 85 ++++++++++++++- 3 files changed, 223 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 73aef8b..e7bd227 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -1,5 +1,6 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -14,6 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -38,6 +41,12 @@ public class GossipGirl extends Module { case NO_CO_TAM: handleNoCoTam((NoCoTamMessage) message); break; + case ATTRIBUTES: + handleAttributes((AttributesMessage) message); + break; + case QUERY: + handleQuery((QueryMessage) message); + break; default: throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); } @@ -103,4 +112,37 @@ public class GossipGirl extends Module { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } + + private void handleAttributes(AttributesMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); + if (state != null) { + state.gotAttributesFor(message.getPath()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + sendMessage(updateMessage); + if (state.state == GossipGirlState.State.FINISHED) { + gossipStates.remove(message.getReceiverGossipId()); + } + } else { + System.out.println("ERROR: GossipGirl got attributes for a nonexistent gossip"); + } + } + + private void handleQuery(QueryMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); + if (state != null) { + state.gotQuery(message.getName()); + Map> queries = new HashMap(); + queries.put( + message.getName(), + new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName())) + ); + UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries); + sendMessage(updateMessage); + if (state.state == GossipGirlState.State.FINISHED) { + gossipStates.remove(message.getReceiverGossipId()); + } + } else { + System.out.println("ERROR: GossipGirl got query for a nonexistent gossip"); + } + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index df17957..8b0711e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -2,11 +2,13 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -26,6 +28,7 @@ public class GossipGirlState { WAIT_FOR_NO_CO_TAM, WAIT_FOR_FIRST_INFO, WAIT_FOR_INFO, + FINISHED, ERROR } public PathName ourPath; @@ -42,6 +45,10 @@ public class GossipGirlState { public ValueTime noCoTamSendReceiveTimestamp; private Map theirZoneTimestamps; private Map theirQueryTimestamps; + private List zonesToSend; + private List queriesToSend; + private Set waitingForZones; + private Set waitingForQueries; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; @@ -91,6 +98,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); state = State.SEND_INFO; break; default: @@ -99,6 +109,25 @@ public class GossipGirlState { } } + private void setWaitingFor() { + setWaitingForZones(); + setWaitingForQueries(); + } + + private void setWaitingForZones() { + waitingForZones = new HashSet(theirZoneTimestamps.keySet()); + for (PathName path : zonesToSend) { + waitingForZones.remove(path); + } + } + + private void setWaitingForQueries() { + waitingForQueries = new HashSet(theirQueryTimestamps.keySet()); + for (Attribute name : queriesToSend) { + waitingForQueries.remove(name); + } + } + public Map getZoneTimestampsToSend() { Map timestamps = new HashMap(); collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); @@ -114,35 +143,48 @@ public class GossipGirlState { return queryTimestamps; } - public List getZMIsToSend() { - List zmis = new LinkedList(); + public void setZonesToSend() { + zonesToSend = new LinkedList(); for (Entry timestampedPath : getZoneTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { - System.out.println("going to send " + timestampedPath.getKey().toString()); - try { - zmis.add(hierarchy.findDescendant(timestampedPath.getKey())); - } catch (ZMI.NoSuchZoneException e) { - System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); - } + zonesToSend.add(timestampedPath.getKey()); } } - return zmis; } - public List> getQueriesToSend() { - List> queryList = new LinkedList(); + public void setQueriesToSend() { + queriesToSend = new LinkedList(); for (Entry timestampedQuery : getQueryTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) { - queryList.add( - new SimpleImmutableEntry( - timestampedQuery.getKey(), - queries.get(timestampedQuery.getKey()).getKey() - ) - ); + queriesToSend.add(timestampedQuery.getKey()); } } + } + + public List getZMIsToSend() { + List zmis = new LinkedList(); + for (PathName path : zonesToSend) { + try { + zmis.add(hierarchy.findDescendant(path)); + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); + } + } + return zmis; + } + + public List> getQueriesToSend() { + List> queryList = new LinkedList(); + for (Attribute name : queriesToSend) { + queryList.add( + new SimpleImmutableEntry( + name, + queries.get(name).getKey() + ) + ); + } return queryList; } @@ -194,4 +236,42 @@ public class GossipGirlState { state = State.ERROR; } } + + public void gotAttributesFor(PathName path) { + switch (state) { + case WAIT_FOR_INFO: + if (!waitingForZones.remove(path)) { + System.out.println("DEBUG: got zone attributes we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.FINISHED; + } + break; + default: + System.out.println("ERROR: got attributes when not expected"); + state = State.ERROR; + } + } + + public void gotQuery(Attribute name) { + switch (state) { + case WAIT_FOR_INFO: + if (!waitingForQueries.remove(name)) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.FINISHED; + } + break; + default: + System.out.println("ERROR: got query when not expected"); + state = State.ERROR; + } + } + + public ValueTime getTheirQueryTimestamp(Attribute name) { + return theirQueryTimestamps.get(name); + } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index 7ac27e9..fffb577 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -24,6 +24,8 @@ import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -49,6 +51,10 @@ public class GossipGirlTest { private Map> initiatorQueries; private StateMessage initiatorStateMessage; private NoCoTamMessage noCoTamMessage; + private AttributesMessage attributesMessage1; + private AttributesMessage attributesMessage2; + private QueryMessage queryMessage1; + private QueryMessage queryMessage2; @Before public void setupLocals() throws Exception { @@ -75,6 +81,37 @@ public class GossipGirlTest { Map otherQueryTimestamps = makeOtherQueryTimestamps(); noCoTamMessage = new NoCoTamMessage("", 0, 0, 42, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22)); + + attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1()); + attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2()); + queryMessage1 = makeQueryMessage("&one", "SELECT 3 AS one"); + queryMessage2 = makeQueryMessage("&three", "SELECT 3 AS three"); + } + + public QueryMessage makeQueryMessage(String name, String query) throws Exception { + return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0); + } + + public AttributesMap makeAttributes1() { + AttributesMap attributes = new AttributesMap(); + attributes.add("name", new ValueString("bro")); + attributes.add("timestamp", testTime); + attributes.add("foo", new ValueInt(140l)); + attributes.add("bar", new ValueString(":wq")); + return attributes; + } + + public AttributesMap makeAttributes2() { + AttributesMap attributes = new AttributesMap(); + attributes.add("name", new ValueString("whodis")); + attributes.add("timestamp", TestUtil.addToTime(testTime, -300)); + attributes.add("foo", new ValueInt(61l)); + attributes.add("bar", new ValueString("nice")); + return attributes; + } + + public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) { + return new AttributesMessage("", 0, new PathName(path), attributes, 0); } public Map makeOtherZoneTimestamps() { @@ -165,7 +202,6 @@ public class GossipGirlTest { ); HejkaMessage hejkaMessage = (HejkaMessage) ((UDUPMessage) receivedMessage).getContent(); assertEquals(0, hejkaMessage.getSenderGossipId()); - System.out.println(hejkaMessage.getZoneTimestamps().keySet()); assertEquals(3, TestUtil.iterableSize(hejkaMessage.getZoneTimestamps().keySet())); Set zoneSet = hejkaMessage.getZoneTimestamps().keySet(); assertThat(zoneSet, hasItems(new PathName("/daughter"))); @@ -203,6 +239,53 @@ public class GossipGirlTest { assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo"); } + @Test + public void initiatorModifiesStateOnAttributes() throws Exception { + gossipGirl.handleTyped(initiateGossipMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(initiatorStateMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(noCoTamMessage); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + + gossipGirl.handleTyped(attributesMessage1); + AgentMessage receivedMessage1 = executor.messagesToPass.poll(); + assertNotNull(receivedMessage1); + assertEquals(ModuleType.STATE, receivedMessage1.getDestinationModule()); + StanikMessage stanikMessage1 = (StanikMessage) receivedMessage1; + assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); + UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; + assertEquals("/son/bro", updateMessage1.getPathName()); + // TODO: this should be modified by GTP + assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); + assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); + + gossipGirl.handleTyped(queryMessage1); + AgentMessage receivedMessage2 = executor.messagesToPass.poll(); + assertNotNull(receivedMessage2); + assertEquals(ModuleType.STATE, receivedMessage2.getDestinationModule()); + StanikMessage stanikMessage2 = (StanikMessage) receivedMessage2; + assertEquals(StanikMessage.Type.UPDATE_QUERIES, stanikMessage2.getType()); + UpdateQueriesMessage updateMessage2 = (UpdateQueriesMessage) stanikMessage2; + assertEquals(1, updateMessage2.getQueries().keySet().size()); + assertThat(updateMessage2.getQueries().keySet(), hasItems(new Attribute("&one"))); + assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), + new SimpleImmutableEntry( + new ValueQuery("SELECT 3 AS one"), + // TODO: this should be modified by GTP + TestUtil.addToTime(testTime, 10) + ) + ); + + gossipGirl.handleTyped(attributesMessage2); + gossipGirl.handleTyped(queryMessage2); + } + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { assertUDUPMessage( message, -- cgit v1.2.3 From 89af38da0b95445180440e85bb8248ab910ef9f8 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Fri, 10 Jan 2020 22:58:32 +0100 Subject: Send NoCoTam when responder gets state --- .../cloudatlas/agent/messages/HejkaMessage.java | 14 ++++- .../cloudatlas/agent/messages/NoCoTamMessage.java | 2 +- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 37 +++++++++++++ .../cloudatlas/agent/modules/GossipGirlState.java | 35 +++++++++++- .../cloudatlas/agent/modules/GossipGirlTest.java | 63 +++++++++++++++++++--- 5 files changed, 141 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java index 340d939..62554a5 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java @@ -8,12 +8,16 @@ import pl.edu.mimuw.cloudatlas.model.ValueTime; public class HejkaMessage extends RemoteGossipGirlMessage { private long senderGossipId; + private PathName senderPath; + private PathName receiverPath; private Map zoneTimestamps; private Map queryTimestamps; - public HejkaMessage(String messageId, long timestamp, long senderGossipId, Map zoneTimestamps, Map queryTimestamps) { + public HejkaMessage(String messageId, long timestamp, long senderGossipId, PathName senderPath, PathName receiverPath, Map zoneTimestamps, Map queryTimestamps) { super(messageId, timestamp, Type.HEJKA); this.senderGossipId = senderGossipId; + this.senderPath = senderPath; + this.receiverPath = receiverPath; this.zoneTimestamps = zoneTimestamps; this.queryTimestamps = queryTimestamps; } @@ -22,6 +26,14 @@ public class HejkaMessage extends RemoteGossipGirlMessage { return senderGossipId; } + public PathName getSenderPath() { + return senderPath; + } + + public PathName getReceiverPath() { + return receiverPath; + } + public Map getZoneTimestamps() { return zoneTimestamps; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java index 3dd0c4d..c23722c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java @@ -14,7 +14,7 @@ public class NoCoTamMessage extends RemoteGossipGirlMessage { private ValueTime hejkaSendTimestamp; private ValueTime hejkaReceiveTimestamp; - public NoCoTamMessage(String messageId, long timestamp, long receiverGossipId, long senderGossipId, Map zoneTimestamps, Map queryTimestamps, ValueTime hejkaSendTimestamp, ValueTime hejkaReceiveTimestamp) { + public NoCoTamMessage(String messageId, long timestamp, long senderGossipId, long receiverGossipId, Map zoneTimestamps, Map queryTimestamps, ValueTime hejkaSendTimestamp, ValueTime hejkaReceiveTimestamp) { super(messageId, timestamp, Type.NO_CO_TAM); this.receiverGossipId = receiverGossipId; this.senderGossipId = senderGossipId; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index e7bd227..0c02f1f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -20,6 +20,7 @@ import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ZMI; @@ -38,6 +39,9 @@ public class GossipGirl extends Module { case INITIATE: initiateGossip((InitiateGossipMessage) message); break; + case HEJKA: + receiveGossip((HejkaMessage) message); + break; case NO_CO_TAM: handleNoCoTam((NoCoTamMessage) message); break; @@ -71,6 +75,23 @@ public class GossipGirl extends Module { sendMessage(getState); } + private void receiveGossip(HejkaMessage message) throws InterruptedException { + Long gossipId = nextGossipId; + nextGossipId++; + gossipStates.put(gossipId, new GossipGirlState( + gossipId, + message.getReceiverPath(), + new ValueContact(message.getSenderPath(), message.getSenderAddress()), + false + ) + ); + + gossipStates.get(gossipId).handleHejka(message); + + GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + sendMessage(getState); + } + private void setState(StateMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getRequestId()); if (state != null) { @@ -80,12 +101,28 @@ public class GossipGirl extends Module { "", 0, state.gossipId, + state.ourPath, + state.theirContact.getName(), state.getZoneTimestampsToSend(), state.getQueryTimestampsToSend() ); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka); sendMessage(udupMessage); state.sentHejka(); + } else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) { + NoCoTamMessage noCoTam = new NoCoTamMessage( + "", + 0, + state.gossipId, + state.theirGossipId, + state.getZoneTimestampsToSend(), + state.getQueryTimestampsToSend(), + state.hejkaSendTimestamp, + state.hejkaReceiveTimestamp + ); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam); + sendMessage(udupMessage); + state.sentNoCoTam(); } } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 8b0711e..1629914 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -22,8 +23,10 @@ import pl.edu.mimuw.cloudatlas.model.ZMI; public class GossipGirlState { public enum State { WAIT_FOR_STATE_INITIALIZER, + APPLY_HEJKA, WAIT_FOR_STATE_RESPONDER, SEND_HEJKA, + SEND_NO_CO_TAM, SEND_INFO, WAIT_FOR_NO_CO_TAM, WAIT_FOR_FIRST_INFO, @@ -57,7 +60,7 @@ public class GossipGirlState { if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; } else { - state = State.WAIT_FOR_STATE_RESPONDER; + state = State.APPLY_HEJKA; } } @@ -71,7 +74,7 @@ public class GossipGirlState { case WAIT_FOR_STATE_RESPONDER: this.hierarchy = hierarchy; this.queries = queries; - state = State.WAIT_FOR_FIRST_INFO; + state = State.SEND_NO_CO_TAM; break; default: System.out.println("ERROR: tried to set gossip state when not expected"); @@ -90,6 +93,34 @@ public class GossipGirlState { } } + public void sentNoCoTam() { + switch (state) { + case SEND_NO_CO_TAM: + state = state.WAIT_FOR_FIRST_INFO; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } + + public void handleHejka(HejkaMessage message) { + switch (state) { + case APPLY_HEJKA: + System.out.println("setting sender gossip id to " + message.getSenderGossipId()); + theirGossipId = message.getSenderGossipId(); + theirZoneTimestamps = message.getZoneTimestamps(); + theirQueryTimestamps = message.getQueryTimestamps(); + hejkaSendTimestamp = message.getSentTimestamp(); + hejkaReceiveTimestamp = message.getReceivedTimestamp(); + state = State.WAIT_FOR_STATE_RESPONDER; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } + public void handleNoCoTam(NoCoTamMessage message) { switch (state) { case WAIT_FOR_NO_CO_TAM: diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index fffb577..170b76e 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -49,13 +49,15 @@ public class GossipGirlTest { private ValueTime testTime; private ZMI initiatorHierarchy; private Map> initiatorQueries; - private StateMessage initiatorStateMessage; + private StateMessage stateMessage; private NoCoTamMessage noCoTamMessage; private AttributesMessage attributesMessage1; private AttributesMessage attributesMessage2; private QueryMessage queryMessage1; private QueryMessage queryMessage2; + private HejkaMessage hejkaMessage; + @Before public void setupLocals() throws Exception { gossipGirl = new GossipGirl(); @@ -75,19 +77,25 @@ public class GossipGirlTest { testTime = ValueUtils.currentTime(); setupHierarchy(); setupQueries(); - initiatorStateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries); + stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries); Map otherZoneTimestamps = makeOtherZoneTimestamps(); Map otherQueryTimestamps = makeOtherQueryTimestamps(); - noCoTamMessage = new NoCoTamMessage("", 0, 0, 42, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22)); + noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22)); attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1()); attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2()); queryMessage1 = makeQueryMessage("&one", "SELECT 3 AS one"); queryMessage2 = makeQueryMessage("&three", "SELECT 3 AS three"); + + hejkaMessage = new HejkaMessage("", 0, 123, new PathName("/son/bro"), new PathName("/son/grand"), otherZoneTimestamps, otherQueryTimestamps); + hejkaMessage.setSentTimestamp(testTime); + hejkaMessage.setReceivedTimestamp(TestUtil.addToTime(testTime, 15)); + hejkaMessage.setSenderAddress(theirContact.getAddress()); } + public QueryMessage makeQueryMessage(String name, String query) throws Exception { return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0); } @@ -192,7 +200,7 @@ public class GossipGirlTest { public void initiatorSendsHejkaOnState() throws Exception { gossipGirl.handleTyped(initiateGossipMessage); executor.messagesToPass.take(); - gossipGirl.handleTyped(initiatorStateMessage); + gossipGirl.handleTyped(stateMessage); AgentMessage receivedMessage = executor.messagesToPass.poll(); assertUDUPMessage( @@ -202,6 +210,8 @@ public class GossipGirlTest { ); HejkaMessage hejkaMessage = (HejkaMessage) ((UDUPMessage) receivedMessage).getContent(); assertEquals(0, hejkaMessage.getSenderGossipId()); + assertEquals(new PathName("/son/grand"), hejkaMessage.getSenderPath()); + assertEquals(new PathName("/son/bro"), hejkaMessage.getReceiverPath()); assertEquals(3, TestUtil.iterableSize(hejkaMessage.getZoneTimestamps().keySet())); Set zoneSet = hejkaMessage.getZoneTimestamps().keySet(); assertThat(zoneSet, hasItems(new PathName("/daughter"))); @@ -219,7 +229,7 @@ public class GossipGirlTest { public void initiatorSendsZonesAndQueriesOnNoCoTam() throws Exception { gossipGirl.handleTyped(initiateGossipMessage); executor.messagesToPass.take(); - gossipGirl.handleTyped(initiatorStateMessage); + gossipGirl.handleTyped(stateMessage); executor.messagesToPass.take(); gossipGirl.handleTyped(noCoTamMessage); @@ -243,7 +253,7 @@ public class GossipGirlTest { public void initiatorModifiesStateOnAttributes() throws Exception { gossipGirl.handleTyped(initiateGossipMessage); executor.messagesToPass.take(); - gossipGirl.handleTyped(initiatorStateMessage); + gossipGirl.handleTyped(stateMessage); executor.messagesToPass.take(); gossipGirl.handleTyped(noCoTamMessage); executor.messagesToPass.take(); @@ -286,6 +296,47 @@ public class GossipGirlTest { gossipGirl.handleTyped(queryMessage2); } + @Test + public void getsStateOnHejka() throws Exception { + gossipGirl.handleTyped(hejkaMessage); + + AgentMessage receivedMessage = executor.messagesToPass.poll(); + assertNotNull(receivedMessage); + assertEquals(ModuleType.STATE, receivedMessage.getDestinationModule()); + StanikMessage stanikMessage = (StanikMessage) receivedMessage; + assertEquals(StanikMessage.Type.GET_STATE, stanikMessage.getType()); + GetStateMessage getStateMessage = (GetStateMessage) stanikMessage; + assertEquals(ModuleType.GOSSIP, getStateMessage.getRequestingModule()); + } + + @Test + public void receiverSendsNoCoTamOnState() throws Exception { + gossipGirl.handleTyped(hejkaMessage); + executor.messagesToPass.poll(); + + gossipGirl.handleTyped(stateMessage); + AgentMessage receivedMessage = executor.messagesToPass.poll(); + assertUDUPMessage( + receivedMessage, + new PathName("/son/bro"), + GossipGirlMessage.Type.NO_CO_TAM + ); + NoCoTamMessage noCoTamMessage = (NoCoTamMessage) ((UDUPMessage) receivedMessage).getContent(); + assertEquals(0, noCoTamMessage.getSenderGossipId()); + assertEquals(123, noCoTamMessage.getReceiverGossipId()); + assertEquals(3, TestUtil.iterableSize(noCoTamMessage.getZoneTimestamps().keySet())); + Set zoneSet = noCoTamMessage.getZoneTimestamps().keySet(); + assertThat(zoneSet, hasItems(new PathName("/daughter"))); + assertThat(zoneSet, hasItems(new PathName("/son/sis"))); + assertThat(zoneSet, hasItems(new PathName("/son/grand"))); + + assertEquals(3, TestUtil.iterableSize(noCoTamMessage.getQueryTimestamps().keySet())); + Set querySet = noCoTamMessage.getQueryTimestamps().keySet(); + assertThat(querySet, hasItems(new Attribute("&one"))); + assertThat(querySet, hasItems(new Attribute("&two"))); + assertThat(querySet, hasItems(new Attribute("&query"))); + } + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { assertUDUPMessage( message, -- cgit v1.2.3 From 0fca21f2011958d709a25aa1f4d863c1d646da6e Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 12:08:02 +0100 Subject: Send info back from responder --- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 33 +++++++++++++--------- .../cloudatlas/agent/modules/GossipGirlState.java | 25 ++++++++++++++-- .../cloudatlas/agent/modules/GossipGirlTest.java | 27 +++++++++++++++++- 3 files changed, 68 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 0c02f1f..dd8f0b7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -133,27 +133,34 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { state.handleNoCoTam(message); - for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); - UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); - sendMessage(udupMessage); - } - - for (Entry query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); - UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); - sendMessage(udupMessage); - } - state.sentInfo(); + sendInfo(state); } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } + private void sendInfo(GossipGirlState state) throws InterruptedException { + for (ZMI zmi : state.getZMIsToSend()) { + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); + sendMessage(udupMessage); + } + + for (Entry query : state.getQueriesToSend()) { + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + sendMessage(udupMessage); + } + state.sentInfo(); + } + private void handleAttributes(AttributesMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { - state.gotAttributesFor(message.getPath()); + state.gotAttributes(message); + if (state.state == GossipGirlState.State.SEND_INFO) { + sendInfo(state); + } UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 1629914..eafbcca 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -28,6 +29,7 @@ public class GossipGirlState { SEND_HEJKA, SEND_NO_CO_TAM, SEND_INFO, + SEND_INFO_AND_FINISH, WAIT_FOR_NO_CO_TAM, WAIT_FOR_FIRST_INFO, WAIT_FOR_INFO, @@ -107,7 +109,6 @@ public class GossipGirlState { public void handleHejka(HejkaMessage message) { switch (state) { case APPLY_HEJKA: - System.out.println("setting sender gossip id to " + message.getSenderGossipId()); theirGossipId = message.getSenderGossipId(); theirZoneTimestamps = message.getZoneTimestamps(); theirQueryTimestamps = message.getQueryTimestamps(); @@ -262,16 +263,19 @@ public class GossipGirlState { case SEND_INFO: state = State.WAIT_FOR_INFO; break; + case SEND_INFO_AND_FINISH: + state = State.FINISHED; + break; default: System.out.println("ERROR: tried to set gossip state when not expected"); state = State.ERROR; } } - public void gotAttributesFor(PathName path) { + public void gotAttributes(AttributesMessage message) { switch (state) { case WAIT_FOR_INFO: - if (!waitingForZones.remove(path)) { + if (!waitingForZones.remove(message.getPath())) { System.out.println("DEBUG: got zone attributes we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { @@ -279,6 +283,21 @@ public class GossipGirlState { state = state.FINISHED; } break; + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForZones.remove(message.getPath())) { + System.out.println("DEBUG: got zone attributes we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; default: System.out.println("ERROR: got attributes when not expected"); state = State.ERROR; diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index 170b76e..37fdbe7 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -312,7 +312,7 @@ public class GossipGirlTest { @Test public void receiverSendsNoCoTamOnState() throws Exception { gossipGirl.handleTyped(hejkaMessage); - executor.messagesToPass.poll(); + executor.messagesToPass.take(); gossipGirl.handleTyped(stateMessage); AgentMessage receivedMessage = executor.messagesToPass.poll(); @@ -337,6 +337,31 @@ public class GossipGirlTest { assertThat(querySet, hasItems(new Attribute("&query"))); } + @Test + public void receiverSendsInfoOnFirstReceivedInfo() throws Exception { + gossipGirl.handleTyped(hejkaMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(stateMessage); + executor.messagesToPass.take(); + + gossipGirl.handleTyped(attributesMessage1); + + // 3 ZMIs, 2 queries, 1 own attributes update + assertEquals(6, executor.messagesToPass.size()); + + AgentMessage receivedMessage1 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage1, "/son/bro", "/daughter"); + AgentMessage receivedMessage2 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage2, "/son/bro", "/son/sis"); + AgentMessage receivedMessage3 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage3, "/son/bro", "/son/grand"); + + AgentMessage receivedMessage4 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage4, "/son/bro", "&two", "SELECT 2 AS two"); + AgentMessage receivedMessage5 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo"); + } + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { assertUDUPMessage( message, -- cgit v1.2.3 From 63ed859398a815e310129de1af1f8821b690b700 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 12:15:17 +0100 Subject: Update state on receiving info --- .../cloudatlas/agent/modules/GossipGirlTest.java | 48 ++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'src') diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index 37fdbe7..bf6255e 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -362,6 +362,54 @@ public class GossipGirlTest { assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo"); } + @Test + public void receiverModifiesStateOnReceivedInfo() throws Exception { + gossipGirl.handleTyped(hejkaMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(stateMessage); + executor.messagesToPass.take(); + + gossipGirl.handleTyped(attributesMessage1); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + executor.messagesToPass.take(); + + AgentMessage receivedMessage1 = executor.messagesToPass.poll(); + assertNotNull(receivedMessage1); + assertEquals(ModuleType.STATE, receivedMessage1.getDestinationModule()); + StanikMessage stanikMessage1 = (StanikMessage) receivedMessage1; + assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); + UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; + assertEquals("/son/bro", updateMessage1.getPathName()); + // TODO: this should be modified by GTP + assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); + assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); + + gossipGirl.handleTyped(queryMessage1); + AgentMessage receivedMessage2 = executor.messagesToPass.poll(); + assertNotNull(receivedMessage2); + assertEquals(ModuleType.STATE, receivedMessage2.getDestinationModule()); + StanikMessage stanikMessage2 = (StanikMessage) receivedMessage2; + assertEquals(StanikMessage.Type.UPDATE_QUERIES, stanikMessage2.getType()); + UpdateQueriesMessage updateMessage2 = (UpdateQueriesMessage) stanikMessage2; + assertEquals(1, updateMessage2.getQueries().keySet().size()); + assertThat(updateMessage2.getQueries().keySet(), hasItems(new Attribute("&one"))); + assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), + new SimpleImmutableEntry( + new ValueQuery("SELECT 3 AS one"), + // TODO: this should be modified by GTP + TestUtil.addToTime(testTime, 10) + ) + ); + + gossipGirl.handleTyped(attributesMessage2); + gossipGirl.handleTyped(queryMessage2); + assertTrue(false); + } + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { assertUDUPMessage( message, -- cgit v1.2.3 From f624f7350e0018060ad3f38be1c3988bc3fb0545 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 12:19:53 +0100 Subject: Fix test --- src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java | 1 - 1 file changed, 1 deletion(-) (limited to 'src') diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index bf6255e..a812e61 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -407,7 +407,6 @@ public class GossipGirlTest { gossipGirl.handleTyped(attributesMessage2); gossipGirl.handleTyped(queryMessage2); - assertTrue(false); } private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { -- cgit v1.2.3