diff options
author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-12 22:58:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-12 22:58:53 +0100 |
commit | 39d7f8b70dfef38e4b6169e53fb7066743d734d4 (patch) | |
tree | 56f6843f62ff881628148e92ad068785e7b69166 /src | |
parent | c48ec1604744ab330d18af1f55256c35dc5c34c6 (diff) | |
parent | 0f31d1f5c267f893d765ccd848b95fc111009de5 (diff) |
Merge branch 'master' into query_signer
Diffstat (limited to 'src')
16 files changed, 283 insertions, 59 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 38d764a..e9bbf4e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent; import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.api.Api; +import pl.edu.mimuw.cloudatlas.model.PathName; import java.net.InetAddress; import java.net.SocketException; @@ -43,11 +44,12 @@ public class AgentConfig { Integer timeout = Integer.getInteger("UDUPServer.timeout"); Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); + String ourPath = System.getProperty("zone_path"); HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); + modules.put(ModuleType.STATE, new Stanik(new PathName(ourPath), freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 0e972f4..3d3d279 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -49,7 +49,6 @@ public class EventBus implements Runnable { } public void routeMessage(AgentMessage msg) throws InterruptedException { - System.out.println("Event bus routing message"); executors.get(msg.getDestinationModule()).addMessage(msg); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java index e4e3cb7..e3bd390 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java @@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import java.util.Map; import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.PathName; public class AttributesMessage extends RemoteGossipGirlMessage { private PathName path; private AttributesMap attributes; private long receiverGossipId; + private ValueDuration offset; - public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) { + public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.ATTRIBUTES); this.path = path; this.attributes = attributes; this.receiverGossipId = receiverGossipId; + this.offset = offset; } private AttributesMessage() {} @@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java index e457c21..77a2068 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -1,18 +1,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueQuery; public class QueryMessage extends RemoteGossipGirlMessage { private Attribute name; private ValueQuery query; private long receiverGossipId; + private ValueDuration offset; - public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { + public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.QUERY); this.name = name; this.query = query; this.receiverGossipId = receiverGossipId; + this.offset = offset; } public QueryMessage() {} @@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index e795b83..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -37,6 +37,7 @@ public class GossipGirl extends Module { } public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType { + System.out.println("INFO: got GossipGirlMessage " + message.getType()); switch(message.getType()) { case INITIATE: initiateGossip((InitiateGossipMessage) message); @@ -64,6 +65,7 @@ public class GossipGirl extends Module { public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType { switch(message.getType()) { case STATE: + System.out.println("INFO: GossipGirl got a StateMessage"); setState((StateMessage) message); break; default: @@ -77,6 +79,7 @@ public class GossipGirl extends Module { gossipStates.put(gossipId, new GossipGirlState(gossipId, message.getOurPath(), message.getTheirContact(), true)); GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + System.out.println("INFO: GossipGirl sending GetStateMessage when initiating"); sendMessage(getState); } @@ -94,12 +97,14 @@ public class GossipGirl extends Module { gossipStates.get(gossipId).handleHejka(message); GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + System.out.println("INFO: GossipGirl sending GetStateMessage when responding"); sendMessage(getState); } private void setState(StateMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getRequestId()); if (state != null) { + System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId())); state.setLastAction(); state.setState(message.getZMI(), message.getQueries()); if (state.state == GossipGirlState.State.SEND_HEJKA) { @@ -113,6 +118,7 @@ public class GossipGirl extends Module { state.getQueryTimestampsToSend() ); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka); + System.out.println("INFO: GossipGirl sending HejkaMessage"); sendMessage(udupMessage); state.sentHejka(); } else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) { @@ -127,6 +133,7 @@ public class GossipGirl extends Module { state.hejkaReceiveTimestamp ); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam); + System.out.println("INFO: GossipGirl sending NoCoTamMessage"); sendMessage(udupMessage); state.sentNoCoTam(); } @@ -138,24 +145,30 @@ public class GossipGirl extends Module { private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId())); state.setLastAction(); state.handleNoCoTam(message); + System.out.println("DEBUG: handled NoCoTam in GossipGirlState"); sendInfo(state); + System.out.println("DEBUG: sent info after NoCoTam"); } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } private void sendInfo(GossipGirlState state) throws InterruptedException { + System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); + System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); } state.sentInfo(); @@ -164,12 +177,14 @@ public class GossipGirl extends Module { private void handleAttributes(AttributesMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); state.gotAttributes(message); if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } - UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes())); + System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { gossipStates.remove(message.getReceiverGossipId()); @@ -182,14 +197,16 @@ public class GossipGirl extends Module { private void handleQuery(QueryMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); - state.gotQuery(message.getName()); + state.gotQuery(message); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); queries.put( message.getName(), new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName())) ); UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries); + System.out.println("INFO: GossipGirl sending UpdateQueriesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { gossipStates.remove(message.getReceiverGossipId()); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 70d57d9..0525f41 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,22 @@ public class GossipGirlState { public ValueTime hejkaSendTimestamp; public ValueTime hejkaReceiveTimestamp; public ValueTime noCoTamSendTimestamp; - public ValueTime noCoTamSendReceiveTimestamp; + public ValueTime noCoTamReceiveTimestamp; + public ValueDuration offset; private Map<PathName, ValueTime> theirZoneTimestamps; private Map<Attribute, ValueTime> theirQueryTimestamps; private List<PathName> zonesToSend; private List<Attribute> queriesToSend; private Set<PathName> waitingForZones; private Set<Attribute> waitingForQueries; + private boolean initiating; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + this.initiating = initiating; + System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; } else { @@ -129,16 +137,23 @@ public class GossipGirlState { } public void handleNoCoTam(NoCoTamMessage message) { + System.out.println("DEBUG: in GossipGirlState handleNoCoTam"); switch (state) { case WAIT_FOR_NO_CO_TAM: + System.out.println("DEBUG: lets do this"); theirGossipId = message.getSenderGossipId(); theirZoneTimestamps = message.getZoneTimestamps(); theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + noCoTamSendTimestamp = message.getSentTimestamp(); + noCoTamReceiveTimestamp = message.getReceivedTimestamp(); + computeOffset(); + System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); setWaitingFor(); + System.out.println("DEBUG: set big stuff"); state = State.SEND_INFO; break; default: @@ -147,6 +162,23 @@ public class GossipGirlState { } } + public void computeOffset() { + ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) + .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); + offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) + .subtract(noCoTamReceiveTimestamp); + System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); + } + + public AttributesMap modifyAttributes(AttributesMap attributes) { + ValueDuration delta = offset; + if (!initiating) { + delta = delta.negate(); + } + attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); + return attributes; + } + private void setWaitingFor() { setWaitingForZones(); setWaitingForQueries(); @@ -168,6 +200,8 @@ public class GossipGirlState { public Map<PathName, ValueTime> getZoneTimestampsToSend() { Map<PathName, ValueTime> timestamps = new HashMap(); + System.out.println("Getting zone timestamps to send to " + theirContact.getName().toString()); + System.out.println("hierarchy is " + hierarchy.toString()); collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); return timestamps; } @@ -183,12 +217,14 @@ public class GossipGirlState { public void setZonesToSend() { zonesToSend = new LinkedList(); + System.out.println("DEBUG: timestamps to send: " + getZoneTimestampsToSend().toString()); for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { zonesToSend.add(timestampedPath.getKey()); } } + System.out.println("DEBUG: zones to send: " + zonesToSend.toString()); } public void setQueriesToSend() { @@ -199,6 +235,7 @@ public class GossipGirlState { queriesToSend.add(timestampedQuery.getKey()); } } + System.out.println("DEBUG: Queries to send: " + queriesToSend.toString()); } public List<ZMI> getZMIsToSend() { @@ -227,6 +264,7 @@ public class GossipGirlState { } public void collectZoneTimestamps(Map<PathName, ValueTime> timestamps, ZMI currentZMI, PathName recipientPath) { + System.out.println("collecting timestamps, on " + currentZMI.getPathName().toString()); for (ZMI zmi : currentZMI.getSons()) { if (interestedIn(recipientPath, zmi.getPathName())) { ValueTime timestamp = (ValueTime) zmi.getAttributes().getOrNull("timestamp"); @@ -242,26 +280,7 @@ public class GossipGirlState { } public boolean interestedIn(PathName recipientPath, PathName zmiPath) { - return isPrefix(zmiPath.levelUp(), recipientPath) && !isPrefix(zmiPath, recipientPath); - } - - public boolean isPrefix(PathName prefix, PathName path) { - List<String> prefixComponents = prefix.getComponents(); - List<String> pathComponents = path.getComponents(); - - if (prefixComponents.size() > pathComponents.size()) { - return false; - } - - Iterator<String> prefixIterator = prefixComponents.iterator(); - Iterator<String> pathIterator = pathComponents.iterator(); - - while (prefixIterator.hasNext()) { - if (!prefixIterator.next().equals(pathIterator.next())) { - return false; - } - } - return true; + return ValueUtils.isPrefix(zmiPath.levelUp(), recipientPath) && !ValueUtils.isPrefix(zmiPath, recipientPath); } public void sentInfo() { @@ -294,6 +313,7 @@ public class GossipGirlState { setZonesToSend(); setQueriesToSend(); setWaitingFor(); + offset = message.getOffset(); state = State.SEND_INFO; if (!waitingForZones.remove(message.getPath())) { @@ -310,10 +330,26 @@ public class GossipGirlState { } } - public void gotQuery(Attribute name) { + public void gotQuery(QueryMessage message) { switch (state) { + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + offset = message.getOffset(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForQueries.remove(message.getName())) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; case WAIT_FOR_INFO: - if (!waitingForQueries.remove(name)) { + if (!waitingForQueries.remove(message.getName())) { System.out.println("DEBUG: got query we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index 999c193..6e7d4dc 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.nio.file.Path; import java.util.*; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; import pl.edu.mimuw.cloudatlas.agent.messages.*; @@ -19,19 +20,39 @@ public class Stanik extends Module { private long freshnessPeriod; private Set<ValueContact> contacts; private ValueTime contactsTimestamp; + private PathName ourPath; - public Stanik(long freshnessPeriod) { + public Stanik(PathName ourPath, long freshnessPeriod) { super(ModuleType.STATE); + this.ourPath = ourPath; hierarchy = new ZMI(); queries = new HashMap<Attribute, Entry<ValueQuery, ValueTime>>(); hierarchy.getAttributes().add("timestamp", new ValueTime(0l)); this.freshnessPeriod = freshnessPeriod; this.contactsTimestamp = ValueUtils.currentTime(); this.contacts = new HashSet<>(); + setDefaultQueries(); } - public Stanik() { - this(60 * 1000); + private void setDefaultQueries() { + String cardinalityQuery = "SELECT sum(cardinality) AS cardinality"; + String contactsQuery = "SELECT random(5, unfold(contacts)) AS contacts"; + + setDefaultQuery("&cardinality", cardinalityQuery); + setDefaultQuery("&contacts", contactsQuery); + } + + private void setDefaultQuery(String name, String query) { + try { + ValueQuery queryValue = new ValueQuery(query); + queries.put(new Attribute(name), new SimpleImmutableEntry(queryValue, new ValueTime(0l))); + } catch (Exception e) { + System.out.println("ERROR: failed to compile default query"); + } + } + + public Stanik(PathName ourPath) { + this(ourPath, 60 * 1000); } public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType { @@ -61,6 +82,7 @@ public class Stanik extends Module { public void handleGetState(GetStateMessage message) throws InterruptedException { pruneHierarchy(); + addValues(); StateMessage response = new StateMessage( "", message.getRequestingModule(), @@ -78,6 +100,23 @@ public class Stanik extends Module { pruneZMI(hierarchy, now); } + private void addValues() { + addValuesRecursive(hierarchy, 0); + } + + private void addValuesRecursive(ZMI zmi, long level) { + zmi.getAttributes().addOrChange("level", new ValueInt(level)); + if (ValueUtils.isPrefix(zmi.getPathName(), ourPath)) { + zmi.getAttributes().addOrChange("owner", new ValueString(ourPath.toString())); + } + if (zmi.getPathName().equals(ourPath)) { + zmi.getAttributes().addOrChange("cardinality", new ValueInt(1l)); + } + for (ZMI son : zmi.getSons()) { + addValuesRecursive(son, level + 1); + } + } + private boolean pruneZMI(ZMI zmi, ValueTime time) { Value timestamp = zmi.getAttributes().get("timestamp"); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index f858468..cfaa37e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -4,7 +4,9 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.assertj.core.data.MapEntry; import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program; import pl.edu.mimuw.cloudatlas.model.*; import java.io.ByteArrayInputStream; @@ -13,10 +15,8 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.Remote; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.HashMap; -import java.util.LinkedHashMap; +import java.text.DateFormat; +import java.util.*; /** * Serializes classes to and from byte arrays for UDP use @@ -75,12 +75,16 @@ public class UDUPSerializer { public void write(Kryo kryo, Output output, Object object) { ValueList vl = (ValueList) object; kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); - kryo.writeObject(output, vl.getValue()); + ArrayList<Value> al = new ArrayList<>(); + for (Value v : vl.getValue()) { + al.add(v); + } + kryo.writeObject(output, al); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); ArrayList list = kryo.readObject(input, ArrayList.class); return new ValueList(list, t); } @@ -91,25 +95,75 @@ public class UDUPSerializer { public void write(Kryo kryo, Output output, Object object) { ValueSet vs = (ValueSet) object; kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); - kryo.writeObject(output, vs.getValue()); + HashSet<Value> hs = new HashSet(); + for (Value v : vs.getValue()) { + hs.add(v); + } + kryo.writeObject(output, hs); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); HashSet set = kryo.readObject(input, HashSet.class); return new ValueSet(set, t); } }); + kryo.register(AttributesMap.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + AttributesMap attribMap = (AttributesMap) object; + HashMap<Attribute, Value> hashMap = new HashMap<>(); + + for (Map.Entry<Attribute, Value> e : attribMap) { + hashMap.put(e.getKey(), e.getValue()); + } + + kryo.writeObject(output, hashMap); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class); + AttributesMap attribMap = new AttributesMap(); + for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) { + attribMap.add(e.getKey(), e.getValue()); + } + return attribMap; + } + }); + + kryo.register(ValueQuery.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + ValueQuery vq = (ValueQuery) object; + kryo.writeObject(output, vq.getCode()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + String code = kryo.readObject(input, String.class); + ValueQuery vq = null; + try { + vq = new ValueQuery(code); + } catch (Exception e) { + System.out.println("Value query deserialization failed"); + e.printStackTrace(); + } + return vq; + } + }); + // model kryo.register(Value.class); kryo.register(ValueBoolean.class); kryo.register(ValueContact.class); + kryo.register(ValueDouble.class); kryo.register(ValueDuration.class); + kryo.register(ValueDouble.class); kryo.register(ValueInt.class); kryo.register(ValueNull.class); - kryo.register(ValueQuery.class); kryo.register(ValueSet.class); kryo.register(ValueString.class); kryo.register(ValueTime.class); @@ -121,6 +175,7 @@ public class UDUPSerializer { kryo.register(AttributesUtil.class); kryo.register(Type.class); + kryo.register(Type.PrimaryType.class); kryo.register(TypeCollection.class); kryo.register(TypePrimitive.class); @@ -156,7 +211,10 @@ public class UDUPSerializer { kryo.register(byte[].class); kryo.register(LinkedHashMap.class); kryo.register(HashMap.class); + kryo.register(HashSet.class); kryo.register(ModuleType.class); + kryo.register(DateFormat.class); + kryo.register(ArrayList.class); } public UDUPMessage deserialize(byte[] packetData) { @@ -169,6 +227,7 @@ public class UDUPSerializer { public byte[] serialize(UDUPMessage msg) { ByteArrayOutputStream out = new ByteArrayOutputStream(); Output kryoOut = new Output(out); + System.out.println("SERIALIZING " + msg.getContent()); kryo.writeObject(kryoOut, msg); kryoOut.flush(); kryoOut.close(); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 0c5bc86..3da380c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -23,7 +23,7 @@ public class UDUPServer implements Runnable { private final AtomicBoolean running; public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { - this.socket = new DatagramSocket(port, addr); + this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java index 0994cba..4453aea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java @@ -49,6 +49,8 @@ public abstract class Type implements Serializable { this.primaryType = primaryType; } + public Type() { this.primaryType = PrimaryType.NULL; }; + /** * Returns the primary type of this type. * diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java index ad07c0a..a17cafa 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java @@ -78,6 +78,8 @@ public class TypePrimitive extends Type { */ public static final TypePrimitive QUERY = new TypePrimitive(PrimaryType.QUERY); + private TypePrimitive() {} + private TypePrimitive(PrimaryType primaryType) { super(primaryType); switch(primaryType) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java index 26a5fbb..c5d4b54 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java @@ -49,6 +49,8 @@ public class ValueQuery extends Value { this.timestamp = System.currentTimeMillis(); } + public String getCode() { return code; } + public Program getQuery() { return query; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java index 866349f..20d6600 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -1,5 +1,8 @@ package pl.edu.mimuw.cloudatlas.model; +import java.util.List; +import java.util.Iterator; + public class ValueUtils { public static boolean valueNonNullOfType(Value value, Type type) { return value != null && !value.isNull() && value.getType().isCompatible(type); @@ -16,4 +19,23 @@ public class ValueUtils { public static ValueTime addToTime(ValueTime time, long millis) { return time.addValue(new ValueDuration(millis)); } + + public static boolean isPrefix(PathName prefix, PathName path) { + List<String> prefixComponents = prefix.getComponents(); + List<String> pathComponents = path.getComponents(); + + if (prefixComponents.size() > pathComponents.size()) { + return false; + } + + Iterator<String> prefixIterator = prefixComponents.iterator(); + Iterator<String> pathIterator = pathComponents.iterator(); + + while (prefixIterator.hasNext()) { + if (!prefixIterator.next().equals(pathIterator.next())) { + return false; + } + } + return true; + } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index f731706..922ebe2 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -20,6 +20,7 @@ import java.net.UnknownHostException; // TODO add serialization tests that target custom serializers (type collections!) +@Ignore public class UDUPTest { @Test diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index d443fad..dbacfe5 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -34,6 +34,7 @@ import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.TestUtil; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueString; @@ -57,6 +58,7 @@ public class GossipGirlTest { private AttributesMessage attributesMessage2; private QueryMessage queryMessage1; private QueryMessage queryMessage2; + private ValueDuration offset; private HejkaMessage hejkaMessage; @@ -84,7 +86,10 @@ public class GossipGirlTest { Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps(); Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps(); - noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 22)); + noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 30)); + noCoTamMessage.setSentTimestamp(ValueUtils.addToTime(testTime, 35)); + noCoTamMessage.setReceivedTimestamp(ValueUtils.addToTime(testTime, 255)); + offset = new ValueDuration(-100l); attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1()); attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2()); @@ -99,7 +104,7 @@ public class GossipGirlTest { public QueryMessage makeQueryMessage(String name, String query) throws Exception { - return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0); + return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0, offset); } public AttributesMap makeAttributes1() { @@ -121,12 +126,12 @@ public class GossipGirlTest { } public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) { - return new AttributesMessage("", 0, new PathName(path), attributes, 0); + return new AttributesMessage("", 0, new PathName(path), attributes, 0, offset); } public Map<PathName, ValueTime> makeOtherZoneTimestamps() { Map<PathName, ValueTime> zoneTimestamps = new HashMap(); - addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100); + addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -120); addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0); addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300); @@ -272,8 +277,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, 100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -289,7 +293,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -385,8 +388,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, -100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -402,7 +404,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -462,6 +463,7 @@ public class GossipGirlTest { QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent(); assertEquals(new Attribute(name), queryMessage.getName()); assertEquals(new ValueQuery(query), queryMessage.getQuery()); + assertEquals(new ValueDuration(-100l), queryMessage.getOffset()); } private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception { @@ -472,6 +474,7 @@ public class GossipGirlTest { ); AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent(); assertEquals(new PathName(zonePath), attributesMessage.getPath()); + assertEquals(new ValueDuration(-100l), attributesMessage.getOffset()); } private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index f3ea0b0..92ba051 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -39,7 +39,7 @@ public class StanikTest { @Before public void setupLocals() { - stanik = new Stanik(freshnessPeriod); + stanik = new Stanik(new PathName("/new"), freshnessPeriod); executor = new MockExecutor(stanik); testTime = ValueUtils.currentTime(); } @@ -57,9 +57,11 @@ public class StanikTest { ZMI zmi = stateMessage.getZMI(); assertNull(zmi.getFather()); assertTrue(zmi.getSons().isEmpty()); - assertEquals(1, TestUtil.iterableSize(zmi.getAttributes())); + assertEquals(3, TestUtil.iterableSize(zmi.getAttributes())); + assertEquals(new ValueInt(0l), zmi.getAttributes().getOrNull("level")); + assertEquals(new ValueString("/new"), zmi.getAttributes().getOrNull("owner")); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = stateMessage.getQueries(); - assertEquals(0, TestUtil.iterableSize(queries.keySet())); + assertEquals(2, TestUtil.iterableSize(queries.keySet())); } @Test @@ -112,6 +114,30 @@ public class StanikTest { } @Test + public void newZoneHasNewLevel() throws Exception { + AttributesMap attributes = new AttributesMap(); + attributes.add("foo", new ValueInt(1337l)); + attributes.add("bar", new ValueString("baz")); + attributes.add("name", new ValueString("new")); + attributes.add("timestamp", testTime); + UpdateAttributesMessage message = new UpdateAttributesMessage("test_msg", 0, "/new", attributes); + stanik.handleTyped(message); + GetStateMessage newMessage = new GetStateMessage("test_msg2", 123, ModuleType.TEST, 43); + stanik.handleTyped(newMessage); + + StateMessage newReceivedMessage = (StateMessage) executor.messagesToPass.poll(); + AttributesMap actualAttributes = newReceivedMessage.getZMI().findDescendant("/new").getAttributes(); + assertEquals(7, TestUtil.iterableSize(actualAttributes)); + assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); + assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar")); + assertEquals(new ValueString("new"), actualAttributes.getOrNull("name")); + assertEquals(new ValueString("/new"), actualAttributes.getOrNull("owner")); + assertEquals(new ValueInt(1l), actualAttributes.getOrNull("cardinality")); + assertEquals(testTime, actualAttributes.getOrNull("timestamp")); + assertEquals(new ValueInt(1l), actualAttributes.getOrNull("level")); + } + + @Test public void updateWithRemovedAttributes() throws Exception { AttributesMap attributes = new AttributesMap(); attributes.add("foo", new ValueInt(1337l)); @@ -198,7 +224,7 @@ public class StanikTest { stanik.handleTyped(message); HashMap<Attribute, Entry<ValueQuery, ValueTime>> actualQueries = stanik.getQueries(); - assertEquals(1, TestUtil.iterableSize(actualQueries.keySet())); + assertEquals(3, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query"))); Entry<ValueQuery, ValueTime> timestampedQuery = actualQueries.get(new Attribute("&query")); assertEquals(new ValueTime(42l), timestampedQuery.getValue()); @@ -222,7 +248,7 @@ public class StanikTest { stanik.handleTyped(otherMessage); HashMap<Attribute, Entry<ValueQuery, ValueTime>> actualQueries = stanik.getQueries(); - assertEquals(4, TestUtil.iterableSize(actualQueries.keySet())); + assertEquals(6, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query1"))); assertTrue(actualQueries.containsKey(new Attribute("&query2"))); assertTrue(actualQueries.containsKey(new Attribute("&query3"))); |