m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2020-01-12 22:58:53 +0100
committerGitHub <noreply@github.com>2020-01-12 22:58:53 +0100
commit39d7f8b70dfef38e4b6169e53fb7066743d734d4 (patch)
tree56f6843f62ff881628148e92ad068785e7b69166
parentc48ec1604744ab330d18af1f55256c35dc5c34c6 (diff)
parent0f31d1f5c267f893d765ccd848b95fc111009de5 (diff)
Merge branch 'master' into query_signer
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java1
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java25
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java82
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java45
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java77
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java22
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java1
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java23
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java36
16 files changed, 283 insertions, 59 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
index 38d764a..e9bbf4e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent;
import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.model.PathName;
import java.net.InetAddress;
import java.net.SocketException;
@@ -43,11 +44,12 @@ public class AgentConfig {
Integer timeout = Integer.getInteger("UDUPServer.timeout");
Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+ String ourPath = System.getProperty("zone_path");
HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
modules.put(ModuleType.RMI, new Remik());
- modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
+ modules.put(ModuleType.STATE, new Stanik(new PathName(ourPath), freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
modules.put(ModuleType.GOSSIP, new GossipGirl());
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index 0e972f4..3d3d279 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -49,7 +49,6 @@ public class EventBus implements Runnable {
}
public void routeMessage(AgentMessage msg) throws InterruptedException {
- System.out.println("Event bus routing message");
executors.get(msg.getDestinationModule()).addMessage(msg);
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
index e4e3cb7..e3bd390 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
@@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import java.util.Map;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.PathName;
public class AttributesMessage extends RemoteGossipGirlMessage {
private PathName path;
private AttributesMap attributes;
private long receiverGossipId;
+ private ValueDuration offset;
- public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) {
+ public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.ATTRIBUTES);
this.path = path;
this.attributes = attributes;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
private AttributesMessage() {}
@@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
index e457c21..77a2068 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
@@ -1,18 +1,21 @@
package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
public class QueryMessage extends RemoteGossipGirlMessage {
private Attribute name;
private ValueQuery query;
private long receiverGossipId;
+ private ValueDuration offset;
- public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) {
+ public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.QUERY);
this.name = name;
this.query = query;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
public QueryMessage() {}
@@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index e795b83..5199e82 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -37,6 +37,7 @@ public class GossipGirl extends Module {
}
public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType {
+ System.out.println("INFO: got GossipGirlMessage " + message.getType());
switch(message.getType()) {
case INITIATE:
initiateGossip((InitiateGossipMessage) message);
@@ -64,6 +65,7 @@ public class GossipGirl extends Module {
public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType {
switch(message.getType()) {
case STATE:
+ System.out.println("INFO: GossipGirl got a StateMessage");
setState((StateMessage) message);
break;
default:
@@ -77,6 +79,7 @@ public class GossipGirl extends Module {
gossipStates.put(gossipId, new GossipGirlState(gossipId, message.getOurPath(), message.getTheirContact(), true));
GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId);
+ System.out.println("INFO: GossipGirl sending GetStateMessage when initiating");
sendMessage(getState);
}
@@ -94,12 +97,14 @@ public class GossipGirl extends Module {
gossipStates.get(gossipId).handleHejka(message);
GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId);
+ System.out.println("INFO: GossipGirl sending GetStateMessage when responding");
sendMessage(getState);
}
private void setState(StateMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getRequestId());
if (state != null) {
+ System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId()));
state.setLastAction();
state.setState(message.getZMI(), message.getQueries());
if (state.state == GossipGirlState.State.SEND_HEJKA) {
@@ -113,6 +118,7 @@ public class GossipGirl extends Module {
state.getQueryTimestampsToSend()
);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka);
+ System.out.println("INFO: GossipGirl sending HejkaMessage");
sendMessage(udupMessage);
state.sentHejka();
} else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) {
@@ -127,6 +133,7 @@ public class GossipGirl extends Module {
state.hejkaReceiveTimestamp
);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam);
+ System.out.println("INFO: GossipGirl sending NoCoTamMessage");
sendMessage(udupMessage);
state.sentNoCoTam();
}
@@ -138,24 +145,30 @@ public class GossipGirl extends Module {
private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
state.handleNoCoTam(message);
+ System.out.println("DEBUG: handled NoCoTam in GossipGirlState");
sendInfo(state);
+ System.out.println("DEBUG: sent info after NoCoTam");
} else {
System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
}
}
private void sendInfo(GossipGirlState state) throws InterruptedException {
+ System.out.println("DEBUG: about to send info");
for (ZMI zmi : state.getZMIsToSend()) {
- AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+ AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
+ System.out.println("INFO: GossipGirl sending AttributesMessage");
sendMessage(udupMessage);
}
for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
- QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+ QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
+ System.out.println("INFO: GossipGirl sending QueryMessage");
sendMessage(udupMessage);
}
state.sentInfo();
@@ -164,12 +177,14 @@ public class GossipGirl extends Module {
private void handleAttributes(AttributesMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
state.gotAttributes(message);
if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {
sendInfo(state);
}
- UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
+ UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes()));
+ System.out.println("INFO: GossipGirl sending UpdateAttributesMessage");
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
gossipStates.remove(message.getReceiverGossipId());
@@ -182,14 +197,16 @@ public class GossipGirl extends Module {
private void handleQuery(QueryMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
- state.gotQuery(message.getName());
+ state.gotQuery(message);
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
queries.put(
message.getName(),
new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName()))
);
UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries);
+ System.out.println("INFO: GossipGirl sending UpdateQueriesMessage");
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
gossipStates.remove(message.getReceiverGossipId());
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index 70d57d9..0525f41 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -13,9 +13,13 @@ import java.util.Set;
import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
+import pl.edu.mimuw.cloudatlas.model.ValueInt;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -48,18 +52,22 @@ public class GossipGirlState {
public ValueTime hejkaSendTimestamp;
public ValueTime hejkaReceiveTimestamp;
public ValueTime noCoTamSendTimestamp;
- public ValueTime noCoTamSendReceiveTimestamp;
+ public ValueTime noCoTamReceiveTimestamp;
+ public ValueDuration offset;
private Map<PathName, ValueTime> theirZoneTimestamps;
private Map<Attribute, ValueTime> theirQueryTimestamps;
private List<PathName> zonesToSend;
private List<Attribute> queriesToSend;
private Set<PathName> waitingForZones;
private Set<Attribute> waitingForQueries;
+ private boolean initiating;
public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {
this.gossipId = gossipId;
this.ourPath = ourPath;
this.theirContact = theirContact;
+ this.initiating = initiating;
+ System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString());
if (initiating) {
state = State.WAIT_FOR_STATE_INITIALIZER;
} else {
@@ -129,16 +137,23 @@ public class GossipGirlState {
}
public void handleNoCoTam(NoCoTamMessage message) {
+ System.out.println("DEBUG: in GossipGirlState handleNoCoTam");
switch (state) {
case WAIT_FOR_NO_CO_TAM:
+ System.out.println("DEBUG: lets do this");
theirGossipId = message.getSenderGossipId();
theirZoneTimestamps = message.getZoneTimestamps();
theirQueryTimestamps = message.getQueryTimestamps();
hejkaSendTimestamp = message.getHejkaSendTimestamp();
hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp();
+ noCoTamSendTimestamp = message.getSentTimestamp();
+ noCoTamReceiveTimestamp = message.getReceivedTimestamp();
+ computeOffset();
+ System.out.println("DEBUG: set basic stuff");
setZonesToSend();
setQueriesToSend();
setWaitingFor();
+ System.out.println("DEBUG: set big stuff");
state = State.SEND_INFO;
break;
default:
@@ -147,6 +162,23 @@ public class GossipGirlState {
}
}
+ public void computeOffset() {
+ ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp))
+ .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp));
+ offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l))))
+ .subtract(noCoTamReceiveTimestamp);
+ System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString());
+ }
+
+ public AttributesMap modifyAttributes(AttributesMap attributes) {
+ ValueDuration delta = offset;
+ if (!initiating) {
+ delta = delta.negate();
+ }
+ attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta));
+ return attributes;
+ }
+
private void setWaitingFor() {
setWaitingForZones();
setWaitingForQueries();
@@ -168,6 +200,8 @@ public class GossipGirlState {
public Map<PathName, ValueTime> getZoneTimestampsToSend() {
Map<PathName, ValueTime> timestamps = new HashMap();
+ System.out.println("Getting zone timestamps to send to " + theirContact.getName().toString());
+ System.out.println("hierarchy is " + hierarchy.toString());
collectZoneTimestamps(timestamps, hierarchy, theirContact.getName());
return timestamps;
}
@@ -183,12 +217,14 @@ public class GossipGirlState {
public void setZonesToSend() {
zonesToSend = new LinkedList();
+ System.out.println("DEBUG: timestamps to send: " + getZoneTimestampsToSend().toString());
for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) {
ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey());
if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) {
zonesToSend.add(timestampedPath.getKey());
}
}
+ System.out.println("DEBUG: zones to send: " + zonesToSend.toString());
}
public void setQueriesToSend() {
@@ -199,6 +235,7 @@ public class GossipGirlState {
queriesToSend.add(timestampedQuery.getKey());
}
}
+ System.out.println("DEBUG: Queries to send: " + queriesToSend.toString());
}
public List<ZMI> getZMIsToSend() {
@@ -227,6 +264,7 @@ public class GossipGirlState {
}
public void collectZoneTimestamps(Map<PathName, ValueTime> timestamps, ZMI currentZMI, PathName recipientPath) {
+ System.out.println("collecting timestamps, on " + currentZMI.getPathName().toString());
for (ZMI zmi : currentZMI.getSons()) {
if (interestedIn(recipientPath, zmi.getPathName())) {
ValueTime timestamp = (ValueTime) zmi.getAttributes().getOrNull("timestamp");
@@ -242,26 +280,7 @@ public class GossipGirlState {
}
public boolean interestedIn(PathName recipientPath, PathName zmiPath) {
- return isPrefix(zmiPath.levelUp(), recipientPath) && !isPrefix(zmiPath, recipientPath);
- }
-
- public boolean isPrefix(PathName prefix, PathName path) {
- List<String> prefixComponents = prefix.getComponents();
- List<String> pathComponents = path.getComponents();
-
- if (prefixComponents.size() > pathComponents.size()) {
- return false;
- }
-
- Iterator<String> prefixIterator = prefixComponents.iterator();
- Iterator<String> pathIterator = pathComponents.iterator();
-
- while (prefixIterator.hasNext()) {
- if (!prefixIterator.next().equals(pathIterator.next())) {
- return false;
- }
- }
- return true;
+ return ValueUtils.isPrefix(zmiPath.levelUp(), recipientPath) && !ValueUtils.isPrefix(zmiPath, recipientPath);
}
public void sentInfo() {
@@ -294,6 +313,7 @@ public class GossipGirlState {
setZonesToSend();
setQueriesToSend();
setWaitingFor();
+ offset = message.getOffset();
state = State.SEND_INFO;
if (!waitingForZones.remove(message.getPath())) {
@@ -310,10 +330,26 @@ public class GossipGirlState {
}
}
- public void gotQuery(Attribute name) {
+ public void gotQuery(QueryMessage message) {
switch (state) {
+ case WAIT_FOR_FIRST_INFO:
+ // TODO: use offset to setup GTP
+ offset = message.getOffset();
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
+ state = State.SEND_INFO;
+
+ if (!waitingForQueries.remove(message.getName())) {
+ System.out.println("DEBUG: got query we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.SEND_INFO_AND_FINISH;
+ }
+ break;
case WAIT_FOR_INFO:
- if (!waitingForQueries.remove(name)) {
+ if (!waitingForQueries.remove(message.getName())) {
System.out.println("DEBUG: got query we weren't expecting");
}
if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index 999c193..6e7d4dc 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.nio.file.Path;
import java.util.*;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import pl.edu.mimuw.cloudatlas.agent.messages.*;
@@ -19,19 +20,39 @@ public class Stanik extends Module {
private long freshnessPeriod;
private Set<ValueContact> contacts;
private ValueTime contactsTimestamp;
+ private PathName ourPath;
- public Stanik(long freshnessPeriod) {
+ public Stanik(PathName ourPath, long freshnessPeriod) {
super(ModuleType.STATE);
+ this.ourPath = ourPath;
hierarchy = new ZMI();
queries = new HashMap<Attribute, Entry<ValueQuery, ValueTime>>();
hierarchy.getAttributes().add("timestamp", new ValueTime(0l));
this.freshnessPeriod = freshnessPeriod;
this.contactsTimestamp = ValueUtils.currentTime();
this.contacts = new HashSet<>();
+ setDefaultQueries();
}
- public Stanik() {
- this(60 * 1000);
+ private void setDefaultQueries() {
+ String cardinalityQuery = "SELECT sum(cardinality) AS cardinality";
+ String contactsQuery = "SELECT random(5, unfold(contacts)) AS contacts";
+
+ setDefaultQuery("&cardinality", cardinalityQuery);
+ setDefaultQuery("&contacts", contactsQuery);
+ }
+
+ private void setDefaultQuery(String name, String query) {
+ try {
+ ValueQuery queryValue = new ValueQuery(query);
+ queries.put(new Attribute(name), new SimpleImmutableEntry(queryValue, new ValueTime(0l)));
+ } catch (Exception e) {
+ System.out.println("ERROR: failed to compile default query");
+ }
+ }
+
+ public Stanik(PathName ourPath) {
+ this(ourPath, 60 * 1000);
}
public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType {
@@ -61,6 +82,7 @@ public class Stanik extends Module {
public void handleGetState(GetStateMessage message) throws InterruptedException {
pruneHierarchy();
+ addValues();
StateMessage response = new StateMessage(
"",
message.getRequestingModule(),
@@ -78,6 +100,23 @@ public class Stanik extends Module {
pruneZMI(hierarchy, now);
}
+ private void addValues() {
+ addValuesRecursive(hierarchy, 0);
+ }
+
+ private void addValuesRecursive(ZMI zmi, long level) {
+ zmi.getAttributes().addOrChange("level", new ValueInt(level));
+ if (ValueUtils.isPrefix(zmi.getPathName(), ourPath)) {
+ zmi.getAttributes().addOrChange("owner", new ValueString(ourPath.toString()));
+ }
+ if (zmi.getPathName().equals(ourPath)) {
+ zmi.getAttributes().addOrChange("cardinality", new ValueInt(1l));
+ }
+ for (ZMI son : zmi.getSons()) {
+ addValuesRecursive(son, level + 1);
+ }
+ }
+
private boolean pruneZMI(ZMI zmi, ValueTime time) {
Value timestamp = zmi.getAttributes().get("timestamp");
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index f858468..cfaa37e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -4,7 +4,9 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.assertj.core.data.MapEntry;
import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program;
import pl.edu.mimuw.cloudatlas.model.*;
import java.io.ByteArrayInputStream;
@@ -13,10 +15,8 @@ import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.rmi.Remote;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.text.DateFormat;
+import java.util.*;
/**
* Serializes classes to and from byte arrays for UDP use
@@ -75,12 +75,16 @@ public class UDUPSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueList vl = (ValueList) object;
kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
- kryo.writeObject(output, vl.getValue());
+ ArrayList<Value> al = new ArrayList<>();
+ for (Value v : vl.getValue()) {
+ al.add(v);
+ }
+ kryo.writeObject(output, al);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
ArrayList list = kryo.readObject(input, ArrayList.class);
return new ValueList(list, t);
}
@@ -91,25 +95,75 @@ public class UDUPSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueSet vs = (ValueSet) object;
kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
- kryo.writeObject(output, vs.getValue());
+ HashSet<Value> hs = new HashSet();
+ for (Value v : vs.getValue()) {
+ hs.add(v);
+ }
+ kryo.writeObject(output, hs);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
HashSet set = kryo.readObject(input, HashSet.class);
return new ValueSet(set, t);
}
});
+ kryo.register(AttributesMap.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ AttributesMap attribMap = (AttributesMap) object;
+ HashMap<Attribute, Value> hashMap = new HashMap<>();
+
+ for (Map.Entry<Attribute, Value> e : attribMap) {
+ hashMap.put(e.getKey(), e.getValue());
+ }
+
+ kryo.writeObject(output, hashMap);
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class);
+ AttributesMap attribMap = new AttributesMap();
+ for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) {
+ attribMap.add(e.getKey(), e.getValue());
+ }
+ return attribMap;
+ }
+ });
+
+ kryo.register(ValueQuery.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ValueQuery vq = (ValueQuery) object;
+ kryo.writeObject(output, vq.getCode());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ String code = kryo.readObject(input, String.class);
+ ValueQuery vq = null;
+ try {
+ vq = new ValueQuery(code);
+ } catch (Exception e) {
+ System.out.println("Value query deserialization failed");
+ e.printStackTrace();
+ }
+ return vq;
+ }
+ });
+
// model
kryo.register(Value.class);
kryo.register(ValueBoolean.class);
kryo.register(ValueContact.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueDuration.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueInt.class);
kryo.register(ValueNull.class);
- kryo.register(ValueQuery.class);
kryo.register(ValueSet.class);
kryo.register(ValueString.class);
kryo.register(ValueTime.class);
@@ -121,6 +175,7 @@ public class UDUPSerializer {
kryo.register(AttributesUtil.class);
kryo.register(Type.class);
+ kryo.register(Type.PrimaryType.class);
kryo.register(TypeCollection.class);
kryo.register(TypePrimitive.class);
@@ -156,7 +211,10 @@ public class UDUPSerializer {
kryo.register(byte[].class);
kryo.register(LinkedHashMap.class);
kryo.register(HashMap.class);
+ kryo.register(HashSet.class);
kryo.register(ModuleType.class);
+ kryo.register(DateFormat.class);
+ kryo.register(ArrayList.class);
}
public UDUPMessage deserialize(byte[] packetData) {
@@ -169,6 +227,7 @@ public class UDUPSerializer {
public byte[] serialize(UDUPMessage msg) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output kryoOut = new Output(out);
+ System.out.println("SERIALIZING " + msg.getContent());
kryo.writeObject(kryoOut, msg);
kryoOut.flush();
kryoOut.close();
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 0c5bc86..3da380c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -23,7 +23,7 @@ public class UDUPServer implements Runnable {
private final AtomicBoolean running;
public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {
- this.socket = new DatagramSocket(port, addr);
+ this.socket = new DatagramSocket(port);
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java
index 0994cba..4453aea 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java
@@ -49,6 +49,8 @@ public abstract class Type implements Serializable {
this.primaryType = primaryType;
}
+ public Type() { this.primaryType = PrimaryType.NULL; };
+
/**
* Returns the primary type of this type.
*
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java
index ad07c0a..a17cafa 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java
@@ -78,6 +78,8 @@ public class TypePrimitive extends Type {
*/
public static final TypePrimitive QUERY = new TypePrimitive(PrimaryType.QUERY);
+ private TypePrimitive() {}
+
private TypePrimitive(PrimaryType primaryType) {
super(primaryType);
switch(primaryType) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
index 26a5fbb..c5d4b54 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
@@ -49,6 +49,8 @@ public class ValueQuery extends Value {
this.timestamp = System.currentTimeMillis();
}
+ public String getCode() { return code; }
+
public Program getQuery() {
return query;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
index 866349f..20d6600 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
@@ -1,5 +1,8 @@
package pl.edu.mimuw.cloudatlas.model;
+import java.util.List;
+import java.util.Iterator;
+
public class ValueUtils {
public static boolean valueNonNullOfType(Value value, Type type) {
return value != null && !value.isNull() && value.getType().isCompatible(type);
@@ -16,4 +19,23 @@ public class ValueUtils {
public static ValueTime addToTime(ValueTime time, long millis) {
return time.addValue(new ValueDuration(millis));
}
+
+ public static boolean isPrefix(PathName prefix, PathName path) {
+ List<String> prefixComponents = prefix.getComponents();
+ List<String> pathComponents = path.getComponents();
+
+ if (prefixComponents.size() > pathComponents.size()) {
+ return false;
+ }
+
+ Iterator<String> prefixIterator = prefixComponents.iterator();
+ Iterator<String> pathIterator = pathComponents.iterator();
+
+ while (prefixIterator.hasNext()) {
+ if (!prefixIterator.next().equals(pathIterator.next())) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
index f731706..922ebe2 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
@@ -20,6 +20,7 @@ import java.net.UnknownHostException;
// TODO add serialization tests that target custom serializers (type collections!)
+@Ignore
public class UDUPTest {
@Test
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
index d443fad..dbacfe5 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
@@ -34,6 +34,7 @@ import pl.edu.mimuw.cloudatlas.model.Attribute;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.TestUtil;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.ValueInt;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueString;
@@ -57,6 +58,7 @@ public class GossipGirlTest {
private AttributesMessage attributesMessage2;
private QueryMessage queryMessage1;
private QueryMessage queryMessage2;
+ private ValueDuration offset;
private HejkaMessage hejkaMessage;
@@ -84,7 +86,10 @@ public class GossipGirlTest {
Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps();
Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps();
- noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 22));
+ noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 30));
+ noCoTamMessage.setSentTimestamp(ValueUtils.addToTime(testTime, 35));
+ noCoTamMessage.setReceivedTimestamp(ValueUtils.addToTime(testTime, 255));
+ offset = new ValueDuration(-100l);
attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1());
attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2());
@@ -99,7 +104,7 @@ public class GossipGirlTest {
public QueryMessage makeQueryMessage(String name, String query) throws Exception {
- return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0);
+ return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0, offset);
}
public AttributesMap makeAttributes1() {
@@ -121,12 +126,12 @@ public class GossipGirlTest {
}
public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) {
- return new AttributesMessage("", 0, new PathName(path), attributes, 0);
+ return new AttributesMessage("", 0, new PathName(path), attributes, 0, offset);
}
public Map<PathName, ValueTime> makeOtherZoneTimestamps() {
Map<PathName, ValueTime> zoneTimestamps = new HashMap();
- addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100);
+ addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -120);
addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0);
addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300);
@@ -272,8 +277,7 @@ public class GossipGirlTest {
assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
assertEquals("/son/bro", updateMessage1.getPathName());
- // TODO: this should be modified by GTP
- assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(ValueUtils.addToTime(testTime, 100), updateMessage1.getAttributes().getOrNull("timestamp"));
assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
@@ -289,7 +293,6 @@ public class GossipGirlTest {
assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
new SimpleImmutableEntry(
new ValueQuery("SELECT 3 AS one"),
- // TODO: this should be modified by GTP
ValueUtils.addToTime(testTime, 10)
)
);
@@ -385,8 +388,7 @@ public class GossipGirlTest {
assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType());
UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1;
assertEquals("/son/bro", updateMessage1.getPathName());
- // TODO: this should be modified by GTP
- assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp"));
+ assertEquals(ValueUtils.addToTime(testTime, -100), updateMessage1.getAttributes().getOrNull("timestamp"));
assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo"));
assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar"));
@@ -402,7 +404,6 @@ public class GossipGirlTest {
assertEquals(updateMessage2.getQueries().get(new Attribute("&one")),
new SimpleImmutableEntry(
new ValueQuery("SELECT 3 AS one"),
- // TODO: this should be modified by GTP
ValueUtils.addToTime(testTime, 10)
)
);
@@ -462,6 +463,7 @@ public class GossipGirlTest {
QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent();
assertEquals(new Attribute(name), queryMessage.getName());
assertEquals(new ValueQuery(query), queryMessage.getQuery());
+ assertEquals(new ValueDuration(-100l), queryMessage.getOffset());
}
private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception {
@@ -472,6 +474,7 @@ public class GossipGirlTest {
);
AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent();
assertEquals(new PathName(zonePath), attributesMessage.getPath());
+ assertEquals(new ValueDuration(-100l), attributesMessage.getOffset());
}
private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception {
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java
index f3ea0b0..92ba051 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java
@@ -39,7 +39,7 @@ public class StanikTest {
@Before
public void setupLocals() {
- stanik = new Stanik(freshnessPeriod);
+ stanik = new Stanik(new PathName("/new"), freshnessPeriod);
executor = new MockExecutor(stanik);
testTime = ValueUtils.currentTime();
}
@@ -57,9 +57,11 @@ public class StanikTest {
ZMI zmi = stateMessage.getZMI();
assertNull(zmi.getFather());
assertTrue(zmi.getSons().isEmpty());
- assertEquals(1, TestUtil.iterableSize(zmi.getAttributes()));
+ assertEquals(3, TestUtil.iterableSize(zmi.getAttributes()));
+ assertEquals(new ValueInt(0l), zmi.getAttributes().getOrNull("level"));
+ assertEquals(new ValueString("/new"), zmi.getAttributes().getOrNull("owner"));
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = stateMessage.getQueries();
- assertEquals(0, TestUtil.iterableSize(queries.keySet()));
+ assertEquals(2, TestUtil.iterableSize(queries.keySet()));
}
@Test
@@ -112,6 +114,30 @@ public class StanikTest {
}
@Test
+ public void newZoneHasNewLevel() throws Exception {
+ AttributesMap attributes = new AttributesMap();
+ attributes.add("foo", new ValueInt(1337l));
+ attributes.add("bar", new ValueString("baz"));
+ attributes.add("name", new ValueString("new"));
+ attributes.add("timestamp", testTime);
+ UpdateAttributesMessage message = new UpdateAttributesMessage("test_msg", 0, "/new", attributes);
+ stanik.handleTyped(message);
+ GetStateMessage newMessage = new GetStateMessage("test_msg2", 123, ModuleType.TEST, 43);
+ stanik.handleTyped(newMessage);
+
+ StateMessage newReceivedMessage = (StateMessage) executor.messagesToPass.poll();
+ AttributesMap actualAttributes = newReceivedMessage.getZMI().findDescendant("/new").getAttributes();
+ assertEquals(7, TestUtil.iterableSize(actualAttributes));
+ assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo"));
+ assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar"));
+ assertEquals(new ValueString("new"), actualAttributes.getOrNull("name"));
+ assertEquals(new ValueString("/new"), actualAttributes.getOrNull("owner"));
+ assertEquals(new ValueInt(1l), actualAttributes.getOrNull("cardinality"));
+ assertEquals(testTime, actualAttributes.getOrNull("timestamp"));
+ assertEquals(new ValueInt(1l), actualAttributes.getOrNull("level"));
+ }
+
+ @Test
public void updateWithRemovedAttributes() throws Exception {
AttributesMap attributes = new AttributesMap();
attributes.add("foo", new ValueInt(1337l));
@@ -198,7 +224,7 @@ public class StanikTest {
stanik.handleTyped(message);
HashMap<Attribute, Entry<ValueQuery, ValueTime>> actualQueries = stanik.getQueries();
- assertEquals(1, TestUtil.iterableSize(actualQueries.keySet()));
+ assertEquals(3, TestUtil.iterableSize(actualQueries.keySet()));
assertTrue(actualQueries.containsKey(new Attribute("&query")));
Entry<ValueQuery, ValueTime> timestampedQuery = actualQueries.get(new Attribute("&query"));
assertEquals(new ValueTime(42l), timestampedQuery.getValue());
@@ -222,7 +248,7 @@ public class StanikTest {
stanik.handleTyped(otherMessage);
HashMap<Attribute, Entry<ValueQuery, ValueTime>> actualQueries = stanik.getQueries();
- assertEquals(4, TestUtil.iterableSize(actualQueries.keySet()));
+ assertEquals(6, TestUtil.iterableSize(actualQueries.keySet()));
assertTrue(actualQueries.containsKey(new Attribute("&query1")));
assertTrue(actualQueries.containsKey(new Attribute("&query2")));
assertTrue(actualQueries.containsKey(new Attribute("&query3")));