From bb0d92a3a0339695776797d25252815bf8921fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 15:53:02 +0100 Subject: Separate server from udup --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 26 +++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index b71a475..d6180be 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -10,22 +10,42 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; -public class UDUPServer { +public class UDUPServer implements Runnable { private UDUP udp; private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; private HashMap> partialPackets; private int bufSize; + private final AtomicBoolean running; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { - this.udp = udp; + public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { this.socket = new DatagramSocket(port, addr); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new UDUPSerializer(); + this.running = new AtomicBoolean(false); + } + + public void setUDUP(UDUP udup) { + this.udp = udup; + } + + public void run() { + System.out.println("UDP server running"); + this.running.getAndSet(true); + while(this.running.get()) { + try { + this.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.close(); + } + } } public void acceptMessage() throws IOException, InterruptedException { -- cgit v1.2.3 From 1f87c2d319b2671bd2d61feb3a76d102059c4c52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 19:06:26 +0100 Subject: Make UDUP handle only gossip girl messages with its timestamps --- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index d6180be..94882e4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -54,14 +55,19 @@ public class UDUPServer implements Runnable { String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); + UDUPMessage msg; if (packetNo == 1 && packet.getLength() < this.bufSize) { - UDUPMessage msg = this.serializer.deserialize(packetData); + msg = this.serializer.deserialize(packetData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); } else { System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + } + + if (msg != null) { + sendMessageFurther(msg); } } @@ -124,14 +130,16 @@ public class UDUPServer implements Runnable { return fullData; } - public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + UDUPMessage msg = null; + if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); - UDUPMessage msg = this.serializer.deserialize(allPacketData); + msg = this.serializer.deserialize(allPacketData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); this.partialPackets.remove(transmissionID); System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); - this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } @@ -140,6 +148,8 @@ public class UDUPServer implements Runnable { newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } + + return msg; } public void close() { -- cgit v1.2.3