m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2020-01-10 18:42:22 +0100
committerGitHub <noreply@github.com>2020-01-10 18:42:22 +0100
commit1b28c8a208c510183479e090f7b8c32f9dadd7c2 (patch)
treef7fb5f8a0ef797307e507107263c8394d9248bc7 /src
parent3e3677a34ab63d05cbc7a3c45dca98a47fbac77f (diff)
parentad872a25f94f6297a659cf945c4e1547ed8f28d7 (diff)
Merge pull request #89 from m-chrzan/refactor_udup
Refactor udup
Diffstat (limited to 'src')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java30
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java31
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java26
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java69
4 files changed, 94 insertions, 62 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 256fa6b..62cd544 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent;
import java.net.Inet4Address;
import java.net.InetAddress;
+import java.net.SocketException;
import java.net.UnknownHostException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
@@ -39,18 +40,19 @@ public class Agent {
}
}
- public static HashMap<ModuleType, Module> initializeModules() {
+ public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
modules.put(ModuleType.RMI, new Remik());
- Long freshnessPeriod = new Long(System.getProperty("freshness_period"));
+ Long freshnessPeriod = Long.getLong("freshness_period");
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
- try {
- modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000));
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
+
+ Integer port = Integer.getInteger("UDUPServer.port");
+ Integer timeout = Integer.getInteger("UDUPServer.timeout");
+ Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
+ UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
+ modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
// TODO add modules as we implement them
return modules;
}
@@ -89,14 +91,24 @@ public class Agent {
}
public static void runModulesAsThreads() {
- HashMap<ModuleType, Module> modules = initializeModules();
+ HashMap<ModuleType, Module> modules = null;
+
+ try {
+ modules = initializeModules();
+ } catch (UnknownHostException | SocketException e) {
+ System.out.println("Module initialization failed");
+ e.printStackTrace();
+ return;
+ }
+
HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
-
eventBus = new EventBus(executors);
+ Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
Thread eventBusThread = new Thread(eventBus);
System.out.println("Initializing event bus");
eventBusThread.start();
+ UDUPServerThread.start();
}
private static void initZones() {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index 8c6db8f..e2243e1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -23,30 +23,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
* due to ValueContact design
*/
-// TODO set server port in global config - must be the same everywhere
-// TODO same with buffer size
-
-// TODO separate server like newapiimpl
// TODO add timestamps as close to sending as possible
// TODO wysylac tylko remotegossipgirl message
// TODO update timestampow odpowiedni w tym remotegossipgirlmessage
-public class UDUP extends Module implements Runnable {
+public class UDUP extends Module {
private UDUPClient client;
private UDUPServer server;
- private final AtomicBoolean running;
- public UDUP(InetAddress serverAddr,
- int serverPort,
+ public UDUP(int serverPort,
int timeout,
- int bufferSize) {
+ int bufferSize,
+ UDUPServer server) {
super(ModuleType.UDP);
- this.running = new AtomicBoolean(false);
try {
this.client = new UDUPClient(this, serverPort, bufferSize);
- this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
- this.running.getAndSet(true);
+ this.server = server;
+ this.server.setUDUP(this);
} catch (SocketException e) {
e.printStackTrace();
this.client.close();
@@ -54,17 +48,8 @@ public class UDUP extends Module implements Runnable {
}
}
- public void run() {
- System.out.println("UDP server running");
- while(this.running.get()) {
- try {
- this.server.acceptMessage();
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- this.running.getAndSet(false);
- this.server.close();
- }
- }
+ public UDUPServer getServer() {
+ return this.server;
}
public void handleTyped(UDUPMessage event) throws InterruptedException {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index b71a475..d6180be 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -10,22 +10,42 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class UDUPServer {
+public class UDUPServer implements Runnable {
private UDUP udp;
private UDUPSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
private HashMap<String, ArrayList<byte[]>> partialPackets;
private int bufSize;
+ private final AtomicBoolean running;
- public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
- this.udp = udp;
+ public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {
this.socket = new DatagramSocket(port, addr);
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
this.serializer = new UDUPSerializer();
+ this.running = new AtomicBoolean(false);
+ }
+
+ public void setUDUP(UDUP udup) {
+ this.udp = udup;
+ }
+
+ public void run() {
+ System.out.println("UDP server running");
+ this.running.getAndSet(true);
+ while(this.running.get()) {
+ try {
+ this.acceptMessage();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ this.running.getAndSet(false);
+ this.close();
+ }
+ }
}
public void acceptMessage() throws IOException, InterruptedException {
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
index 13a322b..93ed8be 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
@@ -6,12 +6,14 @@ import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.agent.modules.UDUP;
+import pl.edu.mimuw.cloudatlas.agent.modules.UDUPServer;
import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
import pl.edu.mimuw.cloudatlas.model.ValueInt;
import java.net.InetAddress;
+import java.net.SocketException;
import java.net.UnknownHostException;
// TODO add serialization tests that target custom serializers (type collections!)
@@ -22,25 +24,30 @@ public class UDUPTest {
public void messageBetweenUDUPs() {
UDUP udp1 = null;
UDUP udp2 = null;
+ UDUPServer server1 = null;
+ UDUPServer server2 = null;
UDUPMessage msg1 = null;
boolean testSuccess = true;
+ int timeout = 5000;
try {
System.out.println("Starting udp1");
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000);
udp1 = new UDUP(
- InetAddress.getByName("127.0.0.2"),
- 5999,
- 5000,
- 1000);
+ 5997,
+ timeout,
+ 1000,
+ server1);
System.out.println("Starting udp2");
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000);
udp2 = new UDUP(
- InetAddress.getByName("127.0.0.3"),
- 5999,
- 5000,
- 1000);
+ 5997,
+ timeout,
+ 1000,
+ server2);
UDUPMessage testContent = new UDUPMessage();
testContent.setDestinationModule(ModuleType.TEST);
@@ -52,13 +59,13 @@ public class UDUPTest {
testContent
);
- } catch (UnknownHostException e) {
+ } catch (UnknownHostException | SocketException e) {
e.printStackTrace();
}
- Thread udpThread1 = new Thread(udp1);
+ Thread udpThread1 = new Thread(server1);
udpThread1.start();
- Thread udpThread2 = new Thread(udp2);
+ Thread udpThread2 = new Thread(server2);
udpThread2.start();
try {
@@ -89,6 +96,8 @@ public class UDUPTest {
public void bigMessageBetweenUDUPs() {
UDUP udp1 = null;
UDUP udp2 = null;
+ UDUPServer server1 = null;
+ UDUPServer server2 = null;
UDUPMessage msg1 = null;
boolean testSuccess = true;
int timeout = 5000;
@@ -96,19 +105,21 @@ public class UDUPTest {
try {
System.out.println("Starting udp1");
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000);
udp1 = new UDUP(
- InetAddress.getByName("127.0.0.2"),
- 5998,
+ 5997,
timeout,
- 30);
+ 30,
+ server1);
System.out.println("Starting udp2");
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000);
udp2 = new UDUP(
- InetAddress.getByName("127.0.0.3"),
- 5998,
+ 5997,
timeout,
- 30);
+ 30,
+ server2);
UDUPMessage testContent = new UDUPMessage();
testContent.setDestinationModule(ModuleType.TEST);
@@ -120,13 +131,13 @@ public class UDUPTest {
testContent
);
- } catch (UnknownHostException e) {
+ } catch (UnknownHostException | SocketException e) {
e.printStackTrace();
}
- Thread udpThread1 = new Thread(udp1);
+ Thread udpThread1 = new Thread(server1);
udpThread1.start();
- Thread udpThread2 = new Thread(udp2);
+ Thread udpThread2 = new Thread(server2);
udpThread2.start();
try {
@@ -152,6 +163,8 @@ public class UDUPTest {
public void sendMultipleMessages() {
UDUP udp1 = null;
UDUP udp2 = null;
+ UDUPServer server1 = null;
+ UDUPServer server2 = null;
UDUPMessage msg1 = null;
UDUPMessage msg2 = null;
UDUPMessage msg3 = null;
@@ -161,19 +174,21 @@ public class UDUPTest {
try {
System.out.println("Starting udp1");
+ server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000);
udp1 = new UDUP(
- InetAddress.getByName("127.0.0.2"),
5997,
timeout,
- 1000);
+ 1000,
+ server1);
System.out.println("Starting udp2");
+ server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000);
udp2 = new UDUP(
- InetAddress.getByName("127.0.0.3"),
5997,
timeout,
- 1000);
+ 1000,
+ server2);
UDUPMessage testContent = new UDUPMessage();
testContent.setDestinationModule(ModuleType.TEST);
@@ -197,13 +212,13 @@ public class UDUPTest {
testContent
);
- } catch (UnknownHostException e) {
+ } catch (UnknownHostException | SocketException e) {
e.printStackTrace();
}
- Thread udpThread1 = new Thread(udp1);
+ Thread udpThread1 = new Thread(server1);
udpThread1.start();
- Thread udpThread2 = new Thread(udp2);
+ Thread udpThread2 = new Thread(server2);
udpThread2.start();
try {