From bfd4c421004ce35a9eac307c3ac4976caa1820ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Sun, 5 Jan 2020 18:00:56 +0100
Subject: UDUP with no serialization working

---
 .../cloudatlas/agent/messages/UDUPMessage.java     |  61 +++++++++++
 .../edu/mimuw/cloudatlas/agent/modules/Module.java |  11 +-
 .../edu/mimuw/cloudatlas/agent/modules/UDUP.java   | 120 +++++++++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java |  89 +++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java |  68 ++++++++++++
 5 files changed, 343 insertions(+), 6 deletions(-)
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
 create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java

(limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
new file mode 100644
index 0000000..335d6fe
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -0,0 +1,61 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+public class UDUPMessage extends AgentMessage {
+    private ValueContact contact;
+    private AgentMessage content;
+    private int retry;
+    private String conversationId;
+
+    public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+        super(messageId, ModuleType.UDP, timestamp);
+        this.contact = contact;
+        this.content = content;
+        this.retry = retry;
+        this.conversationId = conversationId;
+    }
+
+    public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) {
+        super(messageId, ModuleType.UDP);
+        this.contact = contact;
+        this.content = content;
+        this.retry = retry;
+        this.conversationId = conversationId;
+    }
+
+    @Override
+    public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+        module.handleTyped(this);
+    }
+
+    public AgentMessage getContent() {
+        return content;
+    }
+
+    public void setContent(AgentMessage content) {
+        this.content = content;
+    }
+
+    public int getRetry() {
+        return retry;
+    }
+
+    public String getConversationId() {
+        return conversationId;
+    }
+
+    public void setRetry(int retry) { this.retry = retry; }
+
+    public ValueContact getContact() { return contact; }
+
+    public void setContact(ValueContact contact) {
+        this.contact = contact;
+    }
+
+    public void setConversationId(String conversationId) {
+        this.conversationId = conversationId;
+    }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index 0a934cb..67fdab9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -1,12 +1,7 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
 import pl.edu.mimuw.cloudatlas.agent.Executor;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
 
 /*
  * A Module is a (potentially stateful) event handler.
@@ -49,6 +44,10 @@ public abstract class Module {
         throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
     }
 
+    public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType {
+        throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+    }
+
     public void setExecutor(Executor executor) {
         this.executor = executor;
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
new file mode 100644
index 0000000..e4a7962
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -0,0 +1,120 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.EventBus;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Communication over UDP
+ *
+ * Client-server exchange pattern:
+ *  server: init message
+ *  client: ack message
+ *
+ *  retry count
+ *  retry timeout after which retry happens
+ **
+ *  udp sends initiator module success/failure information
+ *
+ *  we have udps on different addresses with the same ports
+ *  due to ValueContact design
+ */
+
+public class UDUP extends Module implements Runnable {
+    public class InvalidConversation extends Exception {
+        public InvalidConversation(String message) { super(message); }
+    }
+
+    public class InvalidContact extends Exception {
+        public InvalidContact(String message) { super(message); }
+    }
+
+    private UDUPClient client;
+    private UDUPServer server;
+    private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
+    private final AtomicBoolean running;
+
+    public UDUP(ModuleType moduleType,
+                InetAddress serverAddr,
+                int serverPort,
+                int retryTimeout,
+                int retriesCount,
+                int bufferSize) {
+        super(moduleType);
+        this.currentConversations = new HashMap<>();
+        this.running = new AtomicBoolean(true);
+        try {
+            this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+            this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
+        } catch (SocketException | UnknownHostException e) {
+            e.printStackTrace();
+            this.client.close();
+            this.server.close();
+            this.running.getAndSet(false);
+        }
+    }
+
+    public void run() {
+        System.out.println("UDP server running");
+        while(this.running.get()) {
+            try {
+                this.server.acceptMessage();
+            } catch (IOException | InterruptedException e) {
+                e.printStackTrace();
+                this.running.getAndSet(false);
+                this.server.close();
+            }
+        }
+    }
+
+    public void handleTyped(UDUPMessage event) throws InterruptedException {
+//        System.out.println("UDP sending message " + event.getContent().getMessageId());
+        this.client.sendMessage(event);
+    }
+
+    // also used for updating
+    public void addConversation(UDUPMessage msg) {
+        this.currentConversations.put(msg.getConversationId(), msg);
+    }
+
+    public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
+        UDUPMessage ret = this.currentConversations.get(conversationId);
+        if (ret == null) {
+            throw new InvalidConversation("Conversation does not exist");
+        } else {
+            return ret;
+        }
+    }
+
+    // TODO add conversation removal
+    public void removeConversation(String conversationId) {
+        this.currentConversations.remove(conversationId);
+    }
+
+    public UDUPMessage deserialize(ByteArrayInputStream in) {
+        Kryo kryo = new Kryo();
+        Input kryoInput = new Input(in);
+        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+        return msg;
+    }
+
+    public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
+        Kryo kryo = new Kryo();
+        Output kryoOut = new Output(out);
+        kryo.writeObject(kryoOut, msg);
+        kryoOut.flush();
+    }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
new file mode 100644
index 0000000..93f2898
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -0,0 +1,89 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.*;
+
+public class UDUPClient {
+    private UDUP udp;
+    private int serverPort;
+    private int timeout;
+    private int retriesCount;
+    private DatagramSocket socket;
+    private int bufsize;
+
+    UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+        this.udp = udp;
+        this.serverPort = serverPort;
+        this.timeout = timeout;
+        this.retriesCount = retriesCount;
+        this.socket = new DatagramSocket();
+        this.bufsize = bufferSize;
+    }
+
+    // TODO make sure that retry count in message is updated correctly
+    public void sendMessage(UDUPMessage msg) throws InterruptedException {
+        String messageId = msg.getMessageId();
+
+        if (msg.getRetry() >= this.retriesCount) {
+            this.udp.removeConversation(msg.getConversationId());
+        } else {
+            this.udp.addConversation(msg);
+            try {
+                sendUDP(msg);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            msg.setRetry(msg.getRetry() + 1);
+
+            // TODO add sending message to timer with retry
+            /*
+            this.udp.executor.passMessage(new TimerSchedulerMessage(
+                    "",
+                    0,
+                    "",
+                    this.timeout,
+                    System.currentTimeMillis() / 1000L,
+                    new TimerScheduledTask() {
+                        @Override
+                        public void run() {
+                            try {
+                                this.sendMessage(msg);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }));
+
+             */
+        }
+    }
+
+    private void sendUDP(UDUPMessage msg) throws IOException {
+        int offset = 0;
+        int outputSize;
+        byte[] buf = new byte[bufsize];
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+        this.udp.serialize(output, msg);
+        outputSize = output.size();
+
+        do {
+            output.write(buf, offset, bufsize);
+            System.out.println("UDP sends message: " + buf);
+            outputSize =- bufsize;
+            offset += bufsize;
+            DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+            this.socket.send(packet);
+        } while (outputSize > bufsize);
+    }
+
+    void close() {
+        this.socket.close();
+    }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..f89f986
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,68 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+    UDUP udp;
+    private DatagramSocket socket;
+    private InetAddress address;
+    private byte[] buf;
+    private HashMap<InetAddress, byte[]> partialPackets;
+
+    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+        this.udp = udp;
+        this.socket = new DatagramSocket(port, addr);
+        this.address = addr;
+        this.buf = new byte[bufSize];
+        this.partialPackets = new HashMap<>();
+    }
+
+    public void acceptMessage() throws IOException, InterruptedException {
+        DatagramPacket packet = new DatagramPacket(buf, buf.length);
+        this.socket.receive(packet);
+        System.out.println("UDP received packet: " + packet.getData());
+
+        if (packet.getOffset() == 0) {
+            UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
+            // TODO check if not full packet anyway
+            // TODO check for errors if it's not the end od transmission
+
+            this.udp.addConversation(msg);
+            System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+            if (msg.getDestinationModule() != ModuleType.UDP) {
+                this.udp.sendMessage(msg.getContent());
+            }
+        } else {
+            this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+        }
+    }
+
+    public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+        if (this.partialPackets.containsKey(senderAddress)) {
+            byte[] previousPacketData = this.partialPackets.get(senderAddress);
+            byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+            try {
+                UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+                this.udp.sendMessage(msg.getContent());
+                this.partialPackets.remove(senderAddress);
+            } catch (Error | Exception e) {
+                System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+                this.partialPackets.put(senderAddress, allPacketData);
+            }
+        } else {
+            this.partialPackets.put(senderAddress, packetData);
+        }
+    }
+
+    public void close() {
+        this.socket.close();
+    }
+
+}
-- 
cgit v1.2.3