From ab9a470aa67ef414581145ad671e119d9edb86d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= <mag.grodzinska@gmail.com>
Date: Mon, 6 Jan 2020 16:14:44 +0100
Subject: Refactor UDUP module

---
 .../edu/mimuw/cloudatlas/agent/modules/UDUP.java   | 79 ++++--------------
 .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 64 +++------------
 .../cloudatlas/agent/modules/UDUPSerializer.java   | 94 ++++++++++++++++++++++
 .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 +++----
 4 files changed, 137 insertions(+), 130 deletions(-)

(limited to 'src/main/java/pl')

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e4a7962..e616c93 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -1,19 +1,10 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.primitives.Bytes;
-import pl.edu.mimuw.cloudatlas.agent.EventBus;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *  due to ValueContact design
  */
 
-public class UDUP extends Module implements Runnable {
-    public class InvalidConversation extends Exception {
-        public InvalidConversation(String message) { super(message); }
-    }
-
-    public class InvalidContact extends Exception {
-        public InvalidContact(String message) { super(message); }
-    }
+// TODO set server port in global config - must be the same everywhere
+// TODO same with buffer size
 
+public class UDUP extends Module implements Runnable {
     private UDUPClient client;
     private UDUPServer server;
-    private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
     private final AtomicBoolean running;
 
-    public UDUP(ModuleType moduleType,
-                InetAddress serverAddr,
+    public UDUP(InetAddress serverAddr,
                 int serverPort,
-                int retryTimeout,
-                int retriesCount,
+                int timeout,
                 int bufferSize) {
-        super(moduleType);
-        this.currentConversations = new HashMap<>();
-        this.running = new AtomicBoolean(true);
+        super(ModuleType.UDP);
+        this.running = new AtomicBoolean(false);
         try {
-            this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+            this.client = new UDUPClient(this, serverPort, bufferSize);
             this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
-        } catch (SocketException | UnknownHostException e) {
+            this.running.getAndSet(true);
+        } catch (SocketException e) {
             e.printStackTrace();
             this.client.close();
             this.server.close();
-            this.running.getAndSet(false);
         }
     }
 
@@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable {
     }
 
     public void handleTyped(UDUPMessage event) throws InterruptedException {
-//        System.out.println("UDP sending message " + event.getContent().getMessageId());
-        this.client.sendMessage(event);
-    }
-
-    // also used for updating
-    public void addConversation(UDUPMessage msg) {
-        this.currentConversations.put(msg.getConversationId(), msg);
-    }
-
-    public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
-        UDUPMessage ret = this.currentConversations.get(conversationId);
-        if (ret == null) {
-            throw new InvalidConversation("Conversation does not exist");
-        } else {
-            return ret;
+        System.out.println("UDP sending message " + event.getContent().getMessageId());
+        try {
+            this.client.sendMessage(event);
+        } catch (IOException e) {
+            System.out.println("UDP send message failed");
+            e.printStackTrace();
         }
     }
-
-    // TODO add conversation removal
-    public void removeConversation(String conversationId) {
-        this.currentConversations.remove(conversationId);
-    }
-
-    public UDUPMessage deserialize(ByteArrayInputStream in) {
-        Kryo kryo = new Kryo();
-        Input kryoInput = new Input(in);
-        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
-        return msg;
-    }
-
-    public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
-        Kryo kryo = new Kryo();
-        Output kryoOut = new Output(out);
-        kryo.writeObject(kryoOut, msg);
-        kryoOut.flush();
-    }
-
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 93f2898..82aaeb1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,83 +1,41 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.*;
 
 public class UDUPClient {
     private UDUP udp;
+    private UDUPSerializer serializer;
     private int serverPort;
-    private int timeout;
-    private int retriesCount;
     private DatagramSocket socket;
     private int bufsize;
 
-    UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+    UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
         this.udp = udp;
         this.serverPort = serverPort;
-        this.timeout = timeout;
-        this.retriesCount = retriesCount;
         this.socket = new DatagramSocket();
         this.bufsize = bufferSize;
+        this.serializer = new UDUPSerializer();
     }
 
-    // TODO make sure that retry count in message is updated correctly
-    public void sendMessage(UDUPMessage msg) throws InterruptedException {
-        String messageId = msg.getMessageId();
-
-        if (msg.getRetry() >= this.retriesCount) {
-            this.udp.removeConversation(msg.getConversationId());
-        } else {
-            this.udp.addConversation(msg);
-            try {
-                sendUDP(msg);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            msg.setRetry(msg.getRetry() + 1);
-
-            // TODO add sending message to timer with retry
-            /*
-            this.udp.executor.passMessage(new TimerSchedulerMessage(
-                    "",
-                    0,
-                    "",
-                    this.timeout,
-                    System.currentTimeMillis() / 1000L,
-                    new TimerScheduledTask() {
-                        @Override
-                        public void run() {
-                            try {
-                                this.sendMessage(msg);
-                            } catch (InterruptedException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }));
-
-             */
-        }
-    }
-
-    private void sendUDP(UDUPMessage msg) throws IOException {
+    public void sendMessage(UDUPMessage msg) throws IOException {
         int offset = 0;
         int outputSize;
-        byte[] buf = new byte[bufsize];
-        ByteArrayOutputStream output = new ByteArrayOutputStream();
 
-        this.udp.serialize(output, msg);
-        outputSize = output.size();
+        byte[] buf = this.serializer.serialize(msg);
+        outputSize = buf.length;
 
         do {
-            output.write(buf, offset, bufsize);
-            System.out.println("UDP sends message: " + buf);
             outputSize =- bufsize;
             offset += bufsize;
             DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+            System.out.println("UDP sends message: ");
+            for (byte b : buf) {
+                System.out.print(b);
+            }
+            System.out.println("to " + packet.getAddress());
             this.socket.send(packet);
         } while (outputSize > bufsize);
     }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index 3196a97..ac35265 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -1,4 +1,98 @@
 package pl.edu.mimuw.cloudatlas.agent.modules;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Serializes classes to and from byte arrays for UDP use
+ */
 public class UDUPSerializer {
+    private Kryo kryo;
+
+    UDUPSerializer() {
+        kryo = new Kryo();
+        kryo.setReferences(true);
+        kryo.setRegistrationRequired(true);
+        registerClasses();
+    }
+
+    private void registerClasses() {
+
+        kryo.register(Inet4Address.class, new Serializer() {
+
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+                InetAddress ia = (InetAddress) object;
+                kryo.writeObject(output, ia.getAddress());
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                try {
+                    byte[] buf = kryo.readObject(input, byte[].class);
+                    InetAddress addr = Inet4Address.getByAddress(buf);
+                    return addr;
+                } catch (UnknownHostException e) {
+                    System.out.println("Custom InetAddress read failed");
+                    e.printStackTrace();
+                    return null;
+                }
+            }
+        });
+
+        kryo.register(PathName.class, new Serializer() {
+
+            @Override
+            public void write(Kryo kryo, Output output, Object object) {
+                PathName pn = (PathName) object;
+                kryo.writeObject(output, pn.getName());
+            }
+
+            @Override
+            public Object read(Kryo kryo, Input input, Class type) {
+                String addr = input.readString();
+                return new PathName(addr);
+            }
+        });
+
+        kryo.register(byte[].class);
+        kryo.register(ValueContact.class);
+        kryo.register(ModuleType.class);
+
+        kryo.register(AgentMessage.class);
+        kryo.register(GetStateMessage.class);
+        kryo.register(UDUPMessage.class);
+        kryo.register(StanikMessage.Type.class);
+        kryo.register(StanikMessage.class);
+    }
+
+    public UDUPMessage deserialize(byte[] packetData) {
+        ByteArrayInputStream in = new ByteArrayInputStream(packetData);
+        Input kryoInput = new Input(in);
+        UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+        return msg;
+    }
+
+    public byte[] serialize(UDUPMessage msg) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Output kryoOut = new Output(out);
+        kryo.writeObject(kryoOut, msg);
+        kryoOut.flush();
+        kryoOut.close();
+        return out.toByteArray();
+    }
 }
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index f89f986..9a64bbd 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
 import com.google.common.primitives.Bytes;
 import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.*;
 import java.util.HashMap;
 
 public class UDUPServer {
-    UDUP udp;
+    private UDUP udp;
+    private UDUPSerializer serializer;
     private DatagramSocket socket;
     private InetAddress address;
-    private byte[] buf;
     private HashMap<InetAddress, byte[]> partialPackets;
+    private int bufSize;
 
-    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
         this.udp = udp;
         this.socket = new DatagramSocket(port, addr);
         this.address = addr;
-        this.buf = new byte[bufSize];
+        this.bufSize = bufSize;
         this.partialPackets = new HashMap<>();
+        this.serializer = new UDUPSerializer();
     }
 
     public void acceptMessage() throws IOException, InterruptedException {
+        byte[] buf = new byte[bufSize];
         DatagramPacket packet = new DatagramPacket(buf, buf.length);
         this.socket.receive(packet);
-        System.out.println("UDP received packet: " + packet.getData());
+        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
 
         if (packet.getOffset() == 0) {
-            UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
-            // TODO check if not full packet anyway
-            // TODO check for errors if it's not the end od transmission
-
-            this.udp.addConversation(msg);
+            UDUPMessage msg = this.serializer.deserialize(packet.getData());
             System.out.println("UDP received message " + msg.getContent().getMessageId());
 
-            if (msg.getDestinationModule() != ModuleType.UDP) {
-                this.udp.sendMessage(msg.getContent());
+            if (packet.getLength() == this.bufSize) {
+                this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+            } else  {
+                if (msg.getDestinationModule() != ModuleType.UDP) {
+                    this.udp.sendMessage(msg.getContent());
+                }
             }
         } else {
             this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
@@ -49,7 +51,7 @@ public class UDUPServer {
             byte[] previousPacketData = this.partialPackets.get(senderAddress);
             byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
             try {
-                UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+                UDUPMessage msg = this.serializer.deserialize(allPacketData);
                 this.udp.sendMessage(msg.getContent());
                 this.partialPackets.remove(senderAddress);
             } catch (Error | Exception e) {
-- 
cgit v1.2.3