m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2020-01-15 04:46:19 +0100
committerGitHub <noreply@github.com>2020-01-15 04:46:19 +0100
commit97f644e3a983c6a24d26cb04b57b96a42afaa089 (patch)
tree6875ba1b10c804d728925c9cb282b404cd3c20f6
parent8678d8c922b439ff4a58e4b139c4085515a890f0 (diff)
parent0cb7ded87955286f950d2802fc6a0d1c9c26726a (diff)
Merge pull request #123 from m-chrzan/clean_udup
Add cleaning to UDUP
-rw-r--r--build.gradle9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java30
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java8
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java11
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java12
6 files changed, 53 insertions, 19 deletions
diff --git a/build.gradle b/build.gradle
index c6ebcae..51a49c5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -51,6 +51,14 @@ ext.querySignerHostname = {
}
/*
+Example: -DfallbackContacts=\{\"/uw/violet07\":[192,168,0,11]}
+escape at the beginning and before "
+ */
+ext.fallbackContacts = {
+ return System.getProperty("fallbackContacts") ?: "{\"" + zonePath() + "\":[127,0,0,1]}"
+}
+
+/*
Possible options:
RoundRobinExp
RoundRobinUniform
@@ -138,6 +146,7 @@ task runFetcher(type: JavaExec) {
main = 'pl.edu.mimuw.cloudatlas.fetcher.Fetcher'
args(hostname() , 1099)
systemProperty 'zone_path', zonePath()
+ systemProperty 'fallback_contacts', fallbackContacts()
}
task runInterpreter(type: JavaExec) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
index 2363f4b..ad0f3bd 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -52,7 +52,7 @@ public class AgentConfig {
modules.put(ModuleType.QUERY, new Qurnik());
modules.put(ModuleType.GOSSIP, new GossipGirl());
- UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
+ UDUPServer server = new UDUPServer(serverAddr, port, bufsize, freshnessPeriod);
modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
return modules;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index dae0420..677bf67 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -9,9 +9,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.NoSuchElementException;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class UDUPServer implements Runnable {
@@ -20,25 +18,45 @@ public class UDUPServer implements Runnable {
private DatagramSocket socket;
private InetAddress address;
private HashMap<String, ArrayList<byte[]>> partialPackets;
+ private HashMap<String, Long> partialPacketTimestamps;
private int bufSize;
private final AtomicBoolean running;
+ private long freshnessPeriod;
- public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {
+ public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException {
this.socket = new DatagramSocket(port);
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
this.serializer = new ByteSerializer();
this.running = new AtomicBoolean(false);
+ this.freshnessPeriod = freshnessPeriod;
+ this.partialPacketTimestamps = new HashMap<>();
}
public void setUDUP(UDUP udup) {
this.udp = udup;
}
+ private void purgeData() {
+ long currentTime = System.currentTimeMillis();
+ ArrayList<String> packetsToRemove = new ArrayList<>();
+ for (Map.Entry<String, Long> packetTimestamp : partialPacketTimestamps.entrySet()) {
+ if (packetTimestamp.getValue() + freshnessPeriod < currentTime) {
+ packetsToRemove.add(packetTimestamp.getKey());
+ }
+ }
+
+ for (String packetToRemove : packetsToRemove) {
+ partialPacketTimestamps.remove(packetToRemove);
+ partialPackets.remove(packetToRemove);
+ }
+ }
+
public void run() {
System.out.println("UDP server running");
this.running.getAndSet(true);
+
while(this.running.get()) {
try {
this.acceptMessage();
@@ -71,6 +89,8 @@ public class UDUPServer implements Runnable {
msg.getContent().setSenderAddress(packet.getAddress());
sendMessageFurther(msg);
}
+
+ purgeData();
}
private DatagramPacket receivePacket() throws IOException {
@@ -120,6 +140,7 @@ public class UDUPServer implements Runnable {
previousPacketData.add(newPacketNo - 1, newPacketData);
this.partialPackets.put(transmissionID, previousPacketData);
+ this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());
if (previousPacketData.contains(null)) {
throw new NoSuchElementException("Packet is not full");
@@ -135,6 +156,7 @@ public class UDUPServer implements Runnable {
public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
UDUPMessage msg = null;
+ this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());
if (this.partialPackets.containsKey(transmissionID)) {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
index 5f34fe9..f147b2a 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
@@ -133,9 +133,9 @@ public class ClientController {
return "contactsForm";
}
- private Set<ValueContact> parseContactsString(DataStringInput contactsInput) throws Exception {
+ public static Set<ValueContact> parseContactsString(String contactsInput) throws Exception {
Gson gson = new Gson();
- Map<String, ArrayList> contactStrings = gson.fromJson(contactsInput.getString(), Map.class);
+ Map<String, ArrayList> contactStrings = gson.fromJson(contactsInput, Map.class);
Set<ValueContact> contactObjects = new HashSet<ValueContact>();
ArrayList<Double> cAddr;
byte[] inetArray = new byte[4];
@@ -160,7 +160,7 @@ public class ClientController {
Set<ValueContact> contactObjects;
try {
- contactObjects = parseContactsString(contactsObject);
+ contactObjects = parseContactsString(contactsObject.getString());
this.agentApi.setFallbackContacts(contactObjects);
} catch (Exception e) {
success = false;
@@ -263,7 +263,7 @@ public class ClientController {
case "Contact":
DataStringInput contactsString = new DataStringInput();
contactsString.setString(attributeObject.getValueString());
- attributeValue = parseContactsString(contactsString).iterator().next();
+ attributeValue = parseContactsString(contactsString.getString()).iterator().next();
break;
case "List":
List parsedListValue = gson.fromJson(attributeObject.getValueString(), List.class);
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
index 12d795a..2b8d033 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
@@ -7,11 +7,10 @@ import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import com.google.gson.Gson;
+import pl.edu.mimuw.cloudatlas.client.ClientController;
import pl.edu.mimuw.cloudatlas.model.*;
public class Fetcher {
@@ -98,13 +97,17 @@ public class Fetcher {
String jsonAttribs;
System.out.println(System.getProperty("user.dir"));
-
+ String fallbackContactsString = System.getProperty("fallback_contacts");
+ System.out.println(fallbackContactsString);
try {
initializeApiStub();
initializePythonProcess();
bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream()));
+ Set<String> fallbackContacts = new HashSet<String>();
+ api.setFallbackContacts(ClientController.parseContactsString(fallbackContactsString));
+
while((jsonAttribs = bufferRead.readLine()) != null) {
System.out.println(jsonAttribs);
System.out.flush();
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
index 4f32d89..eae7af9 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
@@ -34,7 +34,7 @@ public class UDUPTest {
try {
System.out.println("Starting udp1");
- server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000);
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000, 60 * 1000);
udp1 = new UDUP(
5997,
timeout,
@@ -43,7 +43,7 @@ public class UDUPTest {
System.out.println("Starting udp2");
- server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000);
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000, 60 * 1000);
udp2 = new UDUP(
5997,
timeout,
@@ -106,7 +106,7 @@ public class UDUPTest {
try {
System.out.println("Starting udp1");
- server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000);
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000, 60 * 1000);
udp1 = new UDUP(
5997,
timeout,
@@ -115,7 +115,7 @@ public class UDUPTest {
System.out.println("Starting udp2");
- server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000);
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000, 60 * 1000);
udp2 = new UDUP(
5997,
timeout,
@@ -181,7 +181,7 @@ public class UDUPTest {
try {
System.out.println("Starting udp1");
- server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000);
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000, 60 * 1000);
udp1 = new UDUP(
5997,
timeout,
@@ -190,7 +190,7 @@ public class UDUPTest {
System.out.println("Starting udp2");
- server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000);
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000, 60 * 1000);
udp2 = new UDUP(
5997,
timeout,