From add8225ec2dbef80ed0ea429805b246966ba0092 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 22:23:57 +0100 Subject: Add logs and stuff --- .../java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 10 ++++++++++ .../pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java | 1 + .../java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index dd8f0b7..fb721dc 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -36,6 +36,7 @@ public class GossipGirl extends Module { public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType { switch(message.getType()) { + System.out.println("INFO: got GossipGirlMessage " + message.getType()); case INITIATE: initiateGossip((InitiateGossipMessage) message); break; @@ -59,6 +60,7 @@ public class GossipGirl extends Module { public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType { switch(message.getType()) { case STATE: + System.out.println("INFO: GossipGirl got a StateMessage"); setState((StateMessage) message); break; default: @@ -72,6 +74,7 @@ public class GossipGirl extends Module { gossipStates.put(gossipId, new GossipGirlState(gossipId, message.getOurPath(), message.getTheirContact(), true)); GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + System.out.println("INFO: GossipGirl sending GetStateMessage when initiating"); sendMessage(getState); } @@ -89,6 +92,7 @@ public class GossipGirl extends Module { gossipStates.get(gossipId).handleHejka(message); GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + System.out.println("INFO: GossipGirl sending GetStateMessage when responding"); sendMessage(getState); } @@ -107,6 +111,7 @@ public class GossipGirl extends Module { state.getQueryTimestampsToSend() ); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka); + System.out.println("INFO: GossipGirl sending HejkaMessage"); sendMessage(udupMessage); state.sentHejka(); } else if (state.state == GossipGirlState.State.SEND_NO_CO_TAM) { @@ -121,6 +126,7 @@ public class GossipGirl extends Module { state.hejkaReceiveTimestamp ); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, noCoTam); + System.out.println("INFO: GossipGirl sending NoCoTamMessage"); sendMessage(udupMessage); state.sentNoCoTam(); } @@ -143,12 +149,14 @@ public class GossipGirl extends Module { for (ZMI zmi : state.getZMIsToSend()) { AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); + System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry query : state.getQueriesToSend()) { QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); } state.sentInfo(); @@ -162,6 +170,7 @@ public class GossipGirl extends Module { sendInfo(state); } UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { gossipStates.remove(message.getReceiverGossipId()); @@ -181,6 +190,7 @@ public class GossipGirl extends Module { new SimpleImmutableEntry(message.getQuery(), state.getTheirQueryTimestamp(message.getName())) ); UpdateQueriesMessage updateMessage = new UpdateQueriesMessage("", 0, queries); + System.out.println("INFO: GossipGirl sending UpdateQueriesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { gossipStates.remove(message.getReceiverGossipId()); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index eafbcca..81cb9f2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -59,6 +59,7 @@ public class GossipGirlState { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; } else { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 0c5bc86..3da380c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -23,7 +23,7 @@ public class UDUPServer implements Runnable { private final AtomicBoolean running; public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { - this.socket = new DatagramSocket(port, addr); + this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); -- cgit v1.2.3 From 48ea2d21e3f8f138e1b440fae694679198efd45c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sat, 11 Jan 2020 22:25:10 +0100 Subject: Fix bad line --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index fb721dc..02b69c8 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -35,8 +35,8 @@ public class GossipGirl extends Module { } public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType { + System.out.println("INFO: got GossipGirlMessage " + message.getType()); switch(message.getType()) { - System.out.println("INFO: got GossipGirlMessage " + message.getType()); case INITIATE: initiateGossip((InitiateGossipMessage) message); break; -- cgit v1.2.3 From 715869151e0551b0a99b1175f68cabe25df392c7 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 22:37:49 +0100 Subject: Register ValueDouble with kryo --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index f858468..efa3dae 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -106,6 +106,7 @@ public class UDUPSerializer { kryo.register(Value.class); kryo.register(ValueBoolean.class); kryo.register(ValueContact.class); + kryo.register(ValueDouble.class); kryo.register(ValueDuration.class); kryo.register(ValueInt.class); kryo.register(ValueNull.class); -- cgit v1.2.3 From 8fc15a98e6a674d5f902382afa668cbafd0a88c4 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 22:41:35 +0100 Subject: Add thing to kryo --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index efa3dae..20cb98a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -122,6 +122,7 @@ public class UDUPSerializer { kryo.register(AttributesUtil.class); kryo.register(Type.class); + kryo.register(Type.PrimaryType.class); kryo.register(TypeCollection.class); kryo.register(TypePrimitive.class); -- cgit v1.2.3 From 1b01c0c2daa289ee99bf14edaeb6b7351aa6b891 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 12:04:59 +0100 Subject: Add more logs --- .../java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java | 7 +++++++ .../pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 02b69c8..0ec9d6c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -99,6 +99,7 @@ public class GossipGirl extends Module { private void setState(StateMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getRequestId()); if (state != null) { + System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId())); state.setState(message.getZMI(), message.getQueries()); if (state.state == GossipGirlState.State.SEND_HEJKA) { HejkaMessage hejka = new HejkaMessage( @@ -138,14 +139,18 @@ public class GossipGirl extends Module { private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId())); state.handleNoCoTam(message); + System.out.println("DEBUG: handled NoCoTam in GossipGirlState"); sendInfo(state); + System.out.println("DEBUG: sent info after NoCoTam"); } else { System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); } } private void sendInfo(GossipGirlState state) throws InterruptedException { + System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); @@ -165,6 +170,7 @@ public class GossipGirl extends Module { private void handleAttributes(AttributesMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId())); state.gotAttributes(message); if (state.state == GossipGirlState.State.SEND_INFO) { sendInfo(state); @@ -183,6 +189,7 @@ public class GossipGirl extends Module { private void handleQuery(QueryMessage message) throws InterruptedException { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { + System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.gotQuery(message.getName()); Map> queries = new HashMap(); queries.put( diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 81cb9f2..6ee7474 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -124,16 +124,20 @@ public class GossipGirlState { } public void handleNoCoTam(NoCoTamMessage message) { + System.out.println("DEBUG: in GossipGirlState handleNoCoTam"); switch (state) { case WAIT_FOR_NO_CO_TAM: + System.out.println("DEBUG: lets do this"); theirGossipId = message.getSenderGossipId(); theirZoneTimestamps = message.getZoneTimestamps(); theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); setWaitingFor(); + System.out.println("DEBUG: set big stuff"); state = State.SEND_INFO; break; default: @@ -163,6 +167,8 @@ public class GossipGirlState { public Map getZoneTimestampsToSend() { Map timestamps = new HashMap(); + System.out.println("Getting zone timestamps to send to " + theirContact.getName().toString()); + System.out.println("hierarchy is " + hierarchy.toString()); collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); return timestamps; } @@ -178,12 +184,14 @@ public class GossipGirlState { public void setZonesToSend() { zonesToSend = new LinkedList(); + System.out.println("DEBUG: timestamps to send: " + getZoneTimestampsToSend().toString()); for (Entry timestampedPath : getZoneTimestampsToSend().entrySet()) { ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { zonesToSend.add(timestampedPath.getKey()); } } + System.out.println("DEBUG: zones to send: " + zonesToSend.toString()); } public void setQueriesToSend() { @@ -194,6 +202,7 @@ public class GossipGirlState { queriesToSend.add(timestampedQuery.getKey()); } } + System.out.println("DEBUG: Queries to send: " + queriesToSend.toString()); } public List getZMIsToSend() { @@ -222,6 +231,7 @@ public class GossipGirlState { } public void collectZoneTimestamps(Map timestamps, ZMI currentZMI, PathName recipientPath) { + System.out.println("collecting timestamps, on " + currentZMI.getPathName().toString()); for (ZMI zmi : currentZMI.getSons()) { if (interestedIn(recipientPath, zmi.getPathName())) { ValueTime timestamp = (ValueTime) zmi.getAttributes().getOrNull("timestamp"); -- cgit v1.2.3 From 9c6a9bb5a8ce6d3282d1fd669214911c302e9aca Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 14:39:29 +0100 Subject: Remove event bus log --- src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 0e972f4..3d3d279 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -49,7 +49,6 @@ public class EventBus implements Runnable { } public void routeMessage(AgentMessage msg) throws InterruptedException { - System.out.println("Event bus routing message"); executors.get(msg.getDestinationModule()).addMessage(msg); } -- cgit v1.2.3 From e01887358628321b423dbbc5be5bc5d7e2504d99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 12 Jan 2020 16:38:03 +0100 Subject: Fix type collection serialization --- .../cloudatlas/agent/modules/UDUPSerializer.java | 53 ++++++++++++++++++---- .../java/pl/edu/mimuw/cloudatlas/model/Type.java | 2 + .../edu/mimuw/cloudatlas/model/TypePrimitive.java | 2 + 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index f858468..d0dc03f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.assertj.core.data.MapEntry; import pl.edu.mimuw.cloudatlas.agent.messages.*; import pl.edu.mimuw.cloudatlas.model.*; @@ -13,10 +14,8 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.Remote; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.HashMap; -import java.util.LinkedHashMap; +import java.text.DateFormat; +import java.util.*; /** * Serializes classes to and from byte arrays for UDP use @@ -75,12 +74,16 @@ public class UDUPSerializer { public void write(Kryo kryo, Output output, Object object) { ValueList vl = (ValueList) object; kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); - kryo.writeObject(output, vl.getValue()); + ArrayList al = new ArrayList<>(); + for (Value v : vl.getValue()) { + al.add(v); + } + kryo.writeObject(output, al); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); ArrayList list = kryo.readObject(input, ArrayList.class); return new ValueList(list, t); } @@ -91,22 +94,51 @@ public class UDUPSerializer { public void write(Kryo kryo, Output output, Object object) { ValueSet vs = (ValueSet) object; kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); - kryo.writeObject(output, vs.getValue()); + HashSet hs = new HashSet(); + for (Value v : vs.getValue()) { + hs.add(v); + } + kryo.writeObject(output, hs); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); HashSet set = kryo.readObject(input, HashSet.class); return new ValueSet(set, t); } }); + kryo.register(AttributesMap.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + AttributesMap attribMap = (AttributesMap) object; + HashMap hashMap = new HashMap<>(); + + for (Map.Entry e : attribMap) { + hashMap.put(e.getKey(), e.getValue()); + } + + kryo.writeObject(output, hashMap); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + HashMap hashMap = kryo.readObject(input, HashMap.class); + AttributesMap attribMap = new AttributesMap(); + for (Map.Entry e : hashMap.entrySet()) { + attribMap.add(e.getKey(), e.getValue()); + } + return attribMap; + } + }); + // model kryo.register(Value.class); kryo.register(ValueBoolean.class); kryo.register(ValueContact.class); kryo.register(ValueDuration.class); + kryo.register(ValueDouble.class); kryo.register(ValueInt.class); kryo.register(ValueNull.class); kryo.register(ValueQuery.class); @@ -121,6 +153,7 @@ public class UDUPSerializer { kryo.register(AttributesUtil.class); kryo.register(Type.class); + kryo.register(Type.PrimaryType.class); kryo.register(TypeCollection.class); kryo.register(TypePrimitive.class); @@ -156,7 +189,10 @@ public class UDUPSerializer { kryo.register(byte[].class); kryo.register(LinkedHashMap.class); kryo.register(HashMap.class); + kryo.register(HashSet.class); kryo.register(ModuleType.class); + kryo.register(DateFormat.class); + kryo.register(ArrayList.class); } public UDUPMessage deserialize(byte[] packetData) { @@ -169,6 +205,7 @@ public class UDUPSerializer { public byte[] serialize(UDUPMessage msg) { ByteArrayOutputStream out = new ByteArrayOutputStream(); Output kryoOut = new Output(out); + System.out.println("SERIALIZING " + msg.getContent()); kryo.writeObject(kryoOut, msg); kryoOut.flush(); kryoOut.close(); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java index 0994cba..4453aea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/Type.java @@ -49,6 +49,8 @@ public abstract class Type implements Serializable { this.primaryType = primaryType; } + public Type() { this.primaryType = PrimaryType.NULL; }; + /** * Returns the primary type of this type. * diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java index ad07c0a..a17cafa 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/TypePrimitive.java @@ -78,6 +78,8 @@ public class TypePrimitive extends Type { */ public static final TypePrimitive QUERY = new TypePrimitive(PrimaryType.QUERY); + private TypePrimitive() {} + private TypePrimitive(PrimaryType primaryType) { super(primaryType); switch(primaryType) { -- cgit v1.2.3 From 091c5f1c6f52f46734b3faf7c08ea95551e3ab74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 12 Jan 2020 16:54:51 +0100 Subject: Add query serialization --- .../cloudatlas/agent/modules/UDUPSerializer.java | 23 +++++++++++++++++++++- .../pl/edu/mimuw/cloudatlas/model/ValueQuery.java | 2 ++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index d0dc03f..3b7e633 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -6,6 +6,7 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import org.assertj.core.data.MapEntry; import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program; import pl.edu.mimuw.cloudatlas.model.*; import java.io.ByteArrayInputStream; @@ -133,6 +134,27 @@ public class UDUPSerializer { } }); + kryo.register(ValueQuery.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + ValueQuery vq = (ValueQuery) object; + kryo.writeObject(output, vq.getCode()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + String code = kryo.readObject(input, String.class); + ValueQuery vq = null; + try { + vq = new ValueQuery(code); + } catch (Exception e) { + System.out.println("Value query deserialization failed"); + e.printStackTrace(); + } + return vq; + } + }); + // model kryo.register(Value.class); kryo.register(ValueBoolean.class); @@ -141,7 +163,6 @@ public class UDUPSerializer { kryo.register(ValueDouble.class); kryo.register(ValueInt.class); kryo.register(ValueNull.class); - kryo.register(ValueQuery.class); kryo.register(ValueSet.class); kryo.register(ValueString.class); kryo.register(ValueTime.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java index 82e1602..6d233ea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java @@ -32,6 +32,8 @@ public class ValueQuery extends Value { this.query = null; } + public String getCode() { return code; } + public Program getQuery() { return query; } -- cgit v1.2.3 From 16c2b56ed2c0039484d754e984bffc91b6f5f552 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 17:10:44 +0100 Subject: Ignore UDUP tests --- src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index f731706..922ebe2 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -20,6 +20,7 @@ import java.net.UnknownHostException; // TODO add serialization tests that target custom serializers (type collections!) +@Ignore public class UDUPTest { @Test -- cgit v1.2.3 From 84b686eb2e4e2eccde13f7cee1987b4211660729 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 21:00:58 +0100 Subject: Use GTP time protocol for gossiping --- .../agent/messages/AttributesMessage.java | 9 +++- .../cloudatlas/agent/messages/QueryMessage.java | 9 +++- .../mimuw/cloudatlas/agent/modules/GossipGirl.java | 8 ++-- .../cloudatlas/agent/modules/GossipGirlState.java | 50 ++++++++++++++++++++-- .../cloudatlas/agent/modules/GossipGirlTest.java | 23 +++++----- 5 files changed, 80 insertions(+), 19 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java index e4e3cb7..e3bd390 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java @@ -3,18 +3,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import java.util.Map; import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.PathName; public class AttributesMessage extends RemoteGossipGirlMessage { private PathName path; private AttributesMap attributes; private long receiverGossipId; + private ValueDuration offset; - public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) { + public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.ATTRIBUTES); this.path = path; this.attributes = attributes; this.receiverGossipId = receiverGossipId; + this.offset = offset; } private AttributesMessage() {} @@ -30,4 +33,8 @@ public class AttributesMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java index e457c21..77a2068 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -1,18 +1,21 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueQuery; public class QueryMessage extends RemoteGossipGirlMessage { private Attribute name; private ValueQuery query; private long receiverGossipId; + private ValueDuration offset; - public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { + public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId, ValueDuration offset) { super(messageId, timestamp, Type.QUERY); this.name = name; this.query = query; this.receiverGossipId = receiverGossipId; + this.offset = offset; } public QueryMessage() {} @@ -28,4 +31,8 @@ public class QueryMessage extends RemoteGossipGirlMessage { public long getReceiverGossipId() { return receiverGossipId; } + + public ValueDuration getOffset() { + return offset; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 440df33..5199e82 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -159,14 +159,14 @@ public class GossipGirl extends Module { private void sendInfo(GossipGirlState state) throws InterruptedException { System.out.println("DEBUG: about to send info"); for (ZMI zmi : state.getZMIsToSend()) { - AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); System.out.println("INFO: GossipGirl sending AttributesMessage"); sendMessage(udupMessage); } for (Entry query : state.getQueriesToSend()) { - QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId, state.offset); UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); System.out.println("INFO: GossipGirl sending QueryMessage"); sendMessage(udupMessage); @@ -183,7 +183,7 @@ public class GossipGirl extends Module { if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } - UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); + UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), state.modifyAttributes(message.getAttributes())); System.out.println("INFO: GossipGirl sending UpdateAttributesMessage"); sendMessage(updateMessage); if (state.state == GossipGirlState.State.FINISHED) { @@ -199,7 +199,7 @@ public class GossipGirl extends Module { if (state != null) { System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); state.setLastAction(); - state.gotQuery(message.getName()); + state.gotQuery(message); Map> queries = new HashMap(); queries.put( message.getName(), diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index e9bc02a..251d8b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -13,9 +13,13 @@ import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; +import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -48,18 +52,21 @@ public class GossipGirlState { public ValueTime hejkaSendTimestamp; public ValueTime hejkaReceiveTimestamp; public ValueTime noCoTamSendTimestamp; - public ValueTime noCoTamSendReceiveTimestamp; + public ValueTime noCoTamReceiveTimestamp; + public ValueDuration offset; private Map theirZoneTimestamps; private Map theirQueryTimestamps; private List zonesToSend; private List queriesToSend; private Set waitingForZones; private Set waitingForQueries; + private boolean initiating; public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { this.gossipId = gossipId; this.ourPath = ourPath; this.theirContact = theirContact; + this.initiating = initiating; System.out.println("INFO: initializing Gossip state, their contact " + theirContact.toString()); if (initiating) { state = State.WAIT_FOR_STATE_INITIALIZER; @@ -139,6 +146,9 @@ public class GossipGirlState { theirQueryTimestamps = message.getQueryTimestamps(); hejkaSendTimestamp = message.getHejkaSendTimestamp(); hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + noCoTamSendTimestamp = message.getSentTimestamp(); + noCoTamReceiveTimestamp = message.getReceivedTimestamp(); + computeOffset(); System.out.println("DEBUG: set basic stuff"); setZonesToSend(); setQueriesToSend(); @@ -152,6 +162,23 @@ public class GossipGirlState { } } + public void computeOffset() { + ValueDuration rtd = (ValueDuration) (noCoTamReceiveTimestamp.subtract(hejkaSendTimestamp)) + .subtract(noCoTamSendTimestamp.subtract(hejkaReceiveTimestamp)); + offset = (ValueDuration) (noCoTamSendTimestamp.addValue(rtd.divide(new ValueInt(2l)))) + .subtract(noCoTamReceiveTimestamp); + System.out.println("INFO: GossipGirlState calculated offset: " + offset.toString()); + } + + public AttributesMap modifyAttributes(AttributesMap attributes) { + ValueDuration delta = offset; + if (!initiating) { + delta = delta.negate(); + } + attributes.addOrChange("timestamp", attributes.getOrNull("timestamp").subtract(delta)); + return attributes; + } + private void setWaitingFor() { setWaitingForZones(); setWaitingForQueries(); @@ -305,6 +332,7 @@ public class GossipGirlState { setZonesToSend(); setQueriesToSend(); setWaitingFor(); + offset = message.getOffset(); state = State.SEND_INFO; if (!waitingForZones.remove(message.getPath())) { @@ -321,10 +349,26 @@ public class GossipGirlState { } } - public void gotQuery(Attribute name) { + public void gotQuery(QueryMessage message) { switch (state) { + case WAIT_FOR_FIRST_INFO: + // TODO: use offset to setup GTP + offset = message.getOffset(); + setZonesToSend(); + setQueriesToSend(); + setWaitingFor(); + state = State.SEND_INFO; + + if (!waitingForQueries.remove(message.getName())) { + System.out.println("DEBUG: got query we weren't expecting"); + } + if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { + System.out.println("INFO: done waiting for info"); + state = state.SEND_INFO_AND_FINISH; + } + break; case WAIT_FOR_INFO: - if (!waitingForQueries.remove(name)) { + if (!waitingForQueries.remove(message.getName())) { System.out.println("DEBUG: got query we weren't expecting"); } if (waitingForZones.isEmpty() && waitingForQueries.isEmpty()) { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index d443fad..dbacfe5 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -34,6 +34,7 @@ import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.TestUtil; import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueDuration; import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueString; @@ -57,6 +58,7 @@ public class GossipGirlTest { private AttributesMessage attributesMessage2; private QueryMessage queryMessage1; private QueryMessage queryMessage2; + private ValueDuration offset; private HejkaMessage hejkaMessage; @@ -84,7 +86,10 @@ public class GossipGirlTest { Map otherZoneTimestamps = makeOtherZoneTimestamps(); Map otherQueryTimestamps = makeOtherQueryTimestamps(); - noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 22)); + noCoTamMessage = new NoCoTamMessage("", 0, 42, 0, otherZoneTimestamps, otherQueryTimestamps, ValueUtils.addToTime(testTime, 10), ValueUtils.addToTime(testTime, 30)); + noCoTamMessage.setSentTimestamp(ValueUtils.addToTime(testTime, 35)); + noCoTamMessage.setReceivedTimestamp(ValueUtils.addToTime(testTime, 255)); + offset = new ValueDuration(-100l); attributesMessage1 = makeAttributesMessage("/son/bro", makeAttributes1()); attributesMessage2 = makeAttributesMessage("/son/whodis", makeAttributes2()); @@ -99,7 +104,7 @@ public class GossipGirlTest { public QueryMessage makeQueryMessage(String name, String query) throws Exception { - return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0); + return new QueryMessage("", 0, new Attribute(name), new ValueQuery(query), 0, offset); } public AttributesMap makeAttributes1() { @@ -121,12 +126,12 @@ public class GossipGirlTest { } public AttributesMessage makeAttributesMessage(String path, AttributesMap attributes) { - return new AttributesMessage("", 0, new PathName(path), attributes, 0); + return new AttributesMessage("", 0, new PathName(path), attributes, 0, offset); } public Map makeOtherZoneTimestamps() { Map zoneTimestamps = new HashMap(); - addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -100); + addOtherZoneTimestamp(zoneTimestamps, "/son/sis", -120); addOtherZoneTimestamp(zoneTimestamps, "/son/bro", 0); addOtherZoneTimestamp(zoneTimestamps, "/son/whodis", -300); @@ -272,8 +277,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, 100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -289,7 +293,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -385,8 +388,7 @@ public class GossipGirlTest { assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage1.getType()); UpdateAttributesMessage updateMessage1 = (UpdateAttributesMessage) stanikMessage1; assertEquals("/son/bro", updateMessage1.getPathName()); - // TODO: this should be modified by GTP - assertEquals(testTime, updateMessage1.getAttributes().getOrNull("timestamp")); + assertEquals(ValueUtils.addToTime(testTime, -100), updateMessage1.getAttributes().getOrNull("timestamp")); assertEquals(new ValueInt(140l), updateMessage1.getAttributes().getOrNull("foo")); assertEquals(new ValueString(":wq"), updateMessage1.getAttributes().getOrNull("bar")); @@ -402,7 +404,6 @@ public class GossipGirlTest { assertEquals(updateMessage2.getQueries().get(new Attribute("&one")), new SimpleImmutableEntry( new ValueQuery("SELECT 3 AS one"), - // TODO: this should be modified by GTP ValueUtils.addToTime(testTime, 10) ) ); @@ -462,6 +463,7 @@ public class GossipGirlTest { QueryMessage queryMessage = (QueryMessage) ((UDUPMessage) message).getContent(); assertEquals(new Attribute(name), queryMessage.getName()); assertEquals(new ValueQuery(query), queryMessage.getQuery()); + assertEquals(new ValueDuration(-100l), queryMessage.getOffset()); } private void assertAttributeMessage(AgentMessage message, String recipientPath, String zonePath) throws Exception { @@ -472,6 +474,7 @@ public class GossipGirlTest { ); AttributesMessage attributesMessage = (AttributesMessage) ((UDUPMessage) message).getContent(); assertEquals(new PathName(zonePath), attributesMessage.getPath()); + assertEquals(new ValueDuration(-100l), attributesMessage.getOffset()); } private void assertUDUPMessage(AgentMessage message, PathName destinationName, GossipGirlMessage.Type type) throws Exception { -- cgit v1.2.3 From 84df6c11a868e8cd98fa14455456d313e25234db Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 21:52:38 +0100 Subject: Set zone levels --- .../edu/mimuw/cloudatlas/agent/modules/Stanik.java | 12 +++++++++++ .../mimuw/cloudatlas/agent/modules/StanikTest.java | 25 +++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index 999c193..f572efe 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -61,6 +61,7 @@ public class Stanik extends Module { public void handleGetState(GetStateMessage message) throws InterruptedException { pruneHierarchy(); + addLevels(); StateMessage response = new StateMessage( "", message.getRequestingModule(), @@ -78,6 +79,17 @@ public class Stanik extends Module { pruneZMI(hierarchy, now); } + private void addLevels() { + addLevelsRecursive(hierarchy, 0); + } + + private void addLevelsRecursive(ZMI zmi, long level) { + zmi.getAttributes().addOrChange("level", new ValueInt(level)); + for (ZMI son : zmi.getSons()) { + addLevelsRecursive(son, level + 1); + } + } + private boolean pruneZMI(ZMI zmi, ValueTime time) { Value timestamp = zmi.getAttributes().get("timestamp"); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index f3ea0b0..03a89c6 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -57,7 +57,8 @@ public class StanikTest { ZMI zmi = stateMessage.getZMI(); assertNull(zmi.getFather()); assertTrue(zmi.getSons().isEmpty()); - assertEquals(1, TestUtil.iterableSize(zmi.getAttributes())); + assertEquals(2, TestUtil.iterableSize(zmi.getAttributes())); + assertEquals(new ValueInt(0l), zmi.getAttributes().getOrNull("level")); Map> queries = stateMessage.getQueries(); assertEquals(0, TestUtil.iterableSize(queries.keySet())); } @@ -111,6 +112,28 @@ public class StanikTest { assertEquals(testTime, actualAttributes.getOrNull("timestamp")); } + @Test + public void newZoneHasNewLevel() throws Exception { + AttributesMap attributes = new AttributesMap(); + attributes.add("foo", new ValueInt(1337l)); + attributes.add("bar", new ValueString("baz")); + attributes.add("name", new ValueString("new")); + attributes.add("timestamp", testTime); + UpdateAttributesMessage message = new UpdateAttributesMessage("test_msg", 0, "/new", attributes); + stanik.handleTyped(message); + GetStateMessage newMessage = new GetStateMessage("test_msg2", 123, ModuleType.TEST, 43); + stanik.handleTyped(newMessage); + + StateMessage newReceivedMessage = (StateMessage) executor.messagesToPass.poll(); + AttributesMap actualAttributes = newReceivedMessage.getZMI().findDescendant("/new").getAttributes(); + assertEquals(5, TestUtil.iterableSize(actualAttributes)); + assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); + assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar")); + assertEquals(new ValueString("new"), actualAttributes.getOrNull("name")); + assertEquals(testTime, actualAttributes.getOrNull("timestamp")); + assertEquals(new ValueInt(1l), actualAttributes.getOrNull("level")); + } + @Test public void updateWithRemovedAttributes() throws Exception { AttributesMap attributes = new AttributesMap(); -- cgit v1.2.3 From 411bf8f0dae524d70a0754049b5494182ade4a05 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 22:12:09 +0100 Subject: Set owner of ZMIs on our path --- .../pl/edu/mimuw/cloudatlas/agent/AgentConfig.java | 4 +++- .../cloudatlas/agent/modules/GossipGirlState.java | 21 +-------------------- .../edu/mimuw/cloudatlas/agent/modules/Stanik.java | 21 +++++++++++++-------- .../pl/edu/mimuw/cloudatlas/model/ValueUtils.java | 22 ++++++++++++++++++++++ .../mimuw/cloudatlas/agent/modules/StanikTest.java | 8 +++++--- 5 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 38d764a..e9bbf4e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent; import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.api.Api; +import pl.edu.mimuw.cloudatlas.model.PathName; import java.net.InetAddress; import java.net.SocketException; @@ -43,11 +44,12 @@ public class AgentConfig { Integer timeout = Integer.getInteger("UDUPServer.timeout"); Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); + String ourPath = System.getProperty("zone_path"); HashMap modules = new HashMap(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); + modules.put(ModuleType.STATE, new Stanik(new PathName(ourPath), freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 251d8b3..0525f41 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -280,26 +280,7 @@ public class GossipGirlState { } public boolean interestedIn(PathName recipientPath, PathName zmiPath) { - return isPrefix(zmiPath.levelUp(), recipientPath) && !isPrefix(zmiPath, recipientPath); - } - - public boolean isPrefix(PathName prefix, PathName path) { - List prefixComponents = prefix.getComponents(); - List pathComponents = path.getComponents(); - - if (prefixComponents.size() > pathComponents.size()) { - return false; - } - - Iterator prefixIterator = prefixComponents.iterator(); - Iterator pathIterator = pathComponents.iterator(); - - while (prefixIterator.hasNext()) { - if (!prefixIterator.next().equals(pathIterator.next())) { - return false; - } - } - return true; + return ValueUtils.isPrefix(zmiPath.levelUp(), recipientPath) && !ValueUtils.isPrefix(zmiPath, recipientPath); } public void sentInfo() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index f572efe..a428232 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -19,9 +19,11 @@ public class Stanik extends Module { private long freshnessPeriod; private Set contacts; private ValueTime contactsTimestamp; + private PathName ourPath; - public Stanik(long freshnessPeriod) { + public Stanik(PathName ourPath, long freshnessPeriod) { super(ModuleType.STATE); + this.ourPath = ourPath; hierarchy = new ZMI(); queries = new HashMap>(); hierarchy.getAttributes().add("timestamp", new ValueTime(0l)); @@ -30,8 +32,8 @@ public class Stanik extends Module { this.contacts = new HashSet<>(); } - public Stanik() { - this(60 * 1000); + public Stanik(PathName ourPath) { + this(ourPath, 60 * 1000); } public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType { @@ -61,7 +63,7 @@ public class Stanik extends Module { public void handleGetState(GetStateMessage message) throws InterruptedException { pruneHierarchy(); - addLevels(); + addValues(); StateMessage response = new StateMessage( "", message.getRequestingModule(), @@ -79,14 +81,17 @@ public class Stanik extends Module { pruneZMI(hierarchy, now); } - private void addLevels() { - addLevelsRecursive(hierarchy, 0); + private void addValues() { + addValuesRecursive(hierarchy, 0); } - private void addLevelsRecursive(ZMI zmi, long level) { + private void addValuesRecursive(ZMI zmi, long level) { zmi.getAttributes().addOrChange("level", new ValueInt(level)); + if (ValueUtils.isPrefix(zmi.getPathName(), ourPath)) { + zmi.getAttributes().addOrChange("owner", new ValueString(ourPath.toString())); + } for (ZMI son : zmi.getSons()) { - addLevelsRecursive(son, level + 1); + addValuesRecursive(son, level + 1); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java index 866349f..20d6600 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -1,5 +1,8 @@ package pl.edu.mimuw.cloudatlas.model; +import java.util.List; +import java.util.Iterator; + public class ValueUtils { public static boolean valueNonNullOfType(Value value, Type type) { return value != null && !value.isNull() && value.getType().isCompatible(type); @@ -16,4 +19,23 @@ public class ValueUtils { public static ValueTime addToTime(ValueTime time, long millis) { return time.addValue(new ValueDuration(millis)); } + + public static boolean isPrefix(PathName prefix, PathName path) { + List prefixComponents = prefix.getComponents(); + List pathComponents = path.getComponents(); + + if (prefixComponents.size() > pathComponents.size()) { + return false; + } + + Iterator prefixIterator = prefixComponents.iterator(); + Iterator pathIterator = pathComponents.iterator(); + + while (prefixIterator.hasNext()) { + if (!prefixIterator.next().equals(pathIterator.next())) { + return false; + } + } + return true; + } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index 03a89c6..d16d917 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -39,7 +39,7 @@ public class StanikTest { @Before public void setupLocals() { - stanik = new Stanik(freshnessPeriod); + stanik = new Stanik(new PathName("/new"), freshnessPeriod); executor = new MockExecutor(stanik); testTime = ValueUtils.currentTime(); } @@ -57,8 +57,9 @@ public class StanikTest { ZMI zmi = stateMessage.getZMI(); assertNull(zmi.getFather()); assertTrue(zmi.getSons().isEmpty()); - assertEquals(2, TestUtil.iterableSize(zmi.getAttributes())); + assertEquals(3, TestUtil.iterableSize(zmi.getAttributes())); assertEquals(new ValueInt(0l), zmi.getAttributes().getOrNull("level")); + assertEquals(new ValueString("/new"), zmi.getAttributes().getOrNull("owner")); Map> queries = stateMessage.getQueries(); assertEquals(0, TestUtil.iterableSize(queries.keySet())); } @@ -126,10 +127,11 @@ public class StanikTest { StateMessage newReceivedMessage = (StateMessage) executor.messagesToPass.poll(); AttributesMap actualAttributes = newReceivedMessage.getZMI().findDescendant("/new").getAttributes(); - assertEquals(5, TestUtil.iterableSize(actualAttributes)); + assertEquals(6, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar")); assertEquals(new ValueString("new"), actualAttributes.getOrNull("name")); + assertEquals(new ValueString("/new"), actualAttributes.getOrNull("owner")); assertEquals(testTime, actualAttributes.getOrNull("timestamp")); assertEquals(new ValueInt(1l), actualAttributes.getOrNull("level")); } -- cgit v1.2.3 From cd247e8d64b2ef0aed7f0afdccd711008cd60fcd Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 12 Jan 2020 22:25:09 +0100 Subject: Add default queries for contacts and cardinality --- .../edu/mimuw/cloudatlas/agent/modules/Stanik.java | 22 ++++++++++++++++++++++ .../mimuw/cloudatlas/agent/modules/StanikTest.java | 9 +++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index a428232..6e7d4dc 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.nio.file.Path; import java.util.*; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; import pl.edu.mimuw.cloudatlas.agent.messages.*; @@ -30,6 +31,24 @@ public class Stanik extends Module { this.freshnessPeriod = freshnessPeriod; this.contactsTimestamp = ValueUtils.currentTime(); this.contacts = new HashSet<>(); + setDefaultQueries(); + } + + private void setDefaultQueries() { + String cardinalityQuery = "SELECT sum(cardinality) AS cardinality"; + String contactsQuery = "SELECT random(5, unfold(contacts)) AS contacts"; + + setDefaultQuery("&cardinality", cardinalityQuery); + setDefaultQuery("&contacts", contactsQuery); + } + + private void setDefaultQuery(String name, String query) { + try { + ValueQuery queryValue = new ValueQuery(query); + queries.put(new Attribute(name), new SimpleImmutableEntry(queryValue, new ValueTime(0l))); + } catch (Exception e) { + System.out.println("ERROR: failed to compile default query"); + } } public Stanik(PathName ourPath) { @@ -90,6 +109,9 @@ public class Stanik extends Module { if (ValueUtils.isPrefix(zmi.getPathName(), ourPath)) { zmi.getAttributes().addOrChange("owner", new ValueString(ourPath.toString())); } + if (zmi.getPathName().equals(ourPath)) { + zmi.getAttributes().addOrChange("cardinality", new ValueInt(1l)); + } for (ZMI son : zmi.getSons()) { addValuesRecursive(son, level + 1); } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index d16d917..92ba051 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -61,7 +61,7 @@ public class StanikTest { assertEquals(new ValueInt(0l), zmi.getAttributes().getOrNull("level")); assertEquals(new ValueString("/new"), zmi.getAttributes().getOrNull("owner")); Map> queries = stateMessage.getQueries(); - assertEquals(0, TestUtil.iterableSize(queries.keySet())); + assertEquals(2, TestUtil.iterableSize(queries.keySet())); } @Test @@ -127,11 +127,12 @@ public class StanikTest { StateMessage newReceivedMessage = (StateMessage) executor.messagesToPass.poll(); AttributesMap actualAttributes = newReceivedMessage.getZMI().findDescendant("/new").getAttributes(); - assertEquals(6, TestUtil.iterableSize(actualAttributes)); + assertEquals(7, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar")); assertEquals(new ValueString("new"), actualAttributes.getOrNull("name")); assertEquals(new ValueString("/new"), actualAttributes.getOrNull("owner")); + assertEquals(new ValueInt(1l), actualAttributes.getOrNull("cardinality")); assertEquals(testTime, actualAttributes.getOrNull("timestamp")); assertEquals(new ValueInt(1l), actualAttributes.getOrNull("level")); } @@ -223,7 +224,7 @@ public class StanikTest { stanik.handleTyped(message); HashMap> actualQueries = stanik.getQueries(); - assertEquals(1, TestUtil.iterableSize(actualQueries.keySet())); + assertEquals(3, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query"))); Entry timestampedQuery = actualQueries.get(new Attribute("&query")); assertEquals(new ValueTime(42l), timestampedQuery.getValue()); @@ -247,7 +248,7 @@ public class StanikTest { stanik.handleTyped(otherMessage); HashMap> actualQueries = stanik.getQueries(); - assertEquals(4, TestUtil.iterableSize(actualQueries.keySet())); + assertEquals(6, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query1"))); assertTrue(actualQueries.containsKey(new Attribute("&query2"))); assertTrue(actualQueries.containsKey(new Attribute("&query3"))); -- cgit v1.2.3