m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-11 12:08:02 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-11 12:14:43 +0100
commit0fca21f2011958d709a25aa1f4d863c1d646da6e (patch)
tree5dba6d80fde59889101ea4210bf4033b53e18a44
parent89af38da0b95445180440e85bb8248ab910ef9f8 (diff)
Send info back from responder
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java33
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java25
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java27
3 files changed, 68 insertions, 17 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index 0c02f1f..dd8f0b7 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -133,27 +133,34 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
state.handleNoCoTam(message);
- for (ZMI zmi : state.getZMIsToSend()) {
- AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
- UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
- sendMessage(udupMessage);
- }
-
- for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
- QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
- UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
- sendMessage(udupMessage);
- }
- state.sentInfo();
+ sendInfo(state);
} else {
System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
}
}
+ private void sendInfo(GossipGirlState state) throws InterruptedException {
+ for (ZMI zmi : state.getZMIsToSend()) {
+ AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+ UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
+ sendMessage(udupMessage);
+ }
+
+ for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
+ QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+ UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
+ sendMessage(udupMessage);
+ }
+ state.sentInfo();
+ }
+
private void handleAttributes(AttributesMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
- state.gotAttributesFor(message.getPath());
+ state.gotAttributes(message);
+ if (state.state == GossipGirlState.State.SEND_INFO) {
+ sendInfo(state);
+ }
UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index 1629914..eafbcca 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -10,6 +10,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
@@ -28,6 +29,7 @@ public class GossipGirlState {
SEND_HEJKA,
SEND_NO_CO_TAM,
SEND_INFO,
+ SEND_INFO_AND_FINISH,
WAIT_FOR_NO_CO_TAM,
WAIT_FOR_FIRST_INFO,
WAIT_FOR_INFO,
@@ -107,7 +109,6 @@ public class GossipGirlState {
public void handleHejka(HejkaMessage message) {
switch (state) {
case APPLY_HEJKA:
- System.out.println("setting sender gossip id to " + message.getSenderGossipId());
theirGossipId = message.getSenderGossipId();
theirZoneTimestamps = message.getZoneTimestamps();
theirQueryTimestamps = message.getQueryTimestamps();
@@ -262,16 +263,19 @@ public class GossipGirlState {
case SEND_INFO:
state = State.WAIT_FOR_INFO;
break;
+ case SEND_INFO_AND_FINISH:
+ state = State.FINISHED;
+ break;
default:
System.out.println("ERROR: tried to set gossip state when not expected");
state = State.ERROR;
}
}
- public void gotAttributesFor(PathName path) {
+ public void gotAttributes(AttributesMessage message) {
switch (state) {
case WAIT_FOR_INFO:
- if (!waitingForZones.remove(path)) {
+ if (!waitingForZones.remove(message.getPath())) {
System.out.println("DEBUG: got zone attributes we weren't expecting");
}
if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
@@ -279,6 +283,21 @@ public class GossipGirlState {
state = state.FINISHED;
}
break;
+ case WAIT_FOR_FIRST_INFO:
+ // TODO: use offset to setup GTP
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
+ state = State.SEND_INFO;
+
+ if (!waitingForZones.remove(message.getPath())) {
+ System.out.println("DEBUG: got zone attributes we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.SEND_INFO_AND_FINISH;
+ }
+ break;
default:
System.out.println("ERROR: got attributes when not expected");
state = State.ERROR;
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
index 170b76e..37fdbe7 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
@@ -312,7 +312,7 @@ public class GossipGirlTest {
@Test
public void receiverSendsNoCoTamOnState() throws Exception {
gossipGirl.handleTyped(hejkaMessage);
- executor.messagesToPass.poll();
+ executor.messagesToPass.take();
gossipGirl.handleTyped(stateMessage);
AgentMessage receivedMessage = executor.messagesToPass.poll();
@@ -337,6 +337,31 @@ public class GossipGirlTest {
assertThat(querySet, hasItems(new Attribute("&query")));
}
+ @Test
+ public void receiverSendsInfoOnFirstReceivedInfo() throws Exception {
+ gossipGirl.handleTyped(hejkaMessage);
+ executor.messagesToPass.take();
+ gossipGirl.handleTyped(stateMessage);
+ executor.messagesToPass.take();
+
+ gossipGirl.handleTyped(attributesMessage1);
+
+ // 3 ZMIs, 2 queries, 1 own attributes update
+ assertEquals(6, executor.messagesToPass.size());
+
+ AgentMessage receivedMessage1 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage1, "/son/bro", "/daughter");
+ AgentMessage receivedMessage2 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage2, "/son/bro", "/son/sis");
+ AgentMessage receivedMessage3 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage3, "/son/bro", "/son/grand");
+
+ AgentMessage receivedMessage4 = executor.messagesToPass.poll();
+ assertQueryMessage(receivedMessage4, "/son/bro", "&two", "SELECT 2 AS two");
+ AgentMessage receivedMessage5 = executor.messagesToPass.poll();
+ assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo");
+ }
+
private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception {
assertUDUPMessage(
message,