From 70a233029d805104845f20b9904e1cdb6feac921 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 18:51:49 +0100
Subject: Add noarg constructors to messages for serialization

---
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java     | 2 ++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java  | 2 ++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java    | 3 +++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java     | 3 +++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java | 2 ++
 .../pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java   | 2 ++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java  | 2 ++
 .../pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java     | 2 ++
 .../pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java   | 2 ++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java    | 2 ++
 .../java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java     | 2 ++
 .../pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java | 2 ++
 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java | 4 +---
 .../edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java  | 2 ++
 .../pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java  | 2 ++
 15 files changed, 31 insertions(+), 3 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
index f343e0f..c3fcb40 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
@@ -20,6 +20,8 @@ public abstract class AgentMessage {
         this.timestamp = System.currentTimeMillis() / 1000L;
     }
 
+    public AgentMessage() {}
+
     public String getMessageId() {
         return messageId;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
index 63392e8..67be8e9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
@@ -12,6 +12,8 @@ public class GetStateMessage extends StanikMessage {
         this.requestId = requestId;
     }
 
+    public GetStateMessage() {}
+
     public ModuleType getRequestingModule() {
         return requestingModule;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
index 0161a37..97bef69 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
 
 import pl.edu.mimuw.cloudatlas.agent.modules.Module;
 import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
 
 public abstract class QurnikMessage extends AgentMessage {
     public enum Type {
@@ -15,6 +16,8 @@ public abstract class QurnikMessage extends AgentMessage {
         this.type = type;
     }
 
+    public QurnikMessage() {}
+
     public Type getType() {
         return type;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
index b0300cb..e4bc1b6 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
 
 import pl.edu.mimuw.cloudatlas.agent.modules.Module;
 import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
 
 public abstract class RemikMessage extends AgentMessage {
     public enum Type {
@@ -15,6 +16,8 @@ public abstract class RemikMessage extends AgentMessage {
         this.type = type;
     }
 
+    public RemikMessage() {}
+
     public Type getType() {
         return type;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
index 9330185..e7b03ba 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
@@ -12,6 +12,8 @@ public class RemoveZMIMessage extends StanikMessage {
         this.removalTimestamp = removalTimestamp;
     }
 
+    public RemoveZMIMessage() {}
+
     public String getPathName() {
         return pathName;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
index 698aac7..a1fd279 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
@@ -10,6 +10,8 @@ public class RequestStateMessage extends RemikMessage {
         this.responseFuture = responseFuture;
     }
 
+    public RequestStateMessage() {}
+
     public CompletableFuture<ResponseMessage> getFuture() {
         return responseFuture;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
index 02b3337..b28f4b9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
@@ -17,6 +17,8 @@ public abstract class ResponseMessage extends AgentMessage {
         this.requestId = requestId;
     }
 
+    public ResponseMessage() {}
+
     public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
         module.handleTyped(this);
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
index 35f7819..03882a0 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
@@ -6,4 +6,6 @@ public class RunQueriesMessage extends QurnikMessage {
     public RunQueriesMessage(String messageId, long timestamp) {
         super(messageId, timestamp, Type.RUN_QUERIES);
     }
+
+    public RunQueriesMessage() {}
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
index 4888484..3de2b65 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
@@ -19,6 +19,8 @@ public class SetAttributeMessage extends StanikMessage {
         this.updateTimestamp = updateTimestamp;
     }
 
+    public SetAttributeMessage() {}
+
     public String getPathName() {
         return pathName;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
index 844f31c..3daa5f9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
@@ -19,6 +19,8 @@ public abstract class StanikMessage extends AgentMessage {
         this.type = type;
     }
 
+    public StanikMessage() {}
+
     public Type getType() {
         return type;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
index f7f490b..759723b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
@@ -19,6 +19,8 @@ public class StateMessage extends ResponseMessage {
         this.queries = queries;
     }
 
+    public StateMessage() {}
+
     public ZMI getZMI() {
         return zmi;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
index 8566d67..b42e181 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
@@ -23,6 +23,8 @@ public class TimerSchedulerMessage extends AgentMessage {
         this.task = task;
     }
 
+    public TimerSchedulerMessage() {}
+
     public long getDelay() {
         return delay;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
index fa8d1fa..3751b3c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -20,9 +20,7 @@ public class UDUPMessage extends AgentMessage {
         this.content = content;
     }
 
-    public UDUPMessage() {
-        super("", ModuleType.UDP);
-    }
+    public UDUPMessage() {}
 
     @Override
     public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
index 7e41631..b943384 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
@@ -13,6 +13,8 @@ public class UpdateAttributesMessage extends StanikMessage {
         this.attributes = attributes;
     }
 
+    public UpdateAttributesMessage() {}
+
     public String getPathName() {
         return pathName;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
index 58ad55a..4b0b9c8 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
@@ -15,6 +15,8 @@ public class UpdateQueriesMessage extends StanikMessage {
         this.queries = queries;
     }
 
+    public UpdateQueriesMessage() {}
+
     public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
         return queries;
     }
-- 
cgit v1.2.3


From 76b5ad38792c93cd530b5faf59c613e83a129d19 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 18:53:06 +0100
Subject: Extend kryo registration

---
 .../cloudatlas/agent/modules/UDUPSerializer.java   | 77 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 9 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index ac35265..79236ca 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -4,18 +4,15 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
-import pl.edu.mimuw.cloudatlas.model.PathName;
-import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.model.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.LinkedHashMap;
 
 /**
  * Serializes classes to and from byte arrays for UDP use
@@ -69,15 +66,77 @@ public class UDUPSerializer {
             }
         });
 
-        kryo.register(byte[].class);
+        kryo.register(ValueList.class, new Serializer() {
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                return null;
+            }
+        });
+
+        kryo.register(ValueSet.class, new Serializer() {
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                return null;
+            }
+        });
+
+        // model
+        kryo.register(Value.class);
+        kryo.register(ValueBoolean.class);
         kryo.register(ValueContact.class);
-        kryo.register(ModuleType.class);
+        kryo.register(ValueDuration.class);
+        kryo.register(ValueInt.class);
+        kryo.register(ValueNull.class);
+        kryo.register(ValueQuery.class);
+        kryo.register(ValueSet.class);
+        kryo.register(ValueString.class);
+        kryo.register(ValueTime.class);
+        kryo.register(ValueUtils.class);
+        kryo.register(ZMI.class);
+
+        kryo.register(Attribute.class);
+        kryo.register(AttributesMap.class);
+        kryo.register(AttributesUtil.class);
 
+        kryo.register(Type.class);
+        kryo.register(TypeCollection.class);
+        kryo.register(TypePrimitive.class);
+
+        // messages in chronological order so it's easier to keep track
         kryo.register(AgentMessage.class);
         kryo.register(GetStateMessage.class);
-        kryo.register(UDUPMessage.class);
+        kryo.register(QurnikMessage.class);
+        kryo.register(RemikMessage.class);
+        kryo.register(RemoveZMIMessage.class);
+        kryo.register(RequestStateMessage.class);
+        kryo.register(ResponseMessage.class);
+        kryo.register(RunQueriesMessage.class);
+        kryo.register(SetAttributeMessage.class);
         kryo.register(StanikMessage.Type.class);
         kryo.register(StanikMessage.class);
+        kryo.register(TimerSchedulerMessage.class);
+        kryo.register(UDUPMessage.class);
+        kryo.register(UpdateAttributesMessage.class);
+        kryo.register(UpdateQueriesMessage.class);
+
+        // modules
+        kryo.register(TimerScheduledTask.class);
+        kryo.register(RecursiveScheduledTask.class);
+
+        // other
+        kryo.register(byte[].class);
+        kryo.register(LinkedHashMap.class);
+        kryo.register(ModuleType.class);
     }
 
     public UDUPMessage deserialize(byte[] packetData) {
-- 
cgit v1.2.3


From 5f02fa0e59dc84e12fae1fde61bdfa8edb5446b1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 18:58:25 +0100
Subject: Fix handling of multipart udp messages

---
 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 6807a86..fb79dc6 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -31,12 +31,11 @@ public class UDUPServer {
         System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
 
         if (packet.getOffset() == 0) {
-            UDUPMessage msg = this.serializer.deserialize(packet.getData());
-            System.out.println("UDP received message " + msg.getContent().getMessageId());
-
             if (packet.getLength() == this.bufSize) {
                 this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
             } else  {
+                UDUPMessage msg = this.serializer.deserialize(packet.getData());
+                System.out.println("UDP received message " + msg.getContent().getMessageId());
                 if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
                     System.out.println("UDP server: test message received");
                 } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
-- 
cgit v1.2.3


From 28c905a7365a37d0874b865f513c64b31d8679f4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 20:10:50 +0100
Subject: Add custom serialization to type collection

---
 .../mimuw/cloudatlas/agent/modules/UDUPSerializer.java | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index 79236ca..40c4d7c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -12,6 +12,8 @@ import java.io.ByteArrayOutputStream;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 
 /**
@@ -69,24 +71,32 @@ public class UDUPSerializer {
         kryo.register(ValueList.class, new Serializer() {
             @Override
             public void write(Kryo kryo, Output output, Object object) {
-
+                ValueList vl = (ValueList) object;
+                kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
+                kryo.writeObject(output, vl.getValue());
             }
 
             @Override
             public Object read(Kryo kryo, Input input, Class type) {
-                return null;
+                Type t = kryo.readObject(input, Type.class);
+                ArrayList list = kryo.readObject(input, ArrayList.class);
+                return new ValueList(list, t);
             }
         });
 
         kryo.register(ValueSet.class, new Serializer() {
             @Override
             public void write(Kryo kryo, Output output, Object object) {
-
+                ValueSet vs = (ValueSet) object;
+                kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
+                kryo.writeObject(output, vs.getValue());
             }
 
             @Override
             public Object read(Kryo kryo, Input input, Class type) {
-                return null;
+                Type t = kryo.readObject(input, Type.class);
+                HashSet set = kryo.readObject(input, HashSet.class);
+                return new ValueSet(set, t);
             }
         });
 
-- 
cgit v1.2.3


From 2d7fe232b7c1f2ef62e7bf2f3100adb51e9bc0d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 22:13:48 +0100
Subject: Add segmentation handling

---
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java |  62 ++++++++++---
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 103 ++++++++++++++++-----
 2 files changed, 130 insertions(+), 35 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 82aaeb1..d7cbc9d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -2,8 +2,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
 
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
+import javax.xml.crypto.Data;
 import java.io.IOException;
 import java.net.*;
+import java.nio.ByteBuffer;
 
 public class UDUPClient {
     private UDUP udp;
@@ -11,6 +13,7 @@ public class UDUPClient {
     private int serverPort;
     private DatagramSocket socket;
     private int bufsize;
+    private int lastTransmission;
 
     UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
         this.udp = udp;
@@ -18,26 +21,57 @@ public class UDUPClient {
         this.socket = new DatagramSocket();
         this.bufsize = bufferSize;
         this.serializer = new UDUPSerializer();
+        this.lastTransmission = 0;
     }
 
-    public void sendMessage(UDUPMessage msg) throws IOException {
-        int offset = 0;
-        int outputSize;
+    private void logSending(DatagramPacket packet, int packetNo, byte[] buf) {
+        System.out.print("UDP sends packet no " + packetNo +
+                " with realbufsize " + (bufsize - 8) +
+                " out of data buffer with size " + buf.length +
+                " to " + packet.getAddress() + ": ");
+        for (byte b : packet.getData()) {
+            System.out.print(b);
+        }
+        System.out.println();
+    }
 
-        byte[] buf = this.serializer.serialize(msg);
-        outputSize = buf.length;
+    byte[] toByteArray(int val) {
+        return ByteBuffer.allocate(4).putInt(val).array();
+    }
+
+    private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) {
+        byte[] sendBuf = new byte[bufsize];
+        int sendBufSize = bufsize - 8;
+        System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
+        System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+        if (packetNo*sendBufSize >= buf.length) {
+            System.arraycopy(buf, 0, sendBuf, 8, sendBufSize);
+        } else {
+            System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize);
+        }
+        return sendBuf;
+    }
+
+    boolean checkEndOfTransmission(int packetNo, int dataBufSize) {
+        int sendBufSize = bufsize - 8;
+        int dataSentSoFar = (packetNo - 1) * sendBufSize;
+        System.out.println("used data " + dataSentSoFar + " " + dataBufSize);
+        return dataSentSoFar >= dataBufSize;
+    }
+
+    public void sendMessage(UDUPMessage msg) throws IOException {
+        int packetNo = 1;
+        byte[] sendBuf;
+        byte[] dataBuf = this.serializer.serialize(msg);
+        this.lastTransmission++;
 
         do {
-            outputSize =- bufsize;
-            offset += bufsize;
-            DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
-            System.out.println("UDP sends message: ");
-            for (byte b : buf) {
-                System.out.print(b);
-            }
-            System.out.println("to " + packet.getAddress());
+            sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);
+            DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort);
             this.socket.send(packet);
-        } while (outputSize > bufsize);
+            logSending(packet, packetNo, dataBuf);
+            packetNo++;
+        } while (!checkEndOfTransmission(packetNo, dataBuf.length));
     }
 
     void close() {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index fb79dc6..c7ceca2 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
 import com.google.common.primitives.Bytes;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.NoSuchElementException;
 
 public class UDUPServer {
     private UDUP udp;
     private UDUPSerializer serializer;
     private DatagramSocket socket;
     private InetAddress address;
-    private HashMap<InetAddress, byte[]> partialPackets;
+    private HashMap<String, ArrayList<byte[]>> partialPackets;
     private int bufSize;
 
     public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
@@ -25,42 +29,99 @@ public class UDUPServer {
     }
 
     public void acceptMessage() throws IOException, InterruptedException {
-        byte[] buf = new byte[bufSize];
-        DatagramPacket packet = new DatagramPacket(buf, buf.length);
-        this.socket.receive(packet);
-        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+        DatagramPacket packet = receivePacket();
+        int transmissionNo = readTransmissionNo(packet.getData());
+        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
+        int packetNo = readPacketNo(packet.getData());
+        byte[] packetData = trimPacketBuffer(packet.getData());
 
-        if (packet.getOffset() == 0) {
+        if (packetNo == 0) {
             if (packet.getLength() == this.bufSize) {
-                this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+                this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
             } else  {
-                UDUPMessage msg = this.serializer.deserialize(packet.getData());
+                UDUPMessage msg = this.serializer.deserialize(packetData);
                 System.out.println("UDP received message " + msg.getContent().getMessageId());
-                if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
-                    System.out.println("UDP server: test message received");
-                } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
-                    this.udp.sendMessage(msg.getContent());
-                }
+                sendMessageFurther(msg);
             }
         } else {
-            this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
         }
     }
 
-    public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
-        if (this.partialPackets.containsKey(senderAddress)) {
-            byte[] previousPacketData = this.partialPackets.get(senderAddress);
-            byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+    private DatagramPacket receivePacket() throws IOException {
+        byte[] buf = new byte[bufSize];
+        DatagramPacket packet = new DatagramPacket(buf, buf.length);
+        this.socket.receive(packet);
+        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+        return packet;
+    }
+
+    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
+        if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+            System.out.println("UDP server: test message received");
+        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+            this.udp.sendMessage(msg.getContent());
+        }
+    }
+
+    private int readTransmissionNo(byte[] packetData) throws IOException {
+        ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4);
+        byte[] byteBuf = new byte[4];
+        in.read(byteBuf);
+        return ByteBuffer.wrap(byteBuf).getInt();
+    }
+
+    private int readPacketNo(byte[] packetData) throws IOException {
+        ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4);
+        byte[] byteBuf = new byte[4];
+        in.read(byteBuf);
+        return ByteBuffer.wrap(byteBuf).getInt();
+    }
+
+    private byte[] trimPacketBuffer(byte[] packetData) {
+        int newPacketDataSize = packetData.length - 8;
+        byte[] newPacketData = new byte[newPacketDataSize];
+        System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize);
+        return newPacketData;
+    }
+
+    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
+        return contactAddr.getHostAddress() + ":" + transmissionNo;
+    }
+
+    private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) {
+        ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
+        byte[] fullData = new byte[0];
+
+        previousPacketData.add(newPacketNo, newPacketData);
+        this.partialPackets.put(transmissionID, previousPacketData);
+
+        if (previousPacketData.contains(null)) {
+            throw new NoSuchElementException("Packet is not full");
+        } else {
+            for (byte[] prevData : previousPacketData) {
+                fullData = Bytes.concat(fullData, prevData);
+            }
+        }
+
+        return fullData;
+    }
+
+    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
+        if (this.partialPackets.containsKey(transmissionID)) {
             try {
+                byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
                 UDUPMessage msg = this.serializer.deserialize(allPacketData);
                 this.udp.sendMessage(msg.getContent());
-                this.partialPackets.remove(senderAddress);
+                this.partialPackets.remove(transmissionID);
+                System.out.println("Kryo put together whole transmission");
             } catch (Error | Exception e) {
                 System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
-                this.partialPackets.put(senderAddress, allPacketData);
             }
         } else {
-            this.partialPackets.put(senderAddress, packetData);
+            ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
+            newTransmission.add(packetData);
+            this.partialPackets.put(transmissionID, newTransmission);
         }
     }
 
-- 
cgit v1.2.3


From 90d3d2e3e1e116bbb288d78e9c6c996a7f1e0270 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Fri, 10 Jan 2020 15:26:52 +0100
Subject: Fix segmentation

---
 .../pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java |  6 ++++++
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java  | 19 ++++++++++++-------
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java  | 21 +++++++++------------
 3 files changed, 27 insertions(+), 19 deletions(-)

(limited to 'src/main/java/pl/edu/mimuw')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e616c93..8c6db8f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 // TODO set server port in global config - must be the same everywhere
 // TODO same with buffer size
 
+// TODO separate server like newapiimpl
+// TODO add timestamps as close to sending as possible
+
+// TODO wysylac tylko remotegossipgirl message
+// TODO update timestampow odpowiedni w tym remotegossipgirlmessage
+
 public class UDUP extends Module implements Runnable {
     private UDUPClient client;
     private UDUPServer server;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index d7cbc9d..2e4f0b4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -26,7 +26,7 @@ public class UDUPClient {
 
     private void logSending(DatagramPacket packet, int packetNo, byte[] buf) {
         System.out.print("UDP sends packet no " + packetNo +
-                " with realbufsize " + (bufsize - 8) +
+                " with real bufsize " + (bufsize - 8) +
                 " out of data buffer with size " + buf.length +
                 " to " + packet.getAddress() + ": ");
         for (byte b : packet.getData()) {
@@ -40,22 +40,27 @@ public class UDUPClient {
     }
 
     private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) {
-        byte[] sendBuf = new byte[bufsize];
+        byte[] sendBuf;
         int sendBufSize = bufsize - 8;
-        System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
-        System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+
         if (packetNo*sendBufSize >= buf.length) {
-            System.arraycopy(buf, 0, sendBuf, 8, sendBufSize);
+            int copyLength = buf.length - (packetNo-1)*sendBufSize;
+            sendBuf = new byte[copyLength + 8];
+            System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength);
         } else {
+            sendBuf = new byte[bufsize];
             System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize);
         }
+
+        System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
+        System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+
         return sendBuf;
     }
 
     boolean checkEndOfTransmission(int packetNo, int dataBufSize) {
         int sendBufSize = bufsize - 8;
         int dataSentSoFar = (packetNo - 1) * sendBufSize;
-        System.out.println("used data " + dataSentSoFar + " " + dataBufSize);
         return dataSentSoFar >= dataBufSize;
     }
 
@@ -67,7 +72,7 @@ public class UDUPClient {
 
         do {
             sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);
-            DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort);
+            DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort);
             this.socket.send(packet);
             logSending(packet, packetNo, dataBuf);
             packetNo++;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index c7ceca2..b71a475 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -35,15 +35,12 @@ public class UDUPServer {
         int packetNo = readPacketNo(packet.getData());
         byte[] packetData = trimPacketBuffer(packet.getData());
 
-        if (packetNo == 0) {
-            if (packet.getLength() == this.bufSize) {
-                this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
-            } else  {
-                UDUPMessage msg = this.serializer.deserialize(packetData);
-                System.out.println("UDP received message " + msg.getContent().getMessageId());
-                sendMessageFurther(msg);
-            }
+        if (packetNo == 1 && packet.getLength() < this.bufSize) {
+            UDUPMessage msg = this.serializer.deserialize(packetData);
+            System.out.println("UDP received message " + msg.getContent().getMessageId());
+            sendMessageFurther(msg);
         } else {
+            System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
             this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
         }
     }
@@ -93,7 +90,7 @@ public class UDUPServer {
         ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
         byte[] fullData = new byte[0];
 
-        previousPacketData.add(newPacketNo, newPacketData);
+        previousPacketData.add(newPacketNo - 1, newPacketData);
         this.partialPackets.put(transmissionID, previousPacketData);
 
         if (previousPacketData.contains(null)) {
@@ -112,15 +109,15 @@ public class UDUPServer {
             try {
                 byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
                 UDUPMessage msg = this.serializer.deserialize(allPacketData);
-                this.udp.sendMessage(msg.getContent());
                 this.partialPackets.remove(transmissionID);
-                System.out.println("Kryo put together whole transmission");
+                System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
+                this.udp.sendMessage(msg.getContent());
             } catch (Error | Exception e) {
                 System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
             }
         } else {
             ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
-            newTransmission.add(packetData);
+            newTransmission.add(newPacketNo-1, packetData);
             this.partialPackets.put(transmissionID, newTransmission);
         }
     }
-- 
cgit v1.2.3