From 5f02fa0e59dc84e12fae1fde61bdfa8edb5446b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 18:58:25 +0100 Subject: Fix handling of multipart udp messages --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 6807a86..fb79dc6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -31,12 +31,11 @@ public class UDUPServer { System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); if (packet.getOffset() == 0) { - UDUPMessage msg = this.serializer.deserialize(packet.getData()); - System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (packet.getLength() == this.bufSize) { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); } else { + UDUPMessage msg = this.serializer.deserialize(packet.getData()); + System.out.println("UDP received message " + msg.getContent().getMessageId()); if (msg.getContent().getDestinationModule() == ModuleType.TEST) { System.out.println("UDP server: test message received"); } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { -- cgit v1.2.3 From 2d7fe232b7c1f2ef62e7bf2f3100adb51e9bc0d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 22:13:48 +0100 Subject: Add segmentation handling --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 103 ++++++++++++++++----- 1 file changed, 82 insertions(+), 21 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index fb79dc6..c7ceca2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.NoSuchElementException; public class UDUPServer { private UDUP udp; private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; - private HashMap partialPackets; + private HashMap> partialPackets; private int bufSize; public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { @@ -25,42 +29,99 @@ public class UDUPServer { } public void acceptMessage() throws IOException, InterruptedException { - byte[] buf = new byte[bufSize]; - DatagramPacket packet = new DatagramPacket(buf, buf.length); - this.socket.receive(packet); - System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); + DatagramPacket packet = receivePacket(); + int transmissionNo = readTransmissionNo(packet.getData()); + String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); + int packetNo = readPacketNo(packet.getData()); + byte[] packetData = trimPacketBuffer(packet.getData()); - if (packet.getOffset() == 0) { + if (packetNo == 0) { if (packet.getLength() == this.bufSize) { - this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } else { - UDUPMessage msg = this.serializer.deserialize(packet.getData()); + UDUPMessage msg = this.serializer.deserialize(packetData); System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (msg.getContent().getDestinationModule() == ModuleType.TEST) { - System.out.println("UDP server: test message received"); - } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { - this.udp.sendMessage(msg.getContent()); - } + sendMessageFurther(msg); } } else { - this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } } - public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { - if (this.partialPackets.containsKey(senderAddress)) { - byte[] previousPacketData = this.partialPackets.get(senderAddress); - byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + private DatagramPacket receivePacket() throws IOException { + byte[] buf = new byte[bufSize]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); + return packet; + } + + private void sendMessageFurther(UDUPMessage msg) throws InterruptedException { + if (msg.getContent().getDestinationModule() == ModuleType.TEST) { + System.out.println("UDP server: test message received"); + } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } + + private int readTransmissionNo(byte[] packetData) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4); + byte[] byteBuf = new byte[4]; + in.read(byteBuf); + return ByteBuffer.wrap(byteBuf).getInt(); + } + + private int readPacketNo(byte[] packetData) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4); + byte[] byteBuf = new byte[4]; + in.read(byteBuf); + return ByteBuffer.wrap(byteBuf).getInt(); + } + + private byte[] trimPacketBuffer(byte[] packetData) { + int newPacketDataSize = packetData.length - 8; + byte[] newPacketData = new byte[newPacketDataSize]; + System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize); + return newPacketData; + } + + private String packTransmissionID(int transmissionNo, InetAddress contactAddr) { + return contactAddr.getHostAddress() + ":" + transmissionNo; + } + + private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) { + ArrayList previousPacketData = this.partialPackets.get(transmissionID); + byte[] fullData = new byte[0]; + + previousPacketData.add(newPacketNo, newPacketData); + this.partialPackets.put(transmissionID, previousPacketData); + + if (previousPacketData.contains(null)) { + throw new NoSuchElementException("Packet is not full"); + } else { + for (byte[] prevData : previousPacketData) { + fullData = Bytes.concat(fullData, prevData); + } + } + + return fullData; + } + + public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + if (this.partialPackets.containsKey(transmissionID)) { try { + byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); UDUPMessage msg = this.serializer.deserialize(allPacketData); this.udp.sendMessage(msg.getContent()); - this.partialPackets.remove(senderAddress); + this.partialPackets.remove(transmissionID); + System.out.println("Kryo put together whole transmission"); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); - this.partialPackets.put(senderAddress, allPacketData); } } else { - this.partialPackets.put(senderAddress, packetData); + ArrayList newTransmission = new ArrayList(); + newTransmission.add(packetData); + this.partialPackets.put(transmissionID, newTransmission); } } -- cgit v1.2.3 From 90d3d2e3e1e116bbb288d78e9c6c996a7f1e0270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 15:26:52 +0100 Subject: Fix segmentation --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index c7ceca2..b71a475 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -35,15 +35,12 @@ public class UDUPServer { int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); - if (packetNo == 0) { - if (packet.getLength() == this.bufSize) { - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); - } else { - UDUPMessage msg = this.serializer.deserialize(packetData); - System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); - } + if (packetNo == 1 && packet.getLength() < this.bufSize) { + UDUPMessage msg = this.serializer.deserialize(packetData); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + sendMessageFurther(msg); } else { + System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } } @@ -93,7 +90,7 @@ public class UDUPServer { ArrayList previousPacketData = this.partialPackets.get(transmissionID); byte[] fullData = new byte[0]; - previousPacketData.add(newPacketNo, newPacketData); + previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); if (previousPacketData.contains(null)) { @@ -112,15 +109,15 @@ public class UDUPServer { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); UDUPMessage msg = this.serializer.deserialize(allPacketData); - this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(transmissionID); - System.out.println("Kryo put together whole transmission"); + System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); + this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } } else { ArrayList newTransmission = new ArrayList(); - newTransmission.add(packetData); + newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } } -- cgit v1.2.3