From c077d789e25a88d8be8f8082c321d7aadb83cd96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Wed, 15 Jan 2020 03:24:48 +0100 Subject: Add cleaning to UDUP --- .../pl/edu/mimuw/cloudatlas/agent/AgentConfig.java | 2 +- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 +++++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) (limited to 'src/main/java/pl/edu/mimuw') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 2363f4b..ad0f3bd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -52,7 +52,7 @@ public class AgentConfig { modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); - UDUPServer server = new UDUPServer(serverAddr, port, bufsize); + UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); return modules; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index dae0420..677bf67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { @@ -20,25 +18,45 @@ public class UDUPServer implements Runnable { private DatagramSocket socket; private InetAddress address; private HashMap> partialPackets; + private HashMap partialPacketTimestamps; private int bufSize; private final AtomicBoolean running; + private long freshnessPeriod; - public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { + public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException { this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); + this.freshnessPeriod = freshnessPeriod; + this.partialPacketTimestamps = new HashMap<>(); } public void setUDUP(UDUP udup) { this.udp = udup; } + private void purgeData() { + long currentTime = System.currentTimeMillis(); + ArrayList packetsToRemove = new ArrayList<>(); + for (Map.Entry packetTimestamp : partialPacketTimestamps.entrySet()) { + if (packetTimestamp.getValue() + freshnessPeriod < currentTime) { + packetsToRemove.add(packetTimestamp.getKey()); + } + } + + for (String packetToRemove : packetsToRemove) { + partialPacketTimestamps.remove(packetToRemove); + partialPackets.remove(packetToRemove); + } + } + public void run() { System.out.println("UDP server running"); this.running.getAndSet(true); + while(this.running.get()) { try { this.acceptMessage(); @@ -71,6 +89,8 @@ public class UDUPServer implements Runnable { msg.getContent().setSenderAddress(packet.getAddress()); sendMessageFurther(msg); } + + purgeData(); } private DatagramPacket receivePacket() throws IOException { @@ -120,6 +140,7 @@ public class UDUPServer implements Runnable { previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (previousPacketData.contains(null)) { throw new NoSuchElementException("Packet is not full"); @@ -135,6 +156,7 @@ public class UDUPServer implements Runnable { public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { UDUPMessage msg = null; + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); -- cgit v1.2.3 From 0cb7ded87955286f950d2802fc6a0d1c9c26726a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Wed, 15 Jan 2020 04:44:54 +0100 Subject: Add setting contacts in a fetcher flag --- .../java/pl/edu/mimuw/cloudatlas/client/ClientController.java | 8 ++++---- src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java | 11 +++++++---- 2 files changed, 11 insertions(+), 8 deletions(-) (limited to 'src/main/java/pl/edu/mimuw') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java index 5f34fe9..f147b2a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java @@ -133,9 +133,9 @@ public class ClientController { return "contactsForm"; } - private Set parseContactsString(DataStringInput contactsInput) throws Exception { + public static Set parseContactsString(String contactsInput) throws Exception { Gson gson = new Gson(); - Map contactStrings = gson.fromJson(contactsInput.getString(), Map.class); + Map contactStrings = gson.fromJson(contactsInput, Map.class); Set contactObjects = new HashSet(); ArrayList cAddr; byte[] inetArray = new byte[4]; @@ -160,7 +160,7 @@ public class ClientController { Set contactObjects; try { - contactObjects = parseContactsString(contactsObject); + contactObjects = parseContactsString(contactsObject.getString()); this.agentApi.setFallbackContacts(contactObjects); } catch (Exception e) { success = false; @@ -263,7 +263,7 @@ public class ClientController { case "Contact": DataStringInput contactsString = new DataStringInput(); contactsString.setString(attributeObject.getValueString()); - attributeValue = parseContactsString(contactsString).iterator().next(); + attributeValue = parseContactsString(contactsString.getString()).iterator().next(); break; case "List": List parsedListValue = gson.fromJson(attributeObject.getValueString(), List.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index 12d795a..2b8d033 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -7,11 +7,10 @@ import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import com.google.gson.Gson; +import pl.edu.mimuw.cloudatlas.client.ClientController; import pl.edu.mimuw.cloudatlas.model.*; public class Fetcher { @@ -98,13 +97,17 @@ public class Fetcher { String jsonAttribs; System.out.println(System.getProperty("user.dir")); - + String fallbackContactsString = System.getProperty("fallback_contacts"); + System.out.println(fallbackContactsString); try { initializeApiStub(); initializePythonProcess(); bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream())); + Set fallbackContacts = new HashSet(); + api.setFallbackContacts(ClientController.parseContactsString(fallbackContactsString)); + while((jsonAttribs = bufferRead.readLine()) != null) { System.out.println(jsonAttribs); System.out.flush(); -- cgit v1.2.3