diff options
author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-15 04:46:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-15 04:46:19 +0100 |
commit | 97f644e3a983c6a24d26cb04b57b96a42afaa089 (patch) | |
tree | 6875ba1b10c804d728925c9cb282b404cd3c20f6 /src/main/java/pl/edu/mimuw/cloudatlas/agent | |
parent | 8678d8c922b439ff4a58e4b139c4085515a890f0 (diff) | |
parent | 0cb7ded87955286f950d2802fc6a0d1c9c26726a (diff) |
Merge pull request #123 from m-chrzan/clean_udup
Add cleaning to UDUP
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java | 2 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 |
2 files changed, 27 insertions, 5 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 2363f4b..ad0f3bd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -52,7 +52,7 @@ public class AgentConfig { modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); - UDUPServer server = new UDUPServer(serverAddr, port, bufsize); + UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); return modules; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index dae0420..677bf67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { @@ -20,25 +18,45 @@ public class UDUPServer implements Runnable { private DatagramSocket socket; private InetAddress address; private HashMap<String, ArrayList<byte[]>> partialPackets; + private HashMap<String, Long> partialPacketTimestamps; private int bufSize; private final AtomicBoolean running; + private long freshnessPeriod; - public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { + public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException { this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); + this.freshnessPeriod = freshnessPeriod; + this.partialPacketTimestamps = new HashMap<>(); } public void setUDUP(UDUP udup) { this.udp = udup; } + private void purgeData() { + long currentTime = System.currentTimeMillis(); + ArrayList<String> packetsToRemove = new ArrayList<>(); + for (Map.Entry<String, Long> packetTimestamp : partialPacketTimestamps.entrySet()) { + if (packetTimestamp.getValue() + freshnessPeriod < currentTime) { + packetsToRemove.add(packetTimestamp.getKey()); + } + } + + for (String packetToRemove : packetsToRemove) { + partialPacketTimestamps.remove(packetToRemove); + partialPackets.remove(packetToRemove); + } + } + public void run() { System.out.println("UDP server running"); this.running.getAndSet(true); + while(this.running.get()) { try { this.acceptMessage(); @@ -71,6 +89,8 @@ public class UDUPServer implements Runnable { msg.getContent().setSenderAddress(packet.getAddress()); sendMessageFurther(msg); } + + purgeData(); } private DatagramPacket receivePacket() throws IOException { @@ -120,6 +140,7 @@ public class UDUPServer implements Runnable { previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (previousPacketData.contains(null)) { throw new NoSuchElementException("Packet is not full"); @@ -135,6 +156,7 @@ public class UDUPServer implements Runnable { public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { UDUPMessage msg = null; + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); |