From cb173ac60b8307247efd83aaed5a0bb44a107766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:06:41 +0100 Subject: Add sending messages from modules --- .../pl/edu/mimuw/cloudatlas/agent/Executor.java | 12 ++++++++++++ .../java/pl/edu/mimuw/cloudatlas/agent/Module.java | 22 +++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index 0651a9b..f3cf2ea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -11,16 +11,20 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; public class Executor implements Runnable { private Module module; private LinkedBlockingQueue events; + private EventBus eventBus; public Executor(Module module) { this.module = module; + this.module.setExecutor(this); this.events = new LinkedBlockingQueue(); } public void run() { + System.out.println("Executor " + this.module.toString() + " running"); while (!Thread.currentThread().interrupted()) { try { AgentMessage event = events.take(); + System.out.println("Executor " + this.module.toString() + " passed message to handle"); module.handle(event); } catch (InterruptedException e) { System.out.println("Executor interrupted. Exiting loop."); @@ -32,4 +36,12 @@ public class Executor implements Runnable { public void addMessage(AgentMessage event) throws InterruptedException { events.put(event); } + + public void passMessage(AgentMessage event) throws InterruptedException { + eventBus.addMessage(event); + } + + public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java index 46df31b..167f3b7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java @@ -6,5 +6,25 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; * A Module is a (potentially stateful) event handler. */ public abstract class Module { - public abstract void handle(AgentMessage event); + private AgentMessage.AgentModule moduleType; + private Executor executor; + + Module(AgentMessage.AgentModule moduleType) { + this.moduleType = moduleType; + } + + public abstract void handle(AgentMessage event) throws InterruptedException ; + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void sendMessage(AgentMessage event) throws InterruptedException { + this.executor.passMessage(event); + } + + @Override + public String toString() { + return moduleType.toString(); + } } -- cgit v1.2.3 From 3b19a4abfdad42beb3e8c34fe902bd5c0c2aaf4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:07:09 +0100 Subject: Implement EventBus --- .../pl/edu/mimuw/cloudatlas/agent/EventBus.java | 48 +++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 47cb1ff..afb4e27 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,7 +1,53 @@ package pl.edu.mimuw.cloudatlas.agent; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + /* * The EventBus routes messages sent between Modules. */ -public class EventBus { +public class EventBus implements Runnable { + private LinkedBlockingQueue events; + private HashMap executors; + + void setEventBusReference() { + Iterator it = this.executors.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry executorEntry = + (Map.Entry) it.next(); + executorEntry.getValue().setEventBus(this); + } + } + + EventBus(HashMap executors) { + this.executors = executors; + setEventBusReference(); + this.events = new LinkedBlockingQueue(); + } + + public void run() { + System.out.println("Event bus running"); + while (!Thread.currentThread().interrupted()) { + try { + AgentMessage event = events.take(); + routeMessage(event); + } catch (InterruptedException e) { + System.out.println("Event bus interrupted. Exiting loop."); + break; + } + } + } + + public void routeMessage(AgentMessage msg) throws InterruptedException { + System.out.println("Event bus routing message"); + executors.get(msg.getDestinationModule()).addMessage(msg); + } + + public void addMessage(AgentMessage msg) throws InterruptedException { + this.events.put(msg); + } } -- cgit v1.2.3 From 6af48bd213350954914e899b2bf23ebd0da5ed22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:07:46 +0100 Subject: Implement executor and event bus dispatch on threads and queues --- .../java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 77 ++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 8eb8b4f..0cfdfd6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -4,24 +4,93 @@ import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { - public static void main(String[] args) { + + public static void runRegistry() { try { ZMI root = Main.createTestHierarchy2(); ApiImplementation api = new ApiImplementation(root); Api apiStub = - (Api) UnicastRemoteObject.exportObject(api, 0); + (Api) UnicastRemoteObject.exportObject(api, 0); Registry registry = LocateRegistry.getRegistry(); registry.rebind("Api", apiStub); - System.out.println("Api bound"); + System.out.println("Agent: api bound"); } catch (Exception e) { - System.err.println("Agent exception:"); + System.err.println("Agent registry initialization exception:"); e.printStackTrace(); } } + + public static HashMap initializeModules() { + HashMap modules = new HashMap(); + // TODO add modules as we implement them + return modules; + } + + public static HashMap initializeExecutors( + HashMap modules) { + HashMap executors = new HashMap(); + Iterator it = modules.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry moduleEntry = + (Map.Entry) it.next(); + Module module = moduleEntry.getValue(); + Executor executor = new Executor(module); + executors.put(moduleEntry.getKey(), executor); + } + + return executors; + } + + public static ArrayList initializeExecutorThreads(HashMap executors) { + ArrayList executorThreads = new ArrayList(); + Iterator it = executors.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry executorEntry = + (Map.Entry) it.next(); + Thread thread = new Thread(executorEntry.getValue()); + thread.setDaemon(true); + System.out.println("Initializing executor " + executorEntry.getKey()); + thread.start(); + executorThreads.add(thread); + } + + return executorThreads; + } + + public static void closeExecutors(ArrayList executorThreads) { + for (Thread executorThread : executorThreads) { + executorThread.interrupt(); + } + } + + public static void runModulesAsThreads() { + HashMap modules = initializeModules(); + HashMap executors = initializeExecutors(modules); + ArrayList executorThreads = initializeExecutorThreads(executors); + + Thread eventBusThread = new Thread(new EventBus(executors)); + System.out.println("Initializing event bus"); + eventBusThread.start(); + + System.out.println("Closing executors"); + closeExecutors(executorThreads); + } + + public static void main(String[] args) { + runRegistry(); + runModulesAsThreads(); + } } -- cgit v1.2.3 From b88b2866f5731491f1e7957a678559a1922c983a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 23 Dec 2019 19:08:08 +0100 Subject: Add simple message constructor --- .../java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java index ab24f48..6e8f51c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java @@ -1,5 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.message; +import pl.edu.mimuw.cloudatlas.agent.Agent; + public class AgentMessage { public enum AgentModule { @@ -23,6 +25,12 @@ public class AgentMessage { this.timestamp = timestamp; } + public AgentMessage(String messageId, AgentModule destinationModule) { + this.messageId = messageId; + this.destinationModule = destinationModule; + this.timestamp = System.currentTimeMillis() / 1000L; + } + public String getMessageId() { return messageId; } -- cgit v1.2.3