diff options
author | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-09 22:41:11 +0100 |
---|---|---|
committer | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-09 22:41:11 +0100 |
commit | 98ec32765bd769b457561dd9fdf34fee544dd54b (patch) | |
tree | e90b3bb4e551d4edbcd9629b4dceaada56a59f53 | |
parent | 4ba7fa9be088650dc32dbabaa1ef2ea47681ec81 (diff) |
Send info from gossip initiator
6 files changed, 176 insertions, 14 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 2016447..508fe88 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -8,7 +8,8 @@ public abstract class GossipGirlMessage extends AgentMessage { ATTRIBUTES, HEJKA, INITIATE, - NO_CO_TAM + NO_CO_TAM, + QUERY } private Type type; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java new file mode 100644 index 0000000..2b3b064 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -0,0 +1,29 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; + +public class QueryMessage extends RemoteGossipGirlMessage { + private Attribute name; + private ValueQuery query; + private long receiverGossipId; + + public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { + super(messageId, timestamp, Type.QUERY); + this.name = name; + this.query = query; + this.receiverGossipId = receiverGossipId; + } + + public Attribute getName() { + return name; + } + + public ValueQuery getQuery() { + return query; + } + + public long getReceiverGossipId() { + return receiverGossipId; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index ccc7ee5..73aef8b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -10,6 +10,7 @@ import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.InitiateGossipMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; @@ -36,6 +37,7 @@ public class GossipGirl extends Module { break; case NO_CO_TAM: handleNoCoTam((NoCoTamMessage) message); + break; default: throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); } @@ -90,7 +92,12 @@ public class GossipGirl extends Module { UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); sendMessage(udupMessage); } - // TODO: send queries + + for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + sendMessage(udupMessage); + } state.sentInfo(); } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 8ec8ed2..df17957 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -1,5 +1,6 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -13,6 +14,7 @@ import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import pl.edu.mimuw.cloudatlas.model.ZMI; public class GossipGirlState { @@ -113,7 +115,35 @@ public class GossipGirlState { } public List<ZMI> getZMIsToSend() { - return new LinkedList(); + List<ZMI> zmis = new LinkedList(); + for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) { + ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); + if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { + System.out.println("going to send " + timestampedPath.getKey().toString()); + try { + zmis.add(hierarchy.findDescendant(timestampedPath.getKey())); + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); + } + } + } + return zmis; + } + + public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { + List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); + for (Entry<Attribute, ValueTime> timestampedQuery : getQueryTimestampsToSend().entrySet()) { + ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey()); + if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) { + queryList.add( + new SimpleImmutableEntry( + timestampedQuery.getKey(), + queries.get(timestampedQuery.getKey()).getKey() + ) + ); + } + } + return queryList; } public void collectZoneTimestamps(Map<PathName, ValueTime> timestamps, ZMI currentZMI, PathName recipientPath) { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index 1d6496c..7ac27e9 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -14,10 +14,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.MockExecutor; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; import pl.edu.mimuw.cloudatlas.agent.messages.InitiateGossipMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; @@ -41,10 +44,11 @@ public class GossipGirlTest { private final PathName theirPath = new PathName("/son/bro"); private ValueContact theirContact; private InitiateGossipMessage initiateGossipMessage; - private ZMI initiatorHierarchy; private ValueTime testTime; + private ZMI initiatorHierarchy; private Map<Attribute, Entry<ValueQuery, ValueTime>> initiatorQueries; private StateMessage initiatorStateMessage; + private NoCoTamMessage noCoTamMessage; @Before public void setupLocals() throws Exception { @@ -66,6 +70,36 @@ public class GossipGirlTest { setupHierarchy(); setupQueries(); initiatorStateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries); + + Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps(); + Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps(); + + noCoTamMessage = new NoCoTamMessage("", 0, 0, 42, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22)); + } + + public Map<PathName, ValueTime> makeOtherZoneTimestamps() { + Map<PathName, ValueTime> zoneTimestamps = new HashMap(); + addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100); + addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0); + addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300); + + return zoneTimestamps; + } + + public Map<Attribute, ValueTime> makeOtherQueryTimestamps() { + Map<Attribute, ValueTime> queryTimestamps = new HashMap(); + addOtherQueryTimestamp(queryTimestamps, "&one", 10); + addOtherQueryTimestamp(queryTimestamps, "&query", -400); + addOtherQueryTimestamp(queryTimestamps, "&three", 0); + return queryTimestamps; + } + + public void addOtherQueryTimestamp(Map<Attribute, ValueTime> timestamps, String name, long offset) { + timestamps.put(new Attribute(name), TestUtil.addToTime(testTime, offset)); + } + + public void addOtherZoneTimestamp(Map<PathName, ValueTime> timestamps, String path, long offset) { + timestamps.put(new PathName(path), TestUtil.addToTime(testTime, offset)); } public void setupHierarchy() { @@ -80,6 +114,7 @@ public class GossipGirlTest { public void setupQueries() throws Exception { initiatorQueries = new HashMap(); addQuery(initiatorQueries, "&one", "SELECT 1 AS one", testTime); + addQuery(initiatorQueries, "&two", "SELECT 2 AS two", testTime); addQuery(initiatorQueries, "&query", "SELECT sum(foo) AS foo", testTime); } @@ -123,15 +158,12 @@ public class GossipGirlTest { gossipGirl.handleTyped(initiatorStateMessage); AgentMessage receivedMessage = executor.messagesToPass.poll(); - assertNotNull(receivedMessage); - assertEquals(ModuleType.UDP, receivedMessage.getDestinationModule()); - UDUPMessage udupMessage = (UDUPMessage) receivedMessage; - assertEquals(new PathName("/son/bro"), udupMessage.getContact().getName()); - assertEquals(ModuleType.GOSSIP, udupMessage.getContent().getDestinationModule()); - GossipGirlMessage gossipMessage = (GossipGirlMessage) udupMessage.getContent(); - - assertEquals(GossipGirlMessage.Type.HEJKA, gossipMessage.getType()); - HejkaMessage hejkaMessage = (HejkaMessage) gossipMessage; + assertUDUPMessage( + receivedMessage, + new PathName("/son/bro"), + GossipGirlMessage.Type.HEJKA + ); + HejkaMessage hejkaMessage = (HejkaMessage) ((UDUPMessage) receivedMessage).getContent(); assertEquals(0, hejkaMessage.getSenderGossipId()); System.out.println(hejkaMessage.getZoneTimestamps().keySet()); assertEquals(3, TestUtil.iterableSize(hejkaMessage.getZoneTimestamps().keySet())); @@ -140,6 +172,65 @@ public class GossipGirlTest { assertThat(zoneSet, hasItems(new PathName("/son/sis"))); assertThat(zoneSet, hasItems(new PathName("/son/grand"))); - assertEquals(2, TestUtil.iterableSize(hejkaMessage.getQueryTimestamps().keySet())); + assertEquals(3, TestUtil.iterableSize(hejkaMessage.getQueryTimestamps().keySet())); + Set<Attribute> querySet = hejkaMessage.getQueryTimestamps().keySet(); + assertThat(querySet, hasItems(new Attribute("&one"))); + assertThat(querySet, hasItems(new Attribute("&two"))); + assertThat(querySet, hasItems(new Attribute("&query"))); + } + + @Test + public void initiatorSendsZonesAndQueriesOnNoCoTam() throws Exception { + gossipGirl.handleTyped(initiateGossipMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(initiatorStateMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(noCoTamMessage); + + // 3 ZMIs, 2 queries + assertEquals(5, executor.messagesToPass.size()); + + AgentMessage receivedMessage1 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage1, "/son/bro", "/daughter"); + AgentMessage receivedMessage2 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage2, "/son/bro", "/son/sis"); + AgentMessage receivedMessage3 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage3, "/son/bro", "/son/grand"); + + AgentMessage receivedMessage4 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage4, "/son/bro", "&two", "SELECT 2 AS two"); + AgentMessage receivedMessage5 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo"); + } + + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { + assertUDUPMessage( + message, + new PathName(recipientPath), + GossipGirlMessage.Type.QUERY + ); + QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent(); + assertEquals(new Attribute(name), queryMessage.getName()); + assertEquals(new ValueQuery(query), queryMessage.getQuery()); + } + + private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception { + assertUDUPMessage( + message, + new PathName(recipientPath), + GossipGirlMessage.Type.ATTRIBUTES + ); + AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent(); + assertEquals(new PathName(zonePath), attributesMessage.getPath()); + } + + private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception { + assertNotNull(message); + assertEquals(ModuleType.UDP, message.getDestinationModule()); + UDUPMessage udupMessage = (UDUPMessage) message; + assertEquals(destinationName, udupMessage.getContact().getName()); + assertEquals(ModuleType.GOSSIP, udupMessage.getContent().getDestinationModule()); + GossipGirlMessage gossipMessage = (GossipGirlMessage) udupMessage.getContent(); + assertEquals(type, gossipMessage.getType()); } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java b/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java index ddc61a9..0fef6b4 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java @@ -9,4 +9,8 @@ public class TestUtil { return count; } + + public static ValueTime addToTime(ValueTime time, long millis) { + return time.addValue(new ValueDuration(millis)); + } } |