diff options
author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-10 18:42:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-10 18:42:22 +0100 |
commit | 1b28c8a208c510183479e090f7b8c32f9dadd7c2 (patch) | |
tree | f7fb5f8a0ef797307e507107263c8394d9248bc7 | |
parent | 3e3677a34ab63d05cbc7a3c45dca98a47fbac77f (diff) | |
parent | ad872a25f94f6297a659cf945c4e1547ed8f28d7 (diff) |
Merge pull request #89 from m-chrzan/refactor_udup
Refactor udup
-rw-r--r-- | build.gradle | 20 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 30 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java | 31 | ||||
-rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 26 | ||||
-rw-r--r-- | src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 69 |
5 files changed, 114 insertions, 62 deletions
diff --git a/build.gradle b/build.gradle index 01e8685..de9a9df 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,22 @@ ext.freshnessPeriod = { return System.getProperty("freshnessPeriod") ?: 60 * 1000 } +ext.UDUPHostname = { + return System.getProperty("hostname") ?: "localhost" +} + +ext.port = { + return System.getProperty("port") ?: 5999; +} + +ext.timeout = { + return System.getProperty("timeout") ?: 5000; +} + +ext.bufsize = { + return System.getProperty("bufsize") ?: 512; +} + repositories { // Use jcenter for resolving dependencies. // You can declare any Maven/Ivy/file repository here. @@ -61,6 +77,10 @@ task runAgent(type: JavaExec) { main = 'pl.edu.mimuw.cloudatlas.agent.Agent' systemProperty 'java.rmi.server.hostname', hostname() systemProperty 'freshness_period', freshnessPeriod() + systemProperty 'UDUPServer.hostname', UDUPHostname() + systemProperty 'UDUPServer.port', port() + systemProperty 'UDUPServer.timeout', port() + systemProperty 'UDUPServer.bufsize', port() } task runClient(type: JavaExec) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 256fa6b..62cd544 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent; import java.net.Inet4Address; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; @@ -39,18 +40,19 @@ public class Agent { } } - public static HashMap<ModuleType, Module> initializeModules() { + public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - Long freshnessPeriod = new Long(System.getProperty("freshness_period")); + Long freshnessPeriod = Long.getLong("freshness_period"); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); - try { - modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); - } catch (UnknownHostException e) { - e.printStackTrace(); - } + + Integer port = Integer.getInteger("UDUPServer.port"); + Integer timeout = Integer.getInteger("UDUPServer.timeout"); + Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); + UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); + modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); // TODO add modules as we implement them return modules; } @@ -89,14 +91,24 @@ public class Agent { } public static void runModulesAsThreads() { - HashMap<ModuleType, Module> modules = initializeModules(); + HashMap<ModuleType, Module> modules = null; + + try { + modules = initializeModules(); + } catch (UnknownHostException | SocketException e) { + System.out.println("Module initialization failed"); + e.printStackTrace(); + return; + } + HashMap<ModuleType, Executor> executors = initializeExecutors(modules); ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); - eventBus = new EventBus(executors); + Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); Thread eventBusThread = new Thread(eventBus); System.out.println("Initializing event bus"); eventBusThread.start(); + UDUPServerThread.start(); } private static void initZones() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index 8c6db8f..e2243e1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -23,30 +23,24 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -// TODO set server port in global config - must be the same everywhere -// TODO same with buffer size - -// TODO separate server like newapiimpl // TODO add timestamps as close to sending as possible // TODO wysylac tylko remotegossipgirl message // TODO update timestampow odpowiedni w tym remotegossipgirlmessage -public class UDUP extends Module implements Runnable { +public class UDUP extends Module { private UDUPClient client; private UDUPServer server; - private final AtomicBoolean running; - public UDUP(InetAddress serverAddr, - int serverPort, + public UDUP(int serverPort, int timeout, - int bufferSize) { + int bufferSize, + UDUPServer server) { super(ModuleType.UDP); - this.running = new AtomicBoolean(false); try { this.client = new UDUPClient(this, serverPort, bufferSize); - this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); - this.running.getAndSet(true); + this.server = server; + this.server.setUDUP(this); } catch (SocketException e) { e.printStackTrace(); this.client.close(); @@ -54,17 +48,8 @@ public class UDUP extends Module implements Runnable { } } - public void run() { - System.out.println("UDP server running"); - while(this.running.get()) { - try { - this.server.acceptMessage(); - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - this.running.getAndSet(false); - this.server.close(); - } - } + public UDUPServer getServer() { + return this.server; } public void handleTyped(UDUPMessage event) throws InterruptedException { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index b71a475..d6180be 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -10,22 +10,42 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; -public class UDUPServer { +public class UDUPServer implements Runnable { private UDUP udp; private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; private HashMap<String, ArrayList<byte[]>> partialPackets; private int bufSize; + private final AtomicBoolean running; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { - this.udp = udp; + public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { this.socket = new DatagramSocket(port, addr); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new UDUPSerializer(); + this.running = new AtomicBoolean(false); + } + + public void setUDUP(UDUP udup) { + this.udp = udup; + } + + public void run() { + System.out.println("UDP server running"); + this.running.getAndSet(true); + while(this.running.get()) { + try { + this.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.close(); + } + } } public void acceptMessage() throws IOException, InterruptedException { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 13a322b..93ed8be 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -6,12 +6,14 @@ import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import pl.edu.mimuw.cloudatlas.agent.modules.UDUP; +import pl.edu.mimuw.cloudatlas.agent.modules.UDUPServer; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; import pl.edu.mimuw.cloudatlas.model.ValueInt; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; // TODO add serialization tests that target custom serializers (type collections!) @@ -22,25 +24,30 @@ public class UDUPTest { public void messageBetweenUDUPs() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; boolean testSuccess = true; + int timeout = 5000; try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), - 5999, - 5000, - 1000); + 5997, + timeout, + 1000, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), - 5999, - 5000, - 1000); + 5997, + timeout, + 1000, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -52,13 +59,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { @@ -89,6 +96,8 @@ public class UDUPTest { public void bigMessageBetweenUDUPs() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; boolean testSuccess = true; int timeout = 5000; @@ -96,19 +105,21 @@ public class UDUPTest { try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), - 5998, + 5997, timeout, - 30); + 30, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), - 5998, + 5997, timeout, - 30); + 30, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -120,13 +131,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { @@ -152,6 +163,8 @@ public class UDUPTest { public void sendMultipleMessages() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; UDUPMessage msg2 = null; UDUPMessage msg3 = null; @@ -161,19 +174,21 @@ public class UDUPTest { try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), 5997, timeout, - 1000); + 1000, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), 5997, timeout, - 1000); + 1000, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -197,13 +212,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { |