diff options
author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-15 04:46:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-15 04:46:19 +0100 |
commit | 97f644e3a983c6a24d26cb04b57b96a42afaa089 (patch) | |
tree | 6875ba1b10c804d728925c9cb282b404cd3c20f6 /src/main/java | |
parent | 8678d8c922b439ff4a58e4b139c4085515a890f0 (diff) | |
parent | 0cb7ded87955286f950d2802fc6a0d1c9c26726a (diff) |
Merge pull request #123 from m-chrzan/clean_udup
Add cleaning to UDUP
Diffstat (limited to 'src/main/java')
4 files changed, 38 insertions, 13 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index 2363f4b..ad0f3bd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -52,7 +52,7 @@ public class AgentConfig { modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); - UDUPServer server = new UDUPServer(serverAddr, port, bufsize); + UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); return modules; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index dae0420..677bf67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { @@ -20,25 +18,45 @@ public class UDUPServer implements Runnable { private DatagramSocket socket; private InetAddress address; private HashMap<String, ArrayList<byte[]>> partialPackets; + private HashMap<String, Long> partialPacketTimestamps; private int bufSize; private final AtomicBoolean running; + private long freshnessPeriod; - public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { + public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException { this.socket = new DatagramSocket(port); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); + this.freshnessPeriod = freshnessPeriod; + this.partialPacketTimestamps = new HashMap<>(); } public void setUDUP(UDUP udup) { this.udp = udup; } + private void purgeData() { + long currentTime = System.currentTimeMillis(); + ArrayList<String> packetsToRemove = new ArrayList<>(); + for (Map.Entry<String, Long> packetTimestamp : partialPacketTimestamps.entrySet()) { + if (packetTimestamp.getValue() + freshnessPeriod < currentTime) { + packetsToRemove.add(packetTimestamp.getKey()); + } + } + + for (String packetToRemove : packetsToRemove) { + partialPacketTimestamps.remove(packetToRemove); + partialPackets.remove(packetToRemove); + } + } + public void run() { System.out.println("UDP server running"); this.running.getAndSet(true); + while(this.running.get()) { try { this.acceptMessage(); @@ -71,6 +89,8 @@ public class UDUPServer implements Runnable { msg.getContent().setSenderAddress(packet.getAddress()); sendMessageFurther(msg); } + + purgeData(); } private DatagramPacket receivePacket() throws IOException { @@ -120,6 +140,7 @@ public class UDUPServer implements Runnable { previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (previousPacketData.contains(null)) { throw new NoSuchElementException("Packet is not full"); @@ -135,6 +156,7 @@ public class UDUPServer implements Runnable { public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { UDUPMessage msg = null; + this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis()); if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java index 5f34fe9..f147b2a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java @@ -133,9 +133,9 @@ public class ClientController { return "contactsForm"; } - private Set<ValueContact> parseContactsString(DataStringInput contactsInput) throws Exception { + public static Set<ValueContact> parseContactsString(String contactsInput) throws Exception { Gson gson = new Gson(); - Map<String, ArrayList> contactStrings = gson.fromJson(contactsInput.getString(), Map.class); + Map<String, ArrayList> contactStrings = gson.fromJson(contactsInput, Map.class); Set<ValueContact> contactObjects = new HashSet<ValueContact>(); ArrayList<Double> cAddr; byte[] inetArray = new byte[4]; @@ -160,7 +160,7 @@ public class ClientController { Set<ValueContact> contactObjects; try { - contactObjects = parseContactsString(contactsObject); + contactObjects = parseContactsString(contactsObject.getString()); this.agentApi.setFallbackContacts(contactObjects); } catch (Exception e) { success = false; @@ -263,7 +263,7 @@ public class ClientController { case "Contact": DataStringInput contactsString = new DataStringInput(); contactsString.setString(attributeObject.getValueString()); - attributeValue = parseContactsString(contactsString).iterator().next(); + attributeValue = parseContactsString(contactsString.getString()).iterator().next(); break; case "List": List parsedListValue = gson.fromJson(attributeObject.getValueString(), List.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index 12d795a..2b8d033 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -7,11 +7,10 @@ import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import com.google.gson.Gson; +import pl.edu.mimuw.cloudatlas.client.ClientController; import pl.edu.mimuw.cloudatlas.model.*; public class Fetcher { @@ -98,13 +97,17 @@ public class Fetcher { String jsonAttribs; System.out.println(System.getProperty("user.dir")); - + String fallbackContactsString = System.getProperty("fallback_contacts"); + System.out.println(fallbackContactsString); try { initializeApiStub(); initializePythonProcess(); bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream())); + Set<String> fallbackContacts = new HashSet<String>(); + api.setFallbackContacts(ClientController.parseContactsString(fallbackContactsString)); + while((jsonAttribs = bufferRead.readLine()) != null) { System.out.println(jsonAttribs); System.out.flush(); |