m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java1
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java25
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java82
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java45
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java77
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java2
9 files changed, 210 insertions, 44 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
index 38d764a..e9bbf4e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent;
import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.model.PathName;
import java.net.InetAddress;
import java.net.SocketException;
@@ -43,11 +44,12 @@ public class AgentConfig {
Integer timeout = Integer.getInteger("UDUPServer.timeout");
Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+ String ourPath = System.getProperty("zone_path");
HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
modules.put(ModuleType.RMI, new Remik());
- modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
+ modules.put(ModuleType.STATE, new Stanik(new PathName(ourPath), freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
modules.put(ModuleType.GOSSIP, new GossipGirl());
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index 0e972f4..3d3d279 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -49,7 +49,6 @@ public class EventBus implements Runnable {
}
public void routeMessage(AgentMessage msg) throws InterruptedException {
- System.out.println("Event bus routing message");
executors.get(msg.getDestinationModule()).addMessage(msg);
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
index e4e3cb7..e3bd390 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java
@@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import java.util.Map;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.PathName;
public class AttributesMessage extends RemoteGossipGirlMessage {
private PathName path;
private AttributesMap attributes;
private long receiverGossipId;
+ private ValueDuration offset;
- public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) {
+ public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.ATTRIBUTES);
this.path = path;
this.attributes = attributes;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
private AttributesMessage() {}
@@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
index e457c21..77a2068 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java
@@ -1,18 +1,21 @@
package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
public class QueryMessage extends RemoteGossipGirlMessage {
private Attribute name;
private ValueQuery query;
private long receiverGossipId;
+ private ValueDuration offset;
- public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) {
+ public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) {
super(messageId, timestamp, Type.QUERY);
this.name = name;
this.query = query;
this.receiverGossipId = receiverGossipId;
+ this.offset = offset;
}
public QueryMessage() {}
@@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage {
public long getReceiverGossipId() {
return receiverGossipId;
}
+
+ public ValueDuration getOffset() {
+ return offset;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index e795b83..5199e82 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -37,6 +37,7 @@ public class GossipGirl extends Module {
}
public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType {
+ System.out.println("INFO: got GossipGirlMessage " + message.getType());
switch(message.getType()) {
case INITIATE:
initiateGossip((InitiateGossipMessage) message);
@@ -64,6 +65,7 @@ public class GossipGirl extends Module {
public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType {
switch(message.getType()) {
case STATE:
+ System.out.println("INFO: GossipGirl got a StateMessage");
setState((StateMessage) message);
break;
default:
@@ -77,6 +79,7 @@ public class GossipGirl extends Module {
gossipStates.put(gossipId, new GossipGirlState(gossipId, message.getOurPath(), message.getTheirContact(), true));
GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId);
+ System.out.println("INFO: GossipGirl sending GetStateMessage when initiating");
sendMessage(getState);
}
@@ -94,12 +97,14 @@ public class GossipGirl extends Module {
gossipStates.get(gossipId).handleHejka(message);
GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId);
+ System.out.println("INFO: GossipGirl sending GetStateMessage when responding");
sendMessage(getState);
}
private void setState(StateMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getRequestId());
if (state != null) {
+ System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId()));
state.setLastAction();
state.setState(message.getZMI(), message.getQueries());
if (state.state == GossipGirlState.State.SEND_HEJKA) {
@@ -113,6 +118,7 @@ public class GossipGirl extends Module {
state.getQueryTimestampsToSend()
);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka);
+ System.out.println("INFO: GossipGirl sending HejkaMessage");
sendMessage(udupMessage);
state.sentHejka();
} else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) {
@@ -127,6 +133,7 @@ public class GossipGirl extends Module {
state.hejkaReceiveTimestamp
);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam);
+ System.out.println("INFO: GossipGirl sending NoCoTamMessage");
sendMessage(udupMessage);
state.sentNoCoTam();
}
@@ -138,24 +145,30 @@ public class GossipGirl extends Module {
private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
state.handleNoCoTam(message);
+ System.out.println("DEBUG: handled NoCoTam in GossipGirlState");
sendInfo(state);
+ System.out.println("DEBUG: sent info after NoCoTam");
} else {
System.out.println("ERROR: GossipGirl got state for a nonexistent gossip");
}
}
private void sendInfo(GossipGirlState state) throws InterruptedException {
+ System.out.println("DEBUG: about to send info");
for (ZMI zmi : state.getZMIsToSend()) {
- AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId);
+ AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage);
+ System.out.println("INFO: GossipGirl sending AttributesMessage");
sendMessage(udupMessage);
}
for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) {
- QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId);
+ QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset);
UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage);
+ System.out.println("INFO: GossipGirl sending QueryMessage");
sendMessage(udupMessage);
}
state.sentInfo();
@@ -164,12 +177,14 @@ public class GossipGirl extends Module {
private void handleAttributes(AttributesMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
state.gotAttributes(message);
if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {
sendInfo(state);
}
- UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
+ UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes()));
+ System.out.println("INFO: GossipGirl sending UpdateAttributesMessage");
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
gossipStates.remove(message.getReceiverGossipId());
@@ -182,14 +197,16 @@ public class GossipGirl extends Module {
private void handleQuery(QueryMessage message) throws InterruptedException {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
+ System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));
state.setLastAction();
- state.gotQuery(message.getName());
+ state.gotQuery(message);
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
queries.put(
message.getName(),
new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName()))
);
UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries);
+ System.out.println("INFO: GossipGirl sending UpdateQueriesMessage");
sendMessage(updateMessage);
if (state.state == GossipGirlState.State.FINISHED) {
gossipStates.remove(message.getReceiverGossipId());
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index 70d57d9..0525f41 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -13,9 +13,13 @@ import java.util.Set;
import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueDuration;
+import pl.edu.mimuw.cloudatlas.model.ValueInt;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -48,18 +52,22 @@ public class GossipGirlState {
public ValueTime hejkaSendTimestamp;
public ValueTime hejkaReceiveTimestamp;
public ValueTime noCoTamSendTimestamp;
- public ValueTime noCoTamSendReceiveTimestamp;
+ public ValueTime noCoTamReceiveTimestamp;
+ public ValueDuration offset;
private Map<PathName, ValueTime> theirZoneTimestamps;
private Map<Attribute, ValueTime> theirQueryTimestamps;
private List<PathName> zonesToSend;
private List<Attribute> queriesToSend;
private Set<PathName> waitingForZones;
private Set<Attribute> waitingForQueries;
+ private boolean initiating;
public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) {
this.gossipId = gossipId;
this.ourPath = ourPath;
this.theirContact = theirContact;
+ this.initiating = initiating;
+ System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString());
if (initiating) {
state = State.WAIT_FOR_STATE_INITIALIZER;
} else {
@@ -129,16 +137,23 @@ public class GossipGirlState {
}
public void handleNoCoTam(NoCoTamMessage message) {
+ System.out.println("DEBUG: in GossipGirlState handleNoCoTam");
switch (state) {
case WAIT_FOR_NO_CO_TAM:
+ System.out.println("DEBUG: lets do this");
theirGossipId = message.getSenderGossipId();
theirZoneTimestamps = message.getZoneTimestamps();
theirQueryTimestamps = message.getQueryTimestamps();
hejkaSendTimestamp = message.getHejkaSendTimestamp();
hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp();
+ noCoTamSendTimestamp = message.getSentTimestamp();
+ noCoTamReceiveTimestamp = message.getReceivedTimestamp();
+ computeOffset();
+ System.out.println("DEBUG: set basic stuff");
setZonesToSend();
setQueriesToSend();
setWaitingFor();
+ System.out.println("DEBUG: set big stuff");
state = State.SEND_INFO;
break;
default:
@@ -147,6 +162,23 @@ public class GossipGirlState {
}
}
+ public void computeOffset() {
+ ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp))
+ .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp));
+ offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l))))
+ .subtract(noCoTamReceiveTimestamp);
+ System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString());
+ }
+
+ public AttributesMap modifyAttributes(AttributesMap attributes) {
+ ValueDuration delta = offset;
+ if (!initiating) {
+ delta = delta.negate();
+ }
+ attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta));
+ return attributes;
+ }
+
private void setWaitingFor() {
setWaitingForZones();
setWaitingForQueries();
@@ -168,6 +200,8 @@ public class GossipGirlState {
public Map<PathName, ValueTime> getZoneTimestampsToSend() {
Map<PathName, ValueTime> timestamps = new HashMap();
+ System.out.println("Getting zone timestamps to send to " + theirContact.getName().toString());
+ System.out.println("hierarchy is " + hierarchy.toString());
collectZoneTimestamps(timestamps, hierarchy, theirContact.getName());
return timestamps;
}
@@ -183,12 +217,14 @@ public class GossipGirlState {
public void setZonesToSend() {
zonesToSend = new LinkedList();
+ System.out.println("DEBUG: timestamps to send: " + getZoneTimestampsToSend().toString());
for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) {
ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey());
if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) {
zonesToSend.add(timestampedPath.getKey());
}
}
+ System.out.println("DEBUG: zones to send: " + zonesToSend.toString());
}
public void setQueriesToSend() {
@@ -199,6 +235,7 @@ public class GossipGirlState {
queriesToSend.add(timestampedQuery.getKey());
}
}
+ System.out.println("DEBUG: Queries to send: " + queriesToSend.toString());
}
public List<ZMI> getZMIsToSend() {
@@ -227,6 +264,7 @@ public class GossipGirlState {
}
public void collectZoneTimestamps(Map<PathName, ValueTime> timestamps, ZMI currentZMI, PathName recipientPath) {
+ System.out.println("collecting timestamps, on " + currentZMI.getPathName().toString());
for (ZMI zmi : currentZMI.getSons()) {
if (interestedIn(recipientPath, zmi.getPathName())) {
ValueTime timestamp = (ValueTime) zmi.getAttributes().getOrNull("timestamp");
@@ -242,26 +280,7 @@ public class GossipGirlState {
}
public boolean interestedIn(PathName recipientPath, PathName zmiPath) {
- return isPrefix(zmiPath.levelUp(), recipientPath) && !isPrefix(zmiPath, recipientPath);
- }
-
- public boolean isPrefix(PathName prefix, PathName path) {
- List<String> prefixComponents = prefix.getComponents();
- List<String> pathComponents = path.getComponents();
-
- if (prefixComponents.size() > pathComponents.size()) {
- return false;
- }
-
- Iterator<String> prefixIterator = prefixComponents.iterator();
- Iterator<String> pathIterator = pathComponents.iterator();
-
- while (prefixIterator.hasNext()) {
- if (!prefixIterator.next().equals(pathIterator.next())) {
- return false;
- }
- }
- return true;
+ return ValueUtils.isPrefix(zmiPath.levelUp(), recipientPath) && !ValueUtils.isPrefix(zmiPath, recipientPath);
}
public void sentInfo() {
@@ -294,6 +313,7 @@ public class GossipGirlState {
setZonesToSend();
setQueriesToSend();
setWaitingFor();
+ offset = message.getOffset();
state = State.SEND_INFO;
if (!waitingForZones.remove(message.getPath())) {
@@ -310,10 +330,26 @@ public class GossipGirlState {
}
}
- public void gotQuery(Attribute name) {
+ public void gotQuery(QueryMessage message) {
switch (state) {
+ case WAIT_FOR_FIRST_INFO:
+ // TODO: use offset to setup GTP
+ offset = message.getOffset();
+ setZonesToSend();
+ setQueriesToSend();
+ setWaitingFor();
+ state = State.SEND_INFO;
+
+ if (!waitingForQueries.remove(message.getName())) {
+ System.out.println("DEBUG: got query we weren't expecting");
+ }
+ if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
+ System.out.println("INFO: done waiting for info");
+ state = state.SEND_INFO_AND_FINISH;
+ }
+ break;
case WAIT_FOR_INFO:
- if (!waitingForQueries.remove(name)) {
+ if (!waitingForQueries.remove(message.getName())) {
System.out.println("DEBUG: got query we weren't expecting");
}
if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index 999c193..6e7d4dc 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.nio.file.Path;
import java.util.*;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import pl.edu.mimuw.cloudatlas.agent.messages.*;
@@ -19,19 +20,39 @@ public class Stanik extends Module {
private long freshnessPeriod;
private Set<ValueContact> contacts;
private ValueTime contactsTimestamp;
+ private PathName ourPath;
- public Stanik(long freshnessPeriod) {
+ public Stanik(PathName ourPath, long freshnessPeriod) {
super(ModuleType.STATE);
+ this.ourPath = ourPath;
hierarchy = new ZMI();
queries = new HashMap<Attribute, Entry<ValueQuery, ValueTime>>();
hierarchy.getAttributes().add("timestamp", new ValueTime(0l));
this.freshnessPeriod = freshnessPeriod;
this.contactsTimestamp = ValueUtils.currentTime();
this.contacts = new HashSet<>();
+ setDefaultQueries();
}
- public Stanik() {
- this(60 * 1000);
+ private void setDefaultQueries() {
+ String cardinalityQuery = "SELECT sum(cardinality) AS cardinality";
+ String contactsQuery = "SELECT random(5, unfold(contacts)) AS contacts";
+
+ setDefaultQuery("&cardinality", cardinalityQuery);
+ setDefaultQuery("&contacts", contactsQuery);
+ }
+
+ private void setDefaultQuery(String name, String query) {
+ try {
+ ValueQuery queryValue = new ValueQuery(query);
+ queries.put(new Attribute(name), new SimpleImmutableEntry(queryValue, new ValueTime(0l)));
+ } catch (Exception e) {
+ System.out.println("ERROR: failed to compile default query");
+ }
+ }
+
+ public Stanik(PathName ourPath) {
+ this(ourPath, 60 * 1000);
}
public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType {
@@ -61,6 +82,7 @@ public class Stanik extends Module {
public void handleGetState(GetStateMessage message) throws InterruptedException {
pruneHierarchy();
+ addValues();
StateMessage response = new StateMessage(
"",
message.getRequestingModule(),
@@ -78,6 +100,23 @@ public class Stanik extends Module {
pruneZMI(hierarchy, now);
}
+ private void addValues() {
+ addValuesRecursive(hierarchy, 0);
+ }
+
+ private void addValuesRecursive(ZMI zmi, long level) {
+ zmi.getAttributes().addOrChange("level", new ValueInt(level));
+ if (ValueUtils.isPrefix(zmi.getPathName(), ourPath)) {
+ zmi.getAttributes().addOrChange("owner", new ValueString(ourPath.toString()));
+ }
+ if (zmi.getPathName().equals(ourPath)) {
+ zmi.getAttributes().addOrChange("cardinality", new ValueInt(1l));
+ }
+ for (ZMI son : zmi.getSons()) {
+ addValuesRecursive(son, level + 1);
+ }
+ }
+
private boolean pruneZMI(ZMI zmi, ValueTime time) {
Value timestamp = zmi.getAttributes().get("timestamp");
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index f858468..cfaa37e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -4,7 +4,9 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.assertj.core.data.MapEntry;
import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program;
import pl.edu.mimuw.cloudatlas.model.*;
import java.io.ByteArrayInputStream;
@@ -13,10 +15,8 @@ import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.rmi.Remote;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.text.DateFormat;
+import java.util.*;
/**
* Serializes classes to and from byte arrays for UDP use
@@ -75,12 +75,16 @@ public class UDUPSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueList vl = (ValueList) object;
kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
- kryo.writeObject(output, vl.getValue());
+ ArrayList<Value> al = new ArrayList<>();
+ for (Value v : vl.getValue()) {
+ al.add(v);
+ }
+ kryo.writeObject(output, al);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
ArrayList list = kryo.readObject(input, ArrayList.class);
return new ValueList(list, t);
}
@@ -91,25 +95,75 @@ public class UDUPSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueSet vs = (ValueSet) object;
kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
- kryo.writeObject(output, vs.getValue());
+ HashSet<Value> hs = new HashSet();
+ for (Value v : vs.getValue()) {
+ hs.add(v);
+ }
+ kryo.writeObject(output, hs);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
HashSet set = kryo.readObject(input, HashSet.class);
return new ValueSet(set, t);
}
});
+ kryo.register(AttributesMap.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ AttributesMap attribMap = (AttributesMap) object;
+ HashMap<Attribute, Value> hashMap = new HashMap<>();
+
+ for (Map.Entry<Attribute, Value> e : attribMap) {
+ hashMap.put(e.getKey(), e.getValue());
+ }
+
+ kryo.writeObject(output, hashMap);
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class);
+ AttributesMap attribMap = new AttributesMap();
+ for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) {
+ attribMap.add(e.getKey(), e.getValue());
+ }
+ return attribMap;
+ }
+ });
+
+ kryo.register(ValueQuery.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ValueQuery vq = (ValueQuery) object;
+ kryo.writeObject(output, vq.getCode());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ String code = kryo.readObject(input, String.class);
+ ValueQuery vq = null;
+ try {
+ vq = new ValueQuery(code);
+ } catch (Exception e) {
+ System.out.println("Value query deserialization failed");
+ e.printStackTrace();
+ }
+ return vq;
+ }
+ });
+
// model
kryo.register(Value.class);
kryo.register(ValueBoolean.class);
kryo.register(ValueContact.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueDuration.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueInt.class);
kryo.register(ValueNull.class);
- kryo.register(ValueQuery.class);
kryo.register(ValueSet.class);
kryo.register(ValueString.class);
kryo.register(ValueTime.class);
@@ -121,6 +175,7 @@ public class UDUPSerializer {
kryo.register(AttributesUtil.class);
kryo.register(Type.class);
+ kryo.register(Type.PrimaryType.class);
kryo.register(TypeCollection.class);
kryo.register(TypePrimitive.class);
@@ -156,7 +211,10 @@ public class UDUPSerializer {
kryo.register(byte[].class);
kryo.register(LinkedHashMap.class);
kryo.register(HashMap.class);
+ kryo.register(HashSet.class);
kryo.register(ModuleType.class);
+ kryo.register(DateFormat.class);
+ kryo.register(ArrayList.class);
}
public UDUPMessage deserialize(byte[] packetData) {
@@ -169,6 +227,7 @@ public class UDUPSerializer {
public byte[] serialize(UDUPMessage msg) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output kryoOut = new Output(out);
+ System.out.println("SERIALIZING " + msg.getContent());
kryo.writeObject(kryoOut, msg);
kryoOut.flush();
kryoOut.close();
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 0c5bc86..3da380c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -23,7 +23,7 @@ public class UDUPServer implements Runnable {
private final AtomicBoolean running;
public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {
- this.socket = new DatagramSocket(port, addr);
+ this.socket = new DatagramSocket(port);
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();