m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
diff options
context:
space:
mode:
authorMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-06 22:13:48 +0100
committerMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-06 23:12:10 +0100
commit2d7fe232b7c1f2ef62e7bf2f3100adb51e9bc0d4 (patch)
treea061de5c7e26a0755721b20daff262591195b8d4 /src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
parent28c905a7365a37d0874b865f513c64b31d8679f4 (diff)
Add segmentation handling
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java103
1 files changed, 82 insertions, 21 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index fb79dc6..c7ceca2 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.NoSuchElementException;
public class UDUPServer {
private UDUP udp;
private UDUPSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
- private HashMap<InetAddress, byte[]> partialPackets;
+ private HashMap<String, ArrayList<byte[]>> partialPackets;
private int bufSize;
public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
@@ -25,42 +29,99 @@ public class UDUPServer {
}
public void acceptMessage() throws IOException, InterruptedException {
- byte[] buf = new byte[bufSize];
- DatagramPacket packet = new DatagramPacket(buf, buf.length);
- this.socket.receive(packet);
- System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+ DatagramPacket packet = receivePacket();
+ int transmissionNo = readTransmissionNo(packet.getData());
+ String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
+ int packetNo = readPacketNo(packet.getData());
+ byte[] packetData = trimPacketBuffer(packet.getData());
- if (packet.getOffset() == 0) {
+ if (packetNo == 0) {
if (packet.getLength() == this.bufSize) {
- this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
} else {
- UDUPMessage msg = this.serializer.deserialize(packet.getData());
+ UDUPMessage msg = this.serializer.deserialize(packetData);
System.out.println("UDP received message " + msg.getContent().getMessageId());
- if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
- System.out.println("UDP server: test message received");
- } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
- this.udp.sendMessage(msg.getContent());
- }
+ sendMessageFurther(msg);
}
} else {
- this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
}
}
- public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
- if (this.partialPackets.containsKey(senderAddress)) {
- byte[] previousPacketData = this.partialPackets.get(senderAddress);
- byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ private DatagramPacket receivePacket() throws IOException {
+ byte[] buf = new byte[bufSize];
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ this.socket.receive(packet);
+ System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+ return packet;
+ }
+
+ private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
+ if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+ System.out.println("UDP server: test message received");
+ } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ }
+
+ private int readTransmissionNo(byte[] packetData) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4);
+ byte[] byteBuf = new byte[4];
+ in.read(byteBuf);
+ return ByteBuffer.wrap(byteBuf).getInt();
+ }
+
+ private int readPacketNo(byte[] packetData) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4);
+ byte[] byteBuf = new byte[4];
+ in.read(byteBuf);
+ return ByteBuffer.wrap(byteBuf).getInt();
+ }
+
+ private byte[] trimPacketBuffer(byte[] packetData) {
+ int newPacketDataSize = packetData.length - 8;
+ byte[] newPacketData = new byte[newPacketDataSize];
+ System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize);
+ return newPacketData;
+ }
+
+ private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
+ return contactAddr.getHostAddress() + ":" + transmissionNo;
+ }
+
+ private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) {
+ ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
+ byte[] fullData = new byte[0];
+
+ previousPacketData.add(newPacketNo, newPacketData);
+ this.partialPackets.put(transmissionID, previousPacketData);
+
+ if (previousPacketData.contains(null)) {
+ throw new NoSuchElementException("Packet is not full");
+ } else {
+ for (byte[] prevData : previousPacketData) {
+ fullData = Bytes.concat(fullData, prevData);
+ }
+ }
+
+ return fullData;
+ }
+
+ public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
+ if (this.partialPackets.containsKey(transmissionID)) {
try {
+ byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
UDUPMessage msg = this.serializer.deserialize(allPacketData);
this.udp.sendMessage(msg.getContent());
- this.partialPackets.remove(senderAddress);
+ this.partialPackets.remove(transmissionID);
+ System.out.println("Kryo put together whole transmission");
} catch (Error | Exception e) {
System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
- this.partialPackets.put(senderAddress, allPacketData);
}
} else {
- this.partialPackets.put(senderAddress, packetData);
+ ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
+ newTransmission.add(packetData);
+ this.partialPackets.put(transmissionID, newTransmission);
}
}