m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java14
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java106
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java168
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java219
5 files changed, 472 insertions, 37 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java
index 340d939..62554a5 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java
@@ -8,12 +8,16 @@ import pl.edu.mimuw.cloudatlas.model.ValueTime;
public class HejkaMessage extends RemoteGossipGirlMessage {
private long senderGossipId;
+ private PathName senderPath;
+ private PathName receiverPath;
private Map<PathName, ValueTime> zoneTimestamps;
private Map<Attribute, ValueTime> queryTimestamps;
- public HejkaMessage(String messageId, long timestamp, long senderGossipId, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps) {
+ public HejkaMessage(String messageId, long timestamp, long senderGossipId, PathName senderPath, PathName receiverPath, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps) {
super(messageId, timestamp, Type.HEJKA);
this.senderGossipId = senderGossipId;
+ this.senderPath = senderPath;
+ this.receiverPath = receiverPath;
this.zoneTimestamps = zoneTimestamps;
this.queryTimestamps = queryTimestamps;
}
@@ -22,6 +26,14 @@ public class HejkaMessage extends RemoteGossipGirlMessage {
return senderGossipId;
}
+ public PathName getSenderPath() {
+ return senderPath;
+ }
+
+ public PathName getReceiverPath() {
+ return receiverPath;
+ }
+
public Map<PathName, ValueTime> getZoneTimestamps() {
return zoneTimestamps;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java
index 3dd0c4d..c23722c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java
@@ -14,7 +14,7 @@ public class NoCoTamMessage extends RemoteGossipGirlMessage {
private ValueTime hejkaSendTimestamp;
private ValueTime hejkaReceiveTimestamp;
- public NoCoTamMessage(String messageId, long timestamp, long receiverGossipId, long senderGossipId, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps, ValueTime hejkaSendTimestamp, ValueTime hejkaReceiveTimestamp) {
+ public NoCoTamMessage(String messageId, long timestamp, long senderGossipId, long receiverGossipId, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps, ValueTime hejkaSendTimestamp, ValueTime hejkaReceiveTimestamp) {
super(messageId, timestamp, Type.NO_CO_TAM);
this.receiverGossipId = receiverGossipId;
this.senderGossipId = senderGossipId;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index 73aef8b..dd8f0b7 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -1,5 +1,6 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -14,9 +15,12 @@ import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ZMI;
@@ -35,9 +39,18 @@ public class GossipGirl extends Module {
case INITIATE:
initiateGossip((InitiateGossipMessage) message);
break;
+ case HEJKA:
+ receiveGossip((HejkaMessage) message);
+ break;
case NO_CO_TAM:
handleNoCoTam((NoCoTamMessage) message);
break;
+ case ATTRIBUTES:
+ handleAttributes((AttributesMessage) message);
+ break;
+ case QUERY:
+ handleQuery((QueryMessage) message);
+ break;
default:
throw new InvalidMessageType("This type of message cannot be handled by GossipGirl");
}
@@ -62,6 +75,23 @@ public class GossipGirl extends Module {
sendMessage(getState);
}
+ private void receiveGossip(HejkaMessage message) throws InterruptedException {
+ Long gossipId = nextGossipId;
+ nextGossipId++;
+ gossipStates.put(gossipId, new GossipGirlState(
+ gossipId,
+ message.getReceiverPath(),
+ new ValueContact(message.getSenderPath(), message.getSenderAddress()),
+ false
+ )
+ );
+
+ gossipStates.get(gossipId).handleHejka(message);
+
+ GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId);
+ sendMessage(getState);
+ }
+
private void setState(StateMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getRequestId());
if (state != null) {
@@ -71,12 +101,28 @@ public class GossipGirl extends Module {
"",
0,
state.gossipId,
+ state.ourPath,
+ state.theirContact.getName(),
state.getZoneTimestampsToSend(),
state.getQueryTimestampsToSend()
);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka);
sendMessage(udupMessage);
state.sentHejka();
+ } else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) {
+ NoCoTamMessage noCoTam = new NoCoTamMessage(
+ "",
+ 0,
+ state.gossipId,
+ state.theirGossipId,
+ state.getZoneTimestampsToSend(),
+ state.getQueryTimestampsToSend(),
+ state.hejkaSendTimestamp,
+ state.hejkaReceiveTimestamp
+ );
+ UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam);
+ sendMessage(udupMessage);
+ state.sentNoCoTam();
}
} else {
System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
@@ -87,20 +133,60 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
state.handleNoCoTam(message);
- for (ZMI zmi : state.getZMIsToSend()) {
- AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
- UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
- sendMessage(udupMessage);
+ sendInfo(state);
+ } else {
+ System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
+ }
+ }
+
+ private void sendInfo(GossipGirlState state) throws InterruptedException {
+ for (ZMI zmi : state.getZMIsToSend()) {
+ AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+ UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
+ sendMessage(udupMessage);
+ }
+
+ for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
+ QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+ UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
+ sendMessage(udupMessage);
+ }
+ state.sentInfo();
+ }
+
+ private void handleAttributes(AttributesMessage message) throws InterruptedException {
+ GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
+ if (state != null) {
+ state.gotAttributes(message);
+ if (state.state == GossipGirlState.State.SEND_INFO) {
+ sendInfo(state);
+ }
+ UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
+ sendMessage(updateMessage);
+ if (state.state == GossipGirlState.State.FINISHED) {
+ gossipStates.remove(message.getReceiverGossipId());
}
+ } else {
+ System.out.println("ERROR: GossipGirl got attributes for a nonexistent gossip");
+ }
+ }
- for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
- QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
- UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
- sendMessage(udupMessage);
+ private void handleQuery(QueryMessage message) throws InterruptedException {
+ GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
+ if (state != null) {
+ state.gotQuery(message.getName());
+ Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
+ queries.put(
+ message.getName(),
+ new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName()))
+ );
+ UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries);
+ sendMessage(updateMessage);
+ if (state.state == GossipGirlState.State.FINISHED) {
+ gossipStates.remove(message.getReceiverGossipId());
}
- state.sentInfo();
} else {
- System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
+ System.out.println("ERROR: GossipGirl got query for a nonexistent gossip");
}
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index df17957..eafbcca 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -2,12 +2,16 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
import pl.edu.mimuw.cloudatlas.model.PathName;
@@ -20,12 +24,16 @@ import pl.edu.mimuw.cloudatlas.model.ZMI;
public class GossipGirlState {
public enum State {
WAIT_FOR_STATE_INITIALIZER,
+ APPLY_HEJKA,
WAIT_FOR_STATE_RESPONDER,
SEND_HEJKA,
+ SEND_NO_CO_TAM,
SEND_INFO,
+ SEND_INFO_AND_FINISH,
WAIT_FOR_NO_CO_TAM,
WAIT_FOR_FIRST_INFO,
WAIT_FOR_INFO,
+ FINISHED,
ERROR
}
public PathName ourPath;
@@ -42,6 +50,10 @@ public class GossipGirlState {
public ValueTime noCoTamSendReceiveTimestamp;
private Map<PathName, ValueTime> theirZoneTimestamps;
private Map<Attribute, ValueTime> theirQueryTimestamps;
+ private List<PathName> zonesToSend;
+ private List<Attribute> queriesToSend;
+ private Set<PathName> waitingForZones;
+ private Set<Attribute> waitingForQueries;
public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {
this.gossipId = gossipId;
@@ -50,7 +62,7 @@ public class GossipGirlState {
if (initiating) {
state = State.WAIT_FOR_STATE_INITIALIZER;
} else {
- state = State.WAIT_FOR_STATE_RESPONDER;
+ state = State.APPLY_HEJKA;
}
}
@@ -64,7 +76,7 @@ public class GossipGirlState {
case WAIT_FOR_STATE_RESPONDER:
this.hierarchy = hierarchy;
this.queries = queries;
- state = State.WAIT_FOR_FIRST_INFO;
+ state = State.SEND_NO_CO_TAM;
break;
default:
System.out.println("ERROR: tried to set gossip state when not expected");
@@ -83,6 +95,33 @@ public class GossipGirlState {
}
}
+ public void sentNoCoTam() {
+ switch (state) {
+ case SEND_NO_CO_TAM:
+ state = state.WAIT_FOR_FIRST_INFO;
+ break;
+ default:
+ System.out.println("ERROR: tried to set gossip state when not expected");
+ state = State.ERROR;
+ }
+ }
+
+ public void handleHejka(HejkaMessage message) {
+ switch (state) {
+ case APPLY_HEJKA:
+ theirGossipId = message.getSenderGossipId();
+ theirZoneTimestamps = message.getZoneTimestamps();
+ theirQueryTimestamps = message.getQueryTimestamps();
+ hejkaSendTimestamp = message.getSentTimestamp();
+ hejkaReceiveTimestamp = message.getReceivedTimestamp();
+ state = State.WAIT_FOR_STATE_RESPONDER;
+ break;
+ default:
+ System.out.println("ERROR: tried to set gossip state when not expected");
+ state = State.ERROR;
+ }
+ }
+
public void handleNoCoTam(NoCoTamMessage message) {
switch (state) {
case WAIT_FOR_NO_CO_TAM:
@@ -91,6 +130,9 @@ public class GossipGirlState {
theirQueryTimestamps = message.getQueryTimestamps();
hejkaSendTimestamp = message.getHejkaSendTimestamp();
hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp();
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
state = State.SEND_INFO;
break;
default:
@@ -99,6 +141,25 @@ public class GossipGirlState {
}
}
+ private void setWaitingFor() {
+ setWaitingForZones();
+ setWaitingForQueries();
+ }
+
+ private void setWaitingForZones() {
+ waitingForZones = new HashSet(theirZoneTimestamps.keySet());
+ for (PathName path : zonesToSend) {
+ waitingForZones.remove(path);
+ }
+ }
+
+ private void setWaitingForQueries() {
+ waitingForQueries = new HashSet(theirQueryTimestamps.keySet());
+ for (Attribute name : queriesToSend) {
+ waitingForQueries.remove(name);
+ }
+ }
+
public Map<PathName, ValueTime> getZoneTimestampsToSend() {
Map<PathName, ValueTime> timestamps = new HashMap();
collectZoneTimestamps(timestamps, hierarchy, theirContact.getName());
@@ -114,35 +175,48 @@ public class GossipGirlState {
return queryTimestamps;
}
- public List<ZMI> getZMIsToSend() {
- List<ZMI> zmis = new LinkedList();
+ public void setZonesToSend() {
+ zonesToSend = new LinkedList();
for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) {
ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey());
if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) {
- System.out.println("going to send " + timestampedPath.getKey().toString());
- try {
- zmis.add(hierarchy.findDescendant(timestampedPath.getKey()));
- } catch (ZMI.NoSuchZoneException e) {
- System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend");
- }
+ zonesToSend.add(timestampedPath.getKey());
}
}
- return zmis;
}
- public List<Entry<Attribute, ValueQuery>> getQueriesToSend() {
- List<Entry<Attribute, ValueQuery>> queryList = new LinkedList();
+ public void setQueriesToSend() {
+ queriesToSend = new LinkedList();
for (Entry<Attribute, ValueTime> timestampedQuery : getQueryTimestampsToSend().entrySet()) {
ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey());
if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) {
- queryList.add(
- new SimpleImmutableEntry(
- timestampedQuery.getKey(),
- queries.get(timestampedQuery.getKey()).getKey()
- )
- );
+ queriesToSend.add(timestampedQuery.getKey());
}
}
+ }
+
+ public List<ZMI> getZMIsToSend() {
+ List<ZMI> zmis = new LinkedList();
+ for (PathName path : zonesToSend) {
+ try {
+ zmis.add(hierarchy.findDescendant(path));
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend");
+ }
+ }
+ return zmis;
+ }
+
+ public List<Entry<Attribute, ValueQuery>> getQueriesToSend() {
+ List<Entry<Attribute, ValueQuery>> queryList = new LinkedList();
+ for (Attribute name : queriesToSend) {
+ queryList.add(
+ new SimpleImmutableEntry(
+ name,
+ queries.get(name).getKey()
+ )
+ );
+ }
return queryList;
}
@@ -189,9 +263,65 @@ public class GossipGirlState {
case SEND_INFO:
state = State.WAIT_FOR_INFO;
break;
+ case SEND_INFO_AND_FINISH:
+ state = State.FINISHED;
+ break;
default:
System.out.println("ERROR: tried to set gossip state when not expected");
state = State.ERROR;
}
}
+
+ public void gotAttributes(AttributesMessage message) {
+ switch (state) {
+ case WAIT_FOR_INFO:
+ if (!waitingForZones.remove(message.getPath())) {
+ System.out.println("DEBUG: got zone attributes we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.FINISHED;
+ }
+ break;
+ case WAIT_FOR_FIRST_INFO:
+ // TODO: use offset to setup GTP
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
+ state = State.SEND_INFO;
+
+ if (!waitingForZones.remove(message.getPath())) {
+ System.out.println("DEBUG: got zone attributes we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.SEND_INFO_AND_FINISH;
+ }
+ break;
+ default:
+ System.out.println("ERROR: got attributes when not expected");
+ state = State.ERROR;
+ }
+ }
+
+ public void gotQuery(Attribute name) {
+ switch (state) {
+ case WAIT_FOR_INFO:
+ if (!waitingForQueries.remove(name)) {
+ System.out.println("DEBUG: got query we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.FINISHED;
+ }
+ break;
+ default:
+ System.out.println("ERROR: got query when not expected");
+ state = State.ERROR;
+ }
+ }
+
+ public ValueTime getTheirQueryTimestamp(Attribute name) {
+ return theirQueryTimestamps.get(name);
+ }
}
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
index 7ac27e9..bf6255e 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
@@ -24,6 +24,8 @@ import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.Attribute;
@@ -47,8 +49,14 @@ public class GossipGirlTest {
private ValueTime testTime;
private ZMI initiatorHierarchy;
private Map<Attribute, Entry<ValueQuery, ValueTime>> initiatorQueries;
- private StateMessage initiatorStateMessage;
+ private StateMessage stateMessage;
private NoCoTamMessage noCoTamMessage;
+ private AttributesMessage attributesMessage1;
+ private AttributesMessage attributesMessage2;
+ private QueryMessage queryMessage1;
+ private QueryMessage queryMessage2;
+
+ private HejkaMessage hejkaMessage;
@Before
public void setupLocals() throws Exception {
@@ -69,12 +77,49 @@ public class GossipGirlTest {
testTime = ValueUtils.currentTime();
setupHierarchy();
setupQueries();
- initiatorStateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries);
+ stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries);
Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps();
Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps();
- noCoTamMessage = new NoCoTamMessage("", 0, 0, 42, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22));
+ noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, TestUtil.addToTime(testTime, 10), TestUtil.addToTime(testTime, 22));
+
+ attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1());
+ attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2());
+ queryMessage1 = makeQueryMessage("&one", "SELECT 3 AS one");
+ queryMessage2 = makeQueryMessage("&three", "SELECT 3 AS three");
+
+ hejkaMessage = new HejkaMessage("", 0, 123, new PathName("/son/bro"), new PathName("/son/grand"), otherZoneTimestamps, otherQueryTimestamps);
+ hejkaMessage.setSentTimestamp(testTime);
+ hejkaMessage.setReceivedTimestamp(TestUtil.addToTime(testTime, 15));
+ hejkaMessage.setSenderAddress(theirContact.getAddress());
+ }
+
+
+ public QueryMessage makeQueryMessage(String name, String query) throws Exception {
+ return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0);
+ }
+
+ public AttributesMap makeAttributes1() {
+ AttributesMap attributes = new AttributesMap();
+ attributes.add("name", new ValueString("bro"));
+ attributes.add("timestamp", testTime);
+ attributes.add("foo", new ValueInt(140l));
+ attributes.add("bar", new ValueString(":wq"));
+ return attributes;
+ }
+
+ public AttributesMap makeAttributes2() {
+ AttributesMap attributes = new AttributesMap();
+ attributes.add("name", new ValueString("whodis"));
+ attributes.add("timestamp", TestUtil.addToTime(testTime, -300));
+ attributes.add("foo", new ValueInt(61l));
+ attributes.add("bar", new ValueString("nice"));
+ return attributes;
+ }
+
+ public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) {
+ return new AttributesMessage("", 0, new PathName(path), attributes, 0);
}
public Map<PathName, ValueTime> makeOtherZoneTimestamps() {
@@ -155,7 +200,7 @@ public class GossipGirlTest {
public void initiatorSendsHejkaOnState() throws Exception {
gossipGirl.handleTyped(initiateGossipMessage);
executor.messagesToPass.take();
- gossipGirl.handleTyped(initiatorStateMessage);
+ gossipGirl.handleTyped(stateMessage);
AgentMessage receivedMessage = executor.messagesToPass.poll();
assertUDUPMessage(
@@ -165,7 +210,8 @@ public class GossipGirlTest {
);
HejkaMessage hejkaMessage = (HejkaMessage) ((UDUPMessage) receivedMessage).getContent();
assertEquals(0, hejkaMessage.getSenderGossipId());
- System.out.println(hejkaMessage.getZoneTimestamps().keySet());
+ assertEquals(new PathName("/son/grand"), hejkaMessage.getSenderPath());
+ assertEquals(new PathName("/son/bro"), hejkaMessage.getReceiverPath());
assertEquals(3, TestUtil.iterableSize(hejkaMessage.getZoneTimestamps().keySet()));
Set<PathName> zoneSet = hejkaMessage.getZoneTimestamps().keySet();
assertThat(zoneSet, hasItems(new PathName("/daughter")));
@@ -183,7 +229,7 @@ public class GossipGirlTest {
public void initiatorSendsZonesAndQueriesOnNoCoTam() throws Exception {
gossipGirl.handleTyped(initiateGossipMessage);
executor.messagesToPass.take();
- gossipGirl.handleTyped(initiatorStateMessage);
+ gossipGirl.handleTyped(stateMessage);
executor.messagesToPass.take();
gossipGirl.handleTyped(noCoTamMessage);
@@ -203,6 +249,167 @@ public class GossipGirlTest {
assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo");
}
+ @Test
+ public void initiatorModifiesStateOnAttributes() throws Exception {
+ gossipGirl.handleTyped(initiateGossipMessage);
+ executor.messagesToPass.take();
+ gossipGirl.handleTyped(stateMessage);
+ executor.messagesToPass.take();
+ gossipGirl.handleTyped(noCoTamMessage);
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+
+ gossipGirl.handleTyped(attributesMessage1);
+ AgentMessage receivedMessage1 = executor.messagesToPass.poll();
+ assertNotNull(receivedMessage1);
+ assertEquals(ModuleType.STATE, receivedMessage1.getDestinationModule());
+ StanikMessage stanikMessage1 = (StanikMessage) receivedMessage1;
+ assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
+ UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
+ assertEquals("/son/bro", updateMessage1.getPathName());
+ // TODO: this should be modified by GTP
+ assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
+ assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
+
+ gossipGirl.handleTyped(queryMessage1);
+ AgentMessage receivedMessage2 = executor.messagesToPass.poll();
+ assertNotNull(receivedMessage2);
+ assertEquals(ModuleType.STATE, receivedMessage2.getDestinationModule());
+ StanikMessage stanikMessage2 = (StanikMessage) receivedMessage2;
+ assertEquals(StanikMessage.Type.UPDATE_QUERIES, stanikMessage2.getType());
+ UpdateQueriesMessage updateMessage2 = (UpdateQueriesMessage) stanikMessage2;
+ assertEquals(1, updateMessage2.getQueries().keySet().size());
+ assertThat(updateMessage2.getQueries().keySet(), hasItems(new Attribute("&one")));
+ assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
+ new SimpleImmutableEntry(
+ new ValueQuery("SELECT 3 AS one"),
+ // TODO: this should be modified by GTP
+ TestUtil.addToTime(testTime, 10)
+ )
+ );
+
+ gossipGirl.handleTyped(attributesMessage2);
+ gossipGirl.handleTyped(queryMessage2);
+ }
+
+ @Test
+ public void getsStateOnHejka() throws Exception {
+ gossipGirl.handleTyped(hejkaMessage);
+
+ AgentMessage receivedMessage = executor.messagesToPass.poll();
+ assertNotNull(receivedMessage);
+ assertEquals(ModuleType.STATE, receivedMessage.getDestinationModule());
+ StanikMessage stanikMessage = (StanikMessage) receivedMessage;
+ assertEquals(StanikMessage.Type.GET_STATE, stanikMessage.getType());
+ GetStateMessage getStateMessage = (GetStateMessage) stanikMessage;
+ assertEquals(ModuleType.GOSSIP, getStateMessage.getRequestingModule());
+ }
+
+ @Test
+ public void receiverSendsNoCoTamOnState() throws Exception {
+ gossipGirl.handleTyped(hejkaMessage);
+ executor.messagesToPass.take();
+
+ gossipGirl.handleTyped(stateMessage);
+ AgentMessage receivedMessage = executor.messagesToPass.poll();
+ assertUDUPMessage(
+ receivedMessage,
+ new PathName("/son/bro"),
+ GossipGirlMessage.Type.NO_CO_TAM
+ );
+ NoCoTamMessage noCoTamMessage = (NoCoTamMessage) ((UDUPMessage) receivedMessage).getContent();
+ assertEquals(0, noCoTamMessage.getSenderGossipId());
+ assertEquals(123, noCoTamMessage.getReceiverGossipId());
+ assertEquals(3, TestUtil.iterableSize(noCoTamMessage.getZoneTimestamps().keySet()));
+ Set<PathName> zoneSet = noCoTamMessage.getZoneTimestamps().keySet();
+ assertThat(zoneSet, hasItems(new PathName("/daughter")));
+ assertThat(zoneSet, hasItems(new PathName("/son/sis")));
+ assertThat(zoneSet, hasItems(new PathName("/son/grand")));
+
+ assertEquals(3, TestUtil.iterableSize(noCoTamMessage.getQueryTimestamps().keySet()));
+ Set<Attribute> querySet = noCoTamMessage.getQueryTimestamps().keySet();
+ assertThat(querySet, hasItems(new Attribute("&one")));
+ assertThat(querySet, hasItems(new Attribute("&two")));
+ assertThat(querySet, hasItems(new Attribute("&query")));
+ }
+
+ @Test
+ public void receiverSendsInfoOnFirstReceivedInfo() throws Exception {
+ gossipGirl.handleTyped(hejkaMessage);
+ executor.messagesToPass.take();
+ gossipGirl.handleTyped(stateMessage);
+ executor.messagesToPass.take();
+
+ gossipGirl.handleTyped(attributesMessage1);
+
+ // 3 ZMIs, 2 queries, 1 own attributes update
+ assertEquals(6, executor.messagesToPass.size());
+
+ AgentMessage receivedMessage1 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage1, "/son/bro", "/daughter");
+ AgentMessage receivedMessage2 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage2, "/son/bro", "/son/sis");
+ AgentMessage receivedMessage3 = executor.messagesToPass.poll();
+ assertAttributeMessage(receivedMessage3, "/son/bro", "/son/grand");
+
+ AgentMessage receivedMessage4 = executor.messagesToPass.poll();
+ assertQueryMessage(receivedMessage4, "/son/bro", "&two", "SELECT 2 AS two");
+ AgentMessage receivedMessage5 = executor.messagesToPass.poll();
+ assertQueryMessage(receivedMessage5, "/son/bro", "&query", "SELECT sum(foo) AS foo");
+ }
+
+ @Test
+ public void receiverModifiesStateOnReceivedInfo() throws Exception {
+ gossipGirl.handleTyped(hejkaMessage);
+ executor.messagesToPass.take();
+ gossipGirl.handleTyped(stateMessage);
+ executor.messagesToPass.take();
+
+ gossipGirl.handleTyped(attributesMessage1);
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+ executor.messagesToPass.take();
+
+ AgentMessage receivedMessage1 = executor.messagesToPass.poll();
+ assertNotNull(receivedMessage1);
+ assertEquals(ModuleType.STATE, receivedMessage1.getDestinationModule());
+ StanikMessage stanikMessage1 = (StanikMessage) receivedMessage1;
+ assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
+ UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
+ assertEquals("/son/bro", updateMessage1.getPathName());
+ // TODO: this should be modified by GTP
+ assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
+ assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
+
+ gossipGirl.handleTyped(queryMessage1);
+ AgentMessage receivedMessage2 = executor.messagesToPass.poll();
+ assertNotNull(receivedMessage2);
+ assertEquals(ModuleType.STATE, receivedMessage2.getDestinationModule());
+ StanikMessage stanikMessage2 = (StanikMessage) receivedMessage2;
+ assertEquals(StanikMessage.Type.UPDATE_QUERIES, stanikMessage2.getType());
+ UpdateQueriesMessage updateMessage2 = (UpdateQueriesMessage) stanikMessage2;
+ assertEquals(1, updateMessage2.getQueries().keySet().size());
+ assertThat(updateMessage2.getQueries().keySet(), hasItems(new Attribute("&one")));
+ assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
+ new SimpleImmutableEntry(
+ new ValueQuery("SELECT 3 AS one"),
+ // TODO: this should be modified by GTP
+ TestUtil.addToTime(testTime, 10)
+ )
+ );
+
+ gossipGirl.handleTyped(attributesMessage2);
+ gossipGirl.handleTyped(queryMessage2);
+ assertTrue(false);
+ }
+
private void assertQueryMessage(AgentMessage message, String recipientPath, String name, String query) throws Exception {
assertUDUPMessage(
message,