From 0fca21f2011958d709a25aa1f4d863c1d646da6e Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 12:08:02 +0100 Subject: Send info back from responder --- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 33 +++++++++++++--------- .../cloudatlas/agent/modules/GossipGirlState.java | 25 ++++++++++++++-- .../cloudatlas/agent/modules/GossipGirlTest.java | 27 +++++++++++++++++- 3 files changed, 68 insertions(+), 17 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 0c02f1f..dd8f0b7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -133,27 +133,34 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { state.handleNoCoTam(message); - for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); - UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); - sendMessage(udupMessage); - } - - for (Entry query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); - UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); - sendMessage(udupMessage); - } - state.sentInfo(); + sendInfo(state); } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } + private void sendInfo(GossipGirlState state) throws InterruptedException { + for (ZMI zmi : state.getZMIsToSend()) { + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); + sendMessage(udupMessage); + } + + for (Entry query : state.getQueriesToSend()) { + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + sendMessage(udupMessage); + } + state.sentInfo(); + } + private void handleAttributes(AttributesMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { - state.gotAttributesFor(message.getPath()); + state.gotAttributes(message); + if (state.state == GossipGirlState.State.SEND_INFO) { + sendInfo(state); + } UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 1629914..eafbcca 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; @@ -28,6 +29,7 @@ public class GossipGirlState { SEND_HEJKA, SEND_NO_CO_TAM, SEND_INFO, + SEND_INFO_AND_FINISH, WAIT_FOR_NO_CO_TAM, WAIT_FOR_FIRST_INFO, WAIT_FOR_INFO, @@ -107,7 +109,6 @@ public class GossipGirlState { public void handleHejka(HejkaMessage message) { switch (state) { case APPLY_HEJKA: - System.out.println("setting sender gossip id to " + message.getSenderGossipId()); theirGossipId = message.getSenderGossipId(); theirZoneTimestamps = message.getZoneTimestamps(); theirQueryTimestamps = message.getQueryTimestamps(); @@ -262,16 +263,19 @@ public class GossipGirlState { case SEND_INFO: state = State.WAIT_FOR_INFO; break; + case SEND_INFO_AND_FINISH: + state = State.FINISHED; + break; default: System.out.println("ERROR: tried to set gossip state when not expected"); state = State.ERROR; } } - public void gotAttributesFor(PathName path) { + public void gotAttributes(AttributesMessage message) { switch (state) { case WAIT_FOR_INFO: - if (!waitingForZones.remove(path)) { + if (!waitingForZones.remove(message.getPath())) { System.out.println("DEBUG: got zone attributes we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { @@ -279,6 +283,21 @@ public class GossipGirlState { state = state.FINISHED; } break; + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForZones.remove(message.getPath())) { + System.out.println("DEBUG: got zone attributes we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; default: System.out.println("ERROR: got attributes when not expected"); state = State.ERROR; diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index 170b76e..37fdbe7 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -312,7 +312,7 @@ public class GossipGirlTest { @Test public void receiverSendsNoCoTamOnState() throws Exception { gossipGirl.handleTyped(hejkaMessage); - executor.messagesToPass.poll(); + executor.messagesToPass.take(); gossipGirl.handleTyped(stateMessage); AgentMessage receivedMessage = executor.messagesToPass.poll(); @@ -337,6 +337,31 @@ public class GossipGirlTest { assertThat(querySet, hasItems(new Attribute("&query"))); } + @Test + public void receiverSendsInfoOnFirstReceivedInfo() throws Exception { + gossipGirl.handleTyped(hejkaMessage); + executor.messagesToPass.take(); + gossipGirl.handleTyped(stateMessage); + executor.messagesToPass.take(); + + gossipGirl.handleTyped(attributesMessage1); + + // 3 ZMIs, 2 queries, 1 own attributes update + assertEquals(6, executor.messagesToPass.size()); + + AgentMessage receivedMessage1 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage1, "/son/bro", "/daughter"); + AgentMessage receivedMessage2 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage2, "/son/bro", "/son/sis"); + AgentMessage receivedMessage3 = executor.messagesToPass.poll(); + assertAttributeMessage(receivedMessage3, "/son/bro", "/son/grand"); + + AgentMessage receivedMessage4 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage4, "/son/bro", "&two", "SELECT 2 AS two"); + AgentMessage receivedMessage5 = executor.messagesToPass.poll(); + assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo"); + } + private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception { assertUDUPMessage( message, -- cgit v1.2.3