From cb173ac60b8307247efd83aaed5a0bb44a107766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:06:41 +0100 Subject: Add sending messages from modules --- .../pl/edu/mimuw/cloudatlas/agent/Executor.java | 12 ++++++++++++ .../java/pl/edu/mimuw/cloudatlas/agent/Module.java | 22 +++++++++++++++++++++- .../edu/mimuw/cloudatlas/agent/EventBusTest.java | 4 ++++ 3 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index 0651a9b..f3cf2ea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -11,16 +11,20 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; public class Executor implements Runnable { private Module module; private LinkedBlockingQueue events; + private EventBus eventBus; public Executor(Module module) { this.module = module; + this.module.setExecutor(this); this.events = new LinkedBlockingQueue(); } public void run() { + System.out.println("Executor " + this.module.toString() + " running"); while (!Thread.currentThread().interrupted()) { try { AgentMessage event = events.take(); + System.out.println("Executor " + this.module.toString() + " passed message to handle"); module.handle(event); } catch (InterruptedException e) { System.out.println("Executor interrupted. Exiting loop."); @@ -32,4 +36,12 @@ public class Executor implements Runnable { public void addMessage(AgentMessage event) throws InterruptedException { events.put(event); } + + public void passMessage(AgentMessage event) throws InterruptedException { + eventBus.addMessage(event); + } + + public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java index 46df31b..167f3b7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java @@ -6,5 +6,25 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; * A Module is a (potentially stateful) event handler. */ public abstract class Module { - public abstract void handle(AgentMessage event); + private AgentMessage.AgentModule moduleType; + private Executor executor; + + Module(AgentMessage.AgentModule moduleType) { + this.moduleType = moduleType; + } + + public abstract void handle(AgentMessage event) throws InterruptedException ; + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void sendMessage(AgentMessage event) throws InterruptedException { + this.executor.passMessage(event); + } + + @Override + public String toString() { + return moduleType.toString(); + } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java new file mode 100644 index 0000000..7320e69 --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java @@ -0,0 +1,4 @@ +package pl.edu.mimuw.cloudatlas.agent; + +public class EventBusTest { +} -- cgit v1.2.3 From 3b19a4abfdad42beb3e8c34fe902bd5c0c2aaf4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:07:09 +0100 Subject: Implement EventBus --- .../pl/edu/mimuw/cloudatlas/agent/EventBus.java | 48 +++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 47cb1ff..afb4e27 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,7 +1,53 @@ package pl.edu.mimuw.cloudatlas.agent; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + /* * The EventBus routes messages sent between Modules. */ -public class EventBus { +public class EventBus implements Runnable { + private LinkedBlockingQueue events; + private HashMap executors; + + void setEventBusReference() { + Iterator it = this.executors.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry executorEntry = + (Map.Entry) it.next(); + executorEntry.getValue().setEventBus(this); + } + } + + EventBus(HashMap executors) { + this.executors = executors; + setEventBusReference(); + this.events = new LinkedBlockingQueue(); + } + + public void run() { + System.out.println("Event bus running"); + while (!Thread.currentThread().interrupted()) { + try { + AgentMessage event = events.take(); + routeMessage(event); + } catch (InterruptedException e) { + System.out.println("Event bus interrupted. Exiting loop."); + break; + } + } + } + + public void routeMessage(AgentMessage msg) throws InterruptedException { + System.out.println("Event bus routing message"); + executors.get(msg.getDestinationModule()).addMessage(msg); + } + + public void addMessage(AgentMessage msg) throws InterruptedException { + this.events.put(msg); + } } -- cgit v1.2.3 From 6af48bd213350954914e899b2bf23ebd0da5ed22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:07:46 +0100 Subject: Implement executor and event bus dispatch on threads and queues --- .../java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 77 ++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 8eb8b4f..0cfdfd6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -4,24 +4,93 @@ import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { - public static void main(String[] args) { + + public static void runRegistry() { try { ZMI root = Main.createTestHierarchy2(); ApiImplementation api = new ApiImplementation(root); Api apiStub = - (Api) UnicastRemoteObject.exportObject(api, 0); + (Api) UnicastRemoteObject.exportObject(api, 0); Registry registry = LocateRegistry.getRegistry(); registry.rebind("Api", apiStub); - System.out.println("Api bound"); + System.out.println("Agent: api bound"); } catch (Exception e) { - System.err.println("Agent exception:"); + System.err.println("Agent registry initialization exception:"); e.printStackTrace(); } } + + public static HashMap initializeModules() { + HashMap modules = new HashMap(); + // TODO add modules as we implement them + return modules; + } + + public static HashMap initializeExecutors( + HashMap modules) { + HashMap executors = new HashMap(); + Iterator it = modules.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry moduleEntry = + (Map.Entry) it.next(); + Module module = moduleEntry.getValue(); + Executor executor = new Executor(module); + executors.put(moduleEntry.getKey(), executor); + } + + return executors; + } + + public static ArrayList initializeExecutorThreads(HashMap executors) { + ArrayList executorThreads = new ArrayList(); + Iterator it = executors.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry executorEntry = + (Map.Entry) it.next(); + Thread thread = new Thread(executorEntry.getValue()); + thread.setDaemon(true); + System.out.println("Initializing executor " + executorEntry.getKey()); + thread.start(); + executorThreads.add(thread); + } + + return executorThreads; + } + + public static void closeExecutors(ArrayList executorThreads) { + for (Thread executorThread : executorThreads) { + executorThread.interrupt(); + } + } + + public static void runModulesAsThreads() { + HashMap modules = initializeModules(); + HashMap executors = initializeExecutors(modules); + ArrayList executorThreads = initializeExecutorThreads(executors); + + Thread eventBusThread = new Thread(new EventBus(executors)); + System.out.println("Initializing event bus"); + eventBusThread.start(); + + System.out.println("Closing executors"); + closeExecutors(executorThreads); + } + + public static void main(String[] args) { + runRegistry(); + runModulesAsThreads(); + } } -- cgit v1.2.3 From b88b2866f5731491f1e7957a678559a1922c983a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:08:08 +0100 Subject: Add simple message constructor --- .../java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java index ab24f48..6e8f51c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java @@ -1,5 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.message; +import pl.edu.mimuw.cloudatlas.agent.Agent; + public class AgentMessage { public enum AgentModule { @@ -23,6 +25,12 @@ public class AgentMessage { this.timestamp = timestamp; } + public AgentMessage(String messageId, AgentModule destinationModule) { + this.messageId = messageId; + this.destinationModule = destinationModule; + this.timestamp = System.currentTimeMillis() / 1000L; + } + public String getMessageId() { return messageId; } -- cgit v1.2.3 From 4456b71ab46b375eb21861051423222e6e5bb01e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:08:44 +0100 Subject: Adjust executor tests to module changes --- src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java index 445a5b4..2a6ef49 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java @@ -9,6 +9,11 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage.AgentModule; public class ExecutorTest { public class MessageCounterModule extends Module { public int counter = 0; + + MessageCounterModule(AgentModule moduleType) { + super(moduleType); + } + public void handle(AgentMessage m) { counter++; } @@ -16,7 +21,7 @@ public class ExecutorTest { @Test public void testDoesntExecuteWhenNoMessages() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); Thread thread = new Thread(executor); thread.start(); @@ -27,7 +32,7 @@ public class ExecutorTest { @Test public void testExecutesHandlerOnce() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); Thread thread = new Thread(executor); @@ -39,7 +44,7 @@ public class ExecutorTest { @Test public void testExecutesHandlerMultipleTimes() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); -- cgit v1.2.3 From 87577a88139354a9640a2afc1058dad4c2ef9d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:09:03 +0100 Subject: Add event bus tests --- .../edu/mimuw/cloudatlas/agent/EventBusTest.java | 74 ++++++++++++++++++++++ 1 file changed, 74 insertions(+) (limited to 'src') diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java index 7320e69..d9a30ed 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java @@ -1,4 +1,78 @@ package pl.edu.mimuw.cloudatlas.agent; +import org.junit.Test; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; + +import java.util.ArrayList; +import java.util.HashMap; + public class EventBusTest { + + public static HashMap initializeTwoModules() { + HashMap modules = new HashMap(); + modules.put(AgentMessage.AgentModule.RMI, new Module(AgentMessage.AgentModule.RMI) { + @Override + public void handle(AgentMessage event) throws InterruptedException { + System.out.println("Module 1 handle called"); + sendMessage(new AgentMessage("1", AgentMessage.AgentModule.UDP)); + } + }); + + modules.put(AgentMessage.AgentModule.UDP, new Module(AgentMessage.AgentModule.UDP) { + @Override + public void handle(AgentMessage event) { + System.out.println("Module 2 handle called"); + } + }); + + return modules; + } + + public static HashMap initializeModule() { + HashMap modules = new HashMap(); + + modules.put(AgentMessage.AgentModule.RMI, new Module(AgentMessage.AgentModule.RMI) { + @Override + public void handle(AgentMessage event) { + System.out.println("Module 1 handle called"); + } + }); + + return modules; + } + + @Test + public void messageModule() throws InterruptedException { + HashMap modules = initializeModule(); + HashMap executors = Agent.initializeExecutors(modules); + ArrayList executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + + eventBusThread.start(); + eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI)); + Thread.sleep(1000); + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + } + + @Test + public void messagingBetweenModules() throws InterruptedException { + HashMap modules = initializeTwoModules(); + HashMap executors = Agent.initializeExecutors(modules); + ArrayList executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + eventBusThread.start(); + + eventBus.addMessage(new AgentMessage( + "0", + AgentMessage.AgentModule.RMI, + System.currentTimeMillis() / 1000L)); + + Thread.sleep(10000); + + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + } } -- cgit v1.2.3 From 7a104ca7eeb0d312bc76d2393495387079032aed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:16:54 +0100 Subject: Improve event bus tests --- .../edu/mimuw/cloudatlas/agent/EventBusTest.java | 27 +++++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java index d9a30ed..fd7c244 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java @@ -6,35 +6,47 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; import java.util.ArrayList; import java.util.HashMap; +import static org.junit.Assert.assertEquals; + public class EventBusTest { + public abstract class MessageCounterModule extends Module { + public int counter = 0; + + MessageCounterModule(AgentMessage.AgentModule moduleType) { + super(moduleType); + } + } - public static HashMap initializeTwoModules() { + public HashMap initializeTwoModules() { HashMap modules = new HashMap(); - modules.put(AgentMessage.AgentModule.RMI, new Module(AgentMessage.AgentModule.RMI) { + modules.put(AgentMessage.AgentModule.RMI, new MessageCounterModule(AgentMessage.AgentModule.RMI) { @Override public void handle(AgentMessage event) throws InterruptedException { System.out.println("Module 1 handle called"); sendMessage(new AgentMessage("1", AgentMessage.AgentModule.UDP)); + counter ++; } }); - modules.put(AgentMessage.AgentModule.UDP, new Module(AgentMessage.AgentModule.UDP) { + modules.put(AgentMessage.AgentModule.UDP, new MessageCounterModule(AgentMessage.AgentModule.UDP) { @Override public void handle(AgentMessage event) { System.out.println("Module 2 handle called"); + counter++; } }); return modules; } - public static HashMap initializeModule() { + public HashMap initializeModule() { HashMap modules = new HashMap(); - modules.put(AgentMessage.AgentModule.RMI, new Module(AgentMessage.AgentModule.RMI) { + modules.put(AgentMessage.AgentModule.RMI, new MessageCounterModule(AgentMessage.AgentModule.RMI) { @Override public void handle(AgentMessage event) { System.out.println("Module 1 handle called"); + counter++; } }); @@ -54,6 +66,7 @@ public class EventBusTest { Thread.sleep(1000); eventBusThread.interrupt(); Agent.closeExecutors(executorThreads); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.RMI)).counter); } @Test @@ -70,9 +83,11 @@ public class EventBusTest { AgentMessage.AgentModule.RMI, System.currentTimeMillis() / 1000L)); - Thread.sleep(10000); + Thread.sleep(1000); eventBusThread.interrupt(); Agent.closeExecutors(executorThreads); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.RMI)).counter); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.UDP)).counter); } } -- cgit v1.2.3