From bfd4c421004ce35a9eac307c3ac4976caa1820ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 5 Jan 2020 18:00:56 +0100 Subject: UDUP with no serialization working --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java new file mode 100644 index 0000000..f89f986 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -0,0 +1,68 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.*; +import java.util.HashMap; + +public class UDUPServer { + UDUP udp; + private DatagramSocket socket; + private InetAddress address; + private byte[] buf; + private HashMap partialPackets; + + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + this.udp = udp; + this.socket = new DatagramSocket(port, addr); + this.address = addr; + this.buf = new byte[bufSize]; + this.partialPackets = new HashMap<>(); + } + + public void acceptMessage() throws IOException, InterruptedException { + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP received packet: " + packet.getData()); + + if (packet.getOffset() == 0) { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); + // TODO check if not full packet anyway + // TODO check for errors if it's not the end od transmission + + this.udp.addConversation(msg); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } else { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } + } + + public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { + if (this.partialPackets.containsKey(senderAddress)) { + byte[] previousPacketData = this.partialPackets.get(senderAddress); + byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + try { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + this.udp.sendMessage(msg.getContent()); + this.partialPackets.remove(senderAddress); + } catch (Error | Exception e) { + System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); + this.partialPackets.put(senderAddress, allPacketData); + } + } else { + this.partialPackets.put(senderAddress, packetData); + } + } + + public void close() { + this.socket.close(); + } + +} -- cgit v1.2.3 From ab9a470aa67ef414581145ad671e119d9edb86d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:14:44 +0100 Subject: Refactor UDUP module --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 ++++++++++++---------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index f89f986..9a64bbd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.util.HashMap; public class UDUPServer { - UDUP udp; + private UDUP udp; + private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; - private byte[] buf; private HashMap partialPackets; + private int bufSize; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { this.udp = udp; this.socket = new DatagramSocket(port, addr); this.address = addr; - this.buf = new byte[bufSize]; + this.bufSize = bufSize; this.partialPackets = new HashMap<>(); + this.serializer = new UDUPSerializer(); } public void acceptMessage() throws IOException, InterruptedException { + byte[] buf = new byte[bufSize]; DatagramPacket packet = new DatagramPacket(buf, buf.length); this.socket.receive(packet); - System.out.println("UDP received packet: " + packet.getData()); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); if (packet.getOffset() == 0) { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); - // TODO check if not full packet anyway - // TODO check for errors if it's not the end od transmission - - this.udp.addConversation(msg); + UDUPMessage msg = this.serializer.deserialize(packet.getData()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (msg.getDestinationModule() != ModuleType.UDP) { - this.udp.sendMessage(msg.getContent()); + if (packet.getLength() == this.bufSize) { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } else { + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } } } else { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); @@ -49,7 +51,7 @@ public class UDUPServer { byte[] previousPacketData = this.partialPackets.get(senderAddress); byte[] allPacketData = Bytes.concat(previousPacketData, packetData); try { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + UDUPMessage msg = this.serializer.deserialize(allPacketData); this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(senderAddress); } catch (Error | Exception e) { -- cgit v1.2.3 From 6027e8ffcc91495dc1cfe76da925e846f57646e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:32:45 +0100 Subject: Fix passing messages in server --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 9a64bbd..6807a86 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -37,7 +37,9 @@ public class UDUPServer { if (packet.getLength() == this.bufSize) { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); } else { - if (msg.getDestinationModule() != ModuleType.UDP) { + if (msg.getContent().getDestinationModule() == ModuleType.TEST) { + System.out.println("UDP server: test message received"); + } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { this.udp.sendMessage(msg.getContent()); } } -- cgit v1.2.3