m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
authorMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-15 03:24:48 +0100
committerMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-15 03:24:48 +0100
commitc077d789e25a88d8be8f8082c321d7aadb83cd96 (patch)
treeb3a1843bd8bec3262180c5101bb7ab216ac85c7c /src/main/java/pl/edu/mimuw/cloudatlas/agent
parent5096d5fdc1a550a53511ca4478394f778cef10be (diff)
Add cleaning to UDUP
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java30
2 files changed, 27 insertions, 5 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
index 2363f4b..ad0f3bd 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -52,7 +52,7 @@ public class AgentConfig {
modules.put(ModuleType.QUERY, new Qurnik());
modules.put(ModuleType.GOSSIP, new GossipGirl());
- UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
+ UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod);
modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
return modules;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index dae0420..677bf67 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.NoSuchElementException;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class UDUPServer implements Runnable {
@@ -20,25 +18,45 @@ public class UDUPServer implements Runnable {
private DatagramSocket socket;
private InetAddress address;
private HashMap<String, ArrayList<byte[]>> partialPackets;
+ private HashMap<String, Long> partialPacketTimestamps;
private int bufSize;
private final AtomicBoolean running;
+ private long freshnessPeriod;
- public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {
+ public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException {
this.socket = new DatagramSocket(port);
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
this.serializer = new ByteSerializer();
this.running = new AtomicBoolean(false);
+ this.freshnessPeriod = freshnessPeriod;
+ this.partialPacketTimestamps = new HashMap<>();
}
public void setUDUP(UDUP udup) {
this.udp = udup;
}
+ private void purgeData() {
+ long currentTime = System.currentTimeMillis();
+ ArrayList<String> packetsToRemove = new ArrayList<>();
+ for (Map.Entry<String, Long> packetTimestamp : partialPacketTimestamps.entrySet()) {
+ if (packetTimestamp.getValue() + freshnessPeriod < currentTime) {
+ packetsToRemove.add(packetTimestamp.getKey());
+ }
+ }
+
+ for (String packetToRemove : packetsToRemove) {
+ partialPacketTimestamps.remove(packetToRemove);
+ partialPackets.remove(packetToRemove);
+ }
+ }
+
public void run() {
System.out.println("UDP server running");
this.running.getAndSet(true);
+
while(this.running.get()) {
try {
this.acceptMessage();
@@ -71,6 +89,8 @@ public class UDUPServer implements Runnable {
msg.getContent().setSenderAddress(packet.getAddress());
sendMessageFurther(msg);
}
+
+ purgeData();
}
private DatagramPacket receivePacket() throws IOException {
@@ -120,6 +140,7 @@ public class UDUPServer implements Runnable {
previousPacketData.add(newPacketNo - 1, newPacketData);
this.partialPackets.put(transmissionID, previousPacketData);
+ this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());
if (previousPacketData.contains(null)) {
throw new NoSuchElementException("Packet is not full");
@@ -135,6 +156,7 @@ public class UDUPServer implements Runnable {
public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
UDUPMessage msg = null;
+ this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());
if (this.partialPackets.containsKey(transmissionID)) {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);