From c077d789e25a88d8be8f8082c321d7aadb83cd96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Wed, 15 Jan 2020 03:24:48 +0100 Subject: Add cleaning to UDUP --- .../pl/edu/mimuw/cloudatlas/agent/AgentConfig.java | 2 +- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 +++++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 2363f4b..ad0f3bd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -52,7 +52,7 @@ public class AgentConfig { modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); - UDUPServer server = new UDUPServer(serverAddr, port, bufsize); + UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); return modules; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index dae0420..677bf67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { @@ -20,25 +18,45 @@ public class UDUPServer implements Runnable { private DatagramSocket socket; private InetAddress address; private HashMap> partialPackets; + private HashMap partialPacketTimestamps; private int bufSize; private final AtomicBoolean running; + private long freshnessPeriod; - public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { + public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException { this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); + this.freshnessPeriod = freshnessPeriod; + this.partialPacketTimestamps = new HashMap<>(); } public void setUDUP(UDUP udup) { this.udp = udup; } + private void purgeData() { + long currentTime = System.currentTimeMillis(); + ArrayList packetsToRemove = new ArrayList<>(); + for (Map.Entry packetTimestamp : partialPacketTimestamps.entrySet()) { + if (packetTimestamp.getValue() + freshnessPeriod < currentTime) { + packetsToRemove.add(packetTimestamp.getKey()); + } + } + + for (String packetToRemove : packetsToRemove) { + partialPacketTimestamps.remove(packetToRemove); + partialPackets.remove(packetToRemove); + } + } + public void run() { System.out.println("UDP server running"); this.running.getAndSet(true); + while(this.running.get()) { try { this.acceptMessage(); @@ -71,6 +89,8 @@ public class UDUPServer implements Runnable { msg.getContent().setSenderAddress(packet.getAddress()); sendMessageFurther(msg); } + + purgeData(); } private DatagramPacket receivePacket() throws IOException { @@ -120,6 +140,7 @@ public class UDUPServer implements Runnable { previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (previousPacketData.contains(null)) { throw new NoSuchElementException("Packet is not full"); @@ -135,6 +156,7 @@ public class UDUPServer implements Runnable { public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { UDUPMessage msg = null; + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); -- cgit v1.2.3