From bfd4c421004ce35a9eac307c3ac4976caa1820ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Sun, 5 Jan 2020 18:00:56 +0100
Subject: UDUP with no serialization working

---
 .../cloudatlas/agent/messages/UDUPMessage.java     |  61 +++++++++++
 .../edu/mimuw/cloudatlas/agent/modules/Module.java |  11 +-
 .../edu/mimuw/cloudatlas/agent/modules/UDUP.java   | 120 +++++++++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java |  89 +++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java |  68 ++++++++++++
 5 files changed, 343 insertions(+), 6 deletions(-)
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
new file mode 100644
index 0000000..335d6fe
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -0,0 +1,61 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+public class UDUPMessage extends AgentMessage {
+    private ValueContact contact;
+    private AgentMessage content;
+    private int retry;
+    private String conversationId;
+
+    public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+        super(messageId, ModuleType.UDP, timestamp);
+        this.contact = contact;
+        this.content = content;
+        this.retry = retry;
+        this.conversationId = conversationId;
+    }
+
+    public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+        super(messageId, ModuleType.UDP);
+        this.contact = contact;
+        this.content = content;
+        this.retry = retry;
+        this.conversationId = conversationId;
+    }
+
+    @Override
+    public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+        module.handleTyped(this);
+    }
+
+    public AgentMessage getContent() {
+        return content;
+    }
+
+    public void setContent(AgentMessage content) {
+        this.content = content;
+    }
+
+    public int getRetry() {
+        return retry;
+    }
+
+    public String getConversationId() {
+        return conversationId;
+    }
+
+    public void setRetry(int retry) { this.retry = retry; }
+
+    public ValueContact getContact() { return contact; }
+
+    public void setContact(ValueContact contact) {
+        this.contact = contact;
+    }
+
+    public void setConversationId(String conversationId) {
+        this.conversationId = conversationId;
+    }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index 0a934cb..67fdab9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -1,12 +1,7 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
 import pl.edu.mimuw.cloudatlas.agent.Executor;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
 
 /*
  * A Module is a (potentially stateful) event handler.
@@ -49,6 +44,10 @@ public abstract class Module {
         throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
     }
 
+    public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType {
+        throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+    }
+
     public void setExecutor(Executor executor) {
         this.executor = executor;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
new file mode 100644
index 0000000..e4a7962
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -0,0 +1,120 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.EventBus;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Communication over UDP
+ *
+ * Client-server exchange pattern:
+ *  server: init message
+ *  client: ack message
+ *
+ *  retry count
+ *  retry timeout after which retry happens
+ **
+ *  udp sends initiator module success/failure information
+ *
+ *  we have udps on different addresses with the same ports
+ *  due to ValueContact design
+ */
+
+public class UDUP extends Module implements Runnable {
+    public class InvalidConversation extends Exception {
+        public InvalidConversation(String message) { super(message); }
+    }
+
+    public class InvalidContact extends Exception {
+        public InvalidContact(String message) { super(message); }
+    }
+
+    private UDUPClient client;
+    private UDUPServer server;
+    private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
+    private final AtomicBoolean running;
+
+    public UDUP(ModuleType moduleType,
+                InetAddress serverAddr,
+                int serverPort,
+                int retryTimeout,
+                int retriesCount,
+                int bufferSize) {
+        super(moduleType);
+        this.currentConversations = new HashMap<>();
+        this.running = new AtomicBoolean(true);
+        try {
+            this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+            this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
+        } catch (SocketException | UnknownHostException e) {
+            e.printStackTrace();
+            this.client.close();
+            this.server.close();
+            this.running.getAndSet(false);
+        }
+    }
+
+    public void run() {
+        System.out.println("UDP server running");
+        while(this.running.get()) {
+            try {
+                this.server.acceptMessage();
+            } catch (IOException | InterruptedException e) {
+                e.printStackTrace();
+                this.running.getAndSet(false);
+                this.server.close();
+            }
+        }
+    }
+
+    public void handleTyped(UDUPMessage event) throws InterruptedException {
+//        System.out.println("UDP sending message " + event.getContent().getMessageId());
+        this.client.sendMessage(event);
+    }
+
+    // also used for updating
+    public void addConversation(UDUPMessage msg) {
+        this.currentConversations.put(msg.getConversationId(), msg);
+    }
+
+    public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
+        UDUPMessage ret = this.currentConversations.get(conversationId);
+        if (ret == null) {
+            throw new InvalidConversation("Conversation does not exist");
+        } else {
+            return ret;
+        }
+    }
+
+    // TODO add conversation removal
+    public void removeConversation(String conversationId) {
+        this.currentConversations.remove(conversationId);
+    }
+
+    public UDUPMessage deserialize(ByteArrayInputStream in) {
+        Kryo kryo = new Kryo();
+        Input kryoInput = new Input(in);
+        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+        return msg;
+    }
+
+    public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
+        Kryo kryo = new Kryo();
+        Output kryoOut = new Output(out);
+        kryo.writeObject(kryoOut, msg);
+        kryoOut.flush();
+    }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
new file mode 100644
index 0000000..93f2898
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -0,0 +1,89 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.*;
+
+public class UDUPClient {
+    private UDUP udp;
+    private int serverPort;
+    private int timeout;
+    private int retriesCount;
+    private DatagramSocket socket;
+    private int bufsize;
+
+    UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+        this.udp = udp;
+        this.serverPort = serverPort;
+        this.timeout = timeout;
+        this.retriesCount = retriesCount;
+        this.socket = new DatagramSocket();
+        this.bufsize = bufferSize;
+    }
+
+    // TODO make sure that retry count in message is updated correctly
+    public void sendMessage(UDUPMessage msg) throws InterruptedException {
+        String messageId = msg.getMessageId();
+
+        if (msg.getRetry() >= this.retriesCount) {
+            this.udp.removeConversation(msg.getConversationId());
+        } else {
+            this.udp.addConversation(msg);
+            try {
+                sendUDP(msg);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            msg.setRetry(msg.getRetry() + 1);
+
+            // TODO add sending message to timer with retry
+            /*
+            this.udp.executor.passMessage(new TimerSchedulerMessage(
+                    "",
+                    0,
+                    "",
+                    this.timeout,
+                    System.currentTimeMillis() / 1000L,
+                    new TimerScheduledTask() {
+                        @Override
+                        public void run() {
+                            try {
+                                this.sendMessage(msg);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }));
+
+             */
+        }
+    }
+
+    private void sendUDP(UDUPMessage msg) throws IOException {
+        int offset = 0;
+        int outputSize;
+        byte[] buf = new byte[bufsize];
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+        this.udp.serialize(output, msg);
+        outputSize = output.size();
+
+        do {
+            output.write(buf, offset, bufsize);
+            System.out.println("UDP sends message: " + buf);
+            outputSize =- bufsize;
+            offset += bufsize;
+            DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+            this.socket.send(packet);
+        } while (outputSize > bufsize);
+    }
+
+    void close() {
+        this.socket.close();
+    }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..f89f986
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,68 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+    UDUP udp;
+    private DatagramSocket socket;
+    private InetAddress address;
+    private byte[] buf;
+    private HashMap<InetAddress, byte[]> partialPackets;
+
+    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+        this.udp = udp;
+        this.socket = new DatagramSocket(port, addr);
+        this.address = addr;
+        this.buf = new byte[bufSize];
+        this.partialPackets = new HashMap<>();
+    }
+
+    public void acceptMessage() throws IOException, InterruptedException {
+        DatagramPacket packet = new DatagramPacket(buf, buf.length);
+        this.socket.receive(packet);
+        System.out.println("UDP received packet: " + packet.getData());
+
+        if (packet.getOffset() == 0) {
+            UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
+            // TODO check if not full packet anyway
+            // TODO check for errors if it's not the end od transmission
+
+            this.udp.addConversation(msg);
+            System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+            if (msg.getDestinationModule() != ModuleType.UDP) {
+                this.udp.sendMessage(msg.getContent());
+            }
+        } else {
+            this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+        }
+    }
+
+    public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+        if (this.partialPackets.containsKey(senderAddress)) {
+            byte[] previousPacketData = this.partialPackets.get(senderAddress);
+            byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+            try {
+                UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+                this.udp.sendMessage(msg.getContent());
+                this.partialPackets.remove(senderAddress);
+            } catch (Error | Exception e) {
+                System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+                this.partialPackets.put(senderAddress, allPacketData);
+            }
+        } else {
+            this.partialPackets.put(senderAddress, packetData);
+        }
+    }
+
+    public void close() {
+        this.socket.close();
+    }
+
+}
-- 
cgit v1.2.3


From aa7278e17afb5129034bbe1af1cf4ca3c6ba3e90 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 12:18:13 +0100
Subject: Add UDUP basic test

---
 .../java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java    | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
new file mode 100644
index 0000000..3196a97
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -0,0 +1,4 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+public class UDUPSerializer {
+}
-- 
cgit v1.2.3


From ab9a470aa67ef414581145ad671e119d9edb86d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 16:14:44 +0100
Subject: Refactor UDUP module

---
 .../edu/mimuw/cloudatlas/agent/modules/UDUP.java   | 79 ++++--------------
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 64 +++------------
 .../cloudatlas/agent/modules/UDUPSerializer.java   | 94 ++++++++++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 +++----
 4 files changed, 137 insertions(+), 130 deletions(-)

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e4a7962..e616c93 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -1,19 +1,10 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.primitives.Bytes;
-import pl.edu.mimuw.cloudatlas.agent.EventBus;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *  due to ValueContact design
  */
 
-public class UDUP extends Module implements Runnable {
-    public class InvalidConversation extends Exception {
-        public InvalidConversation(String message) { super(message); }
-    }
-
-    public class InvalidContact extends Exception {
-        public InvalidContact(String message) { super(message); }
-    }
+// TODO set server port in global config - must be the same everywhere
+// TODO same with buffer size
 
+public class UDUP extends Module implements Runnable {
     private UDUPClient client;
     private UDUPServer server;
-    private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
     private final AtomicBoolean running;
 
-    public UDUP(ModuleType moduleType,
-                InetAddress serverAddr,
+    public UDUP(InetAddress serverAddr,
                 int serverPort,
-                int retryTimeout,
-                int retriesCount,
+                int timeout,
                 int bufferSize) {
-        super(moduleType);
-        this.currentConversations = new HashMap<>();
-        this.running = new AtomicBoolean(true);
+        super(ModuleType.UDP);
+        this.running = new AtomicBoolean(false);
         try {
-            this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+            this.client = new UDUPClient(this, serverPort, bufferSize);
             this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
-        } catch (SocketException | UnknownHostException e) {
+            this.running.getAndSet(true);
+        } catch (SocketException e) {
             e.printStackTrace();
             this.client.close();
             this.server.close();
-            this.running.getAndSet(false);
         }
     }
 
@@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable {
     }
 
     public void handleTyped(UDUPMessage event) throws InterruptedException {
-//        System.out.println("UDP sending message " + event.getContent().getMessageId());
-        this.client.sendMessage(event);
-    }
-
-    // also used for updating
-    public void addConversation(UDUPMessage msg) {
-        this.currentConversations.put(msg.getConversationId(), msg);
-    }
-
-    public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
-        UDUPMessage ret = this.currentConversations.get(conversationId);
-        if (ret == null) {
-            throw new InvalidConversation("Conversation does not exist");
-        } else {
-            return ret;
+        System.out.println("UDP sending message " + event.getContent().getMessageId());
+        try {
+            this.client.sendMessage(event);
+        } catch (IOException e) {
+            System.out.println("UDP send message failed");
+            e.printStackTrace();
         }
     }
-
-    // TODO add conversation removal
-    public void removeConversation(String conversationId) {
-        this.currentConversations.remove(conversationId);
-    }
-
-    public UDUPMessage deserialize(ByteArrayInputStream in) {
-        Kryo kryo = new Kryo();
-        Input kryoInput = new Input(in);
-        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
-        return msg;
-    }
-
-    public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
-        Kryo kryo = new Kryo();
-        Output kryoOut = new Output(out);
-        kryo.writeObject(kryoOut, msg);
-        kryoOut.flush();
-    }
-
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 93f2898..82aaeb1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,83 +1,41 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.*;
 
 public class UDUPClient {
     private UDUP udp;
+    private UDUPSerializer serializer;
     private int serverPort;
-    private int timeout;
-    private int retriesCount;
     private DatagramSocket socket;
     private int bufsize;
 
-    UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+    UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
         this.udp = udp;
         this.serverPort = serverPort;
-        this.timeout = timeout;
-        this.retriesCount = retriesCount;
         this.socket = new DatagramSocket();
         this.bufsize = bufferSize;
+        this.serializer = new UDUPSerializer();
     }
 
-    // TODO make sure that retry count in message is updated correctly
-    public void sendMessage(UDUPMessage msg) throws InterruptedException {
-        String messageId = msg.getMessageId();
-
-        if (msg.getRetry() >= this.retriesCount) {
-            this.udp.removeConversation(msg.getConversationId());
-        } else {
-            this.udp.addConversation(msg);
-            try {
-                sendUDP(msg);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            msg.setRetry(msg.getRetry() + 1);
-
-            // TODO add sending message to timer with retry
-            /*
-            this.udp.executor.passMessage(new TimerSchedulerMessage(
-                    "",
-                    0,
-                    "",
-                    this.timeout,
-                    System.currentTimeMillis() / 1000L,
-                    new TimerScheduledTask() {
-                        @Override
-                        public void run() {
-                            try {
-                                this.sendMessage(msg);
-                            } catch (InterruptedException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }));
-
-             */
-        }
-    }
-
-    private void sendUDP(UDUPMessage msg) throws IOException {
+    public void sendMessage(UDUPMessage msg) throws IOException {
         int offset = 0;
         int outputSize;
-        byte[] buf = new byte[bufsize];
-        ByteArrayOutputStream output = new ByteArrayOutputStream();
 
-        this.udp.serialize(output, msg);
-        outputSize = output.size();
+        byte[] buf = this.serializer.serialize(msg);
+        outputSize = buf.length;
 
         do {
-            output.write(buf, offset, bufsize);
-            System.out.println("UDP sends message: " + buf);
             outputSize =- bufsize;
             offset += bufsize;
             DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+            System.out.println("UDP sends message: ");
+            for (byte b : buf) {
+                System.out.print(b);
+            }
+            System.out.println("to " + packet.getAddress());
             this.socket.send(packet);
         } while (outputSize > bufsize);
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index 3196a97..ac35265 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -1,4 +1,98 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Serializes classes to and from byte arrays for UDP use
+ */
 public class UDUPSerializer {
+    private Kryo kryo;
+
+    UDUPSerializer() {
+        kryo = new Kryo();
+        kryo.setReferences(true);
+        kryo.setRegistrationRequired(true);
+        registerClasses();
+    }
+
+    private void registerClasses() {
+
+        kryo.register(Inet4Address.class, new Serializer() {
+
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+                InetAddress ia = (InetAddress) object;
+                kryo.writeObject(output, ia.getAddress());
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                try {
+                    byte[] buf = kryo.readObject(input, byte[].class);
+                    InetAddress addr = Inet4Address.getByAddress(buf);
+                    return addr;
+                } catch (UnknownHostException e) {
+                    System.out.println("Custom InetAddress read failed");
+                    e.printStackTrace();
+                    return null;
+                }
+            }
+        });
+
+        kryo.register(PathName.class, new Serializer() {
+
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+                PathName pn = (PathName) object;
+                kryo.writeObject(output, pn.getName());
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                String addr = input.readString();
+                return new PathName(addr);
+            }
+        });
+
+        kryo.register(byte[].class);
+        kryo.register(ValueContact.class);
+        kryo.register(ModuleType.class);
+
+        kryo.register(AgentMessage.class);
+        kryo.register(GetStateMessage.class);
+        kryo.register(UDUPMessage.class);
+        kryo.register(StanikMessage.Type.class);
+        kryo.register(StanikMessage.class);
+    }
+
+    public UDUPMessage deserialize(byte[] packetData) {
+        ByteArrayInputStream in = new ByteArrayInputStream(packetData);
+        Input kryoInput = new Input(in);
+        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+        return msg;
+    }
+
+    public byte[] serialize(UDUPMessage msg) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Output kryoOut = new Output(out);
+        kryo.writeObject(kryoOut, msg);
+        kryoOut.flush();
+        kryoOut.close();
+        return out.toByteArray();
+    }
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index f89f986..9a64bbd 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
 import com.google.common.primitives.Bytes;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.*;
 import java.util.HashMap;
 
 public class UDUPServer {
-    UDUP udp;
+    private UDUP udp;
+    private UDUPSerializer serializer;
     private DatagramSocket socket;
     private InetAddress address;
-    private byte[] buf;
     private HashMap<InetAddress, byte[]> partialPackets;
+    private int bufSize;
 
-    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
         this.udp = udp;
         this.socket = new DatagramSocket(port, addr);
         this.address = addr;
-        this.buf = new byte[bufSize];
+        this.bufSize = bufSize;
         this.partialPackets = new HashMap<>();
+        this.serializer = new UDUPSerializer();
     }
 
     public void acceptMessage() throws IOException, InterruptedException {
+        byte[] buf = new byte[bufSize];
         DatagramPacket packet = new DatagramPacket(buf, buf.length);
         this.socket.receive(packet);
-        System.out.println("UDP received packet: " + packet.getData());
+        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
 
         if (packet.getOffset() == 0) {
-            UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
-            // TODO check if not full packet anyway
-            // TODO check for errors if it's not the end od transmission
-
-            this.udp.addConversation(msg);
+            UDUPMessage msg = this.serializer.deserialize(packet.getData());
             System.out.println("UDP received message " + msg.getContent().getMessageId());
 
-            if (msg.getDestinationModule() != ModuleType.UDP) {
-                this.udp.sendMessage(msg.getContent());
+            if (packet.getLength() == this.bufSize) {
+                this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+            } else  {
+                if (msg.getDestinationModule() != ModuleType.UDP) {
+                    this.udp.sendMessage(msg.getContent());
+                }
             }
         } else {
             this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
@@ -49,7 +51,7 @@ public class UDUPServer {
             byte[] previousPacketData = this.partialPackets.get(senderAddress);
             byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
             try {
-                UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+                UDUPMessage msg = this.serializer.deserialize(allPacketData);
                 this.udp.sendMessage(msg.getContent());
                 this.partialPackets.remove(senderAddress);
             } catch (Error | Exception e) {
-- 
cgit v1.2.3


From f8e0f371515a825ead0bf385105b461223c55e71 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 16:17:18 +0100
Subject: Refactor UDUP message

---
 .../cloudatlas/agent/messages/UDUPMessage.java     | 28 +++++-----------------
 1 file changed, 6 insertions(+), 22 deletions(-)

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
index 335d6fe..fa8d1fa 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -7,23 +7,21 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact;
 public class UDUPMessage extends AgentMessage {
     private ValueContact contact;
     private AgentMessage content;
-    private int retry;
-    private String conversationId;
 
-    public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+    public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) {
         super(messageId, ModuleType.UDP, timestamp);
         this.contact = contact;
         this.content = content;
-        this.retry = retry;
-        this.conversationId = conversationId;
     }
 
-    public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+    public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) {
         super(messageId, ModuleType.UDP);
         this.contact = contact;
         this.content = content;
-        this.retry = retry;
-        this.conversationId = conversationId;
+    }
+
+    public UDUPMessage() {
+        super("", ModuleType.UDP);
     }
 
     @Override
@@ -39,23 +37,9 @@ public class UDUPMessage extends AgentMessage {
         this.content = content;
     }
 
-    public int getRetry() {
-        return retry;
-    }
-
-    public String getConversationId() {
-        return conversationId;
-    }
-
-    public void setRetry(int retry) { this.retry = retry; }
-
     public ValueContact getContact() { return contact; }
 
     public void setContact(ValueContact contact) {
         this.contact = contact;
     }
-
-    public void setConversationId(String conversationId) {
-        this.conversationId = conversationId;
-    }
 }
-- 
cgit v1.2.3


From 6027e8ffcc91495dc1cfe76da925e846f57646e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 16:32:45 +0100
Subject: Fix passing messages in server

---
 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 9a64bbd..6807a86 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -37,7 +37,9 @@ public class UDUPServer {
             if (packet.getLength() == this.bufSize) {
                 this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
             } else  {
-                if (msg.getDestinationModule() != ModuleType.UDP) {
+                if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+                    System.out.println("UDP server: test message received");
+                } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
                     this.udp.sendMessage(msg.getContent());
                 }
             }
-- 
cgit v1.2.3


From 360cb66d187b64ae51c792d87d0d425e73b48d0a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 18:02:20 +0100
Subject: Add udup module initialization

---
 src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

(limited to 'src/main/java')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 0efa710..ef4f48d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,5 +1,8 @@
 package pl.edu.mimuw.cloudatlas.agent;
 
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
@@ -12,14 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
 import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
 import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
-import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Stanik;
-import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler;
 import pl.edu.mimuw.cloudatlas.api.Api;
 import pl.edu.mimuw.cloudatlas.interpreter.Main;
 import pl.edu.mimuw.cloudatlas.model.PathName;
@@ -48,6 +45,11 @@ public class Agent {
         modules.put(ModuleType.RMI, new Remik());
         modules.put(ModuleType.STATE, new Stanik());
         modules.put(ModuleType.QUERY, new Qurnik());
+        try {
+            modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
         // TODO add modules as we implement them
         return modules;
     }
-- 
cgit v1.2.3