m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-06 20:34:06 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-06 20:34:06 +0100
commit7b6d9a490cd07a6ffaf4b69df501a72c538621de (patch)
treeb9a636da41f8e23d84a9ef93961758ea45268f3b /src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
parent4aaa6218b853873c632aba0ed8696f29640041d1 (diff)
parent2feff1aa41c41008fcda2dd60c718cf09deb3fa1 (diff)
Merge branch 'master' into gossip-girl
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java72
1 files changed, 72 insertions, 0 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..6807a86
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,72 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+ private UDUP udp;
+ private UDUPSerializer serializer;
+ private DatagramSocket socket;
+ private InetAddress address;
+ private HashMap<InetAddress, byte[]> partialPackets;
+ private int bufSize;
+
+ public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
+ this.udp = udp;
+ this.socket = new DatagramSocket(port, addr);
+ this.address = addr;
+ this.bufSize = bufSize;
+ this.partialPackets = new HashMap<>();
+ this.serializer = new UDUPSerializer();
+ }
+
+ public void acceptMessage() throws IOException, InterruptedException {
+ byte[] buf = new byte[bufSize];
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ this.socket.receive(packet);
+ System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+
+ if (packet.getOffset() == 0) {
+ UDUPMessage msg = this.serializer.deserialize(packet.getData());
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+ if (packet.getLength() == this.bufSize) {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ } else {
+ if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+ System.out.println("UDP server: test message received");
+ } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ }
+ } else {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ }
+ }
+
+ public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+ if (this.partialPackets.containsKey(senderAddress)) {
+ byte[] previousPacketData = this.partialPackets.get(senderAddress);
+ byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ try {
+ UDUPMessage msg = this.serializer.deserialize(allPacketData);
+ this.udp.sendMessage(msg.getContent());
+ this.partialPackets.remove(senderAddress);
+ } catch (Error | Exception e) {
+ System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+ this.partialPackets.put(senderAddress, allPacketData);
+ }
+ } else {
+ this.partialPackets.put(senderAddress, packetData);
+ }
+ }
+
+ public void close() {
+ this.socket.close();
+ }
+
+}