diff options
Diffstat (limited to 'src')
7 files changed, 262 insertions, 9 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 8eb8b4f..0cfdfd6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -4,24 +4,93 @@ import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { - public static void main(String[] args) { + + public static void runRegistry() { try { ZMI root = Main.createTestHierarchy2(); ApiImplementation api = new ApiImplementation(root); Api apiStub = - (Api) UnicastRemoteObject.exportObject(api, 0); + (Api) UnicastRemoteObject.exportObject(api, 0); Registry registry = LocateRegistry.getRegistry(); registry.rebind("Api", apiStub); - System.out.println("Api bound"); + System.out.println("Agent: api bound"); } catch (Exception e) { - System.err.println("Agent exception:"); + System.err.println("Agent registry initialization exception:"); e.printStackTrace(); } } + + public static HashMap<AgentMessage.AgentModule, Module> initializeModules() { + HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); + // TODO add modules as we implement them + return modules; + } + + public static HashMap<AgentMessage.AgentModule, Executor> initializeExecutors( + HashMap<AgentMessage.AgentModule, Module> modules) { + HashMap<AgentMessage.AgentModule, Executor> executors = new HashMap<AgentMessage.AgentModule, Executor>(); + Iterator it = modules.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<AgentMessage.AgentModule, Module> moduleEntry = + (Map.Entry<AgentMessage.AgentModule, Module>) it.next(); + Module module = moduleEntry.getValue(); + Executor executor = new Executor(module); + executors.put(moduleEntry.getKey(), executor); + } + + return executors; + } + + public static ArrayList<Thread> initializeExecutorThreads(HashMap<AgentMessage.AgentModule, Executor> executors) { + ArrayList<Thread> executorThreads = new ArrayList<Thread>(); + Iterator it = executors.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = + (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); + Thread thread = new Thread(executorEntry.getValue()); + thread.setDaemon(true); + System.out.println("Initializing executor " + executorEntry.getKey()); + thread.start(); + executorThreads.add(thread); + } + + return executorThreads; + } + + public static void closeExecutors(ArrayList<Thread> executorThreads) { + for (Thread executorThread : executorThreads) { + executorThread.interrupt(); + } + } + + public static void runModulesAsThreads() { + HashMap<AgentMessage.AgentModule, Module> modules = initializeModules(); + HashMap<AgentMessage.AgentModule, Executor> executors = initializeExecutors(modules); + ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); + + Thread eventBusThread = new Thread(new EventBus(executors)); + System.out.println("Initializing event bus"); + eventBusThread.start(); + + System.out.println("Closing executors"); + closeExecutors(executorThreads); + } + + public static void main(String[] args) { + runRegistry(); + runModulesAsThreads(); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 47cb1ff..afb4e27 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,7 +1,53 @@ package pl.edu.mimuw.cloudatlas.agent; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + /* * The EventBus routes messages sent between Modules. */ -public class EventBus { +public class EventBus implements Runnable { + private LinkedBlockingQueue<AgentMessage> events; + private HashMap<AgentMessage.AgentModule, Executor> executors; + + void setEventBusReference() { + Iterator it = this.executors.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = + (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); + executorEntry.getValue().setEventBus(this); + } + } + + EventBus(HashMap<AgentMessage.AgentModule, Executor> executors) { + this.executors = executors; + setEventBusReference(); + this.events = new LinkedBlockingQueue<AgentMessage>(); + } + + public void run() { + System.out.println("Event bus running"); + while (!Thread.currentThread().interrupted()) { + try { + AgentMessage event = events.take(); + routeMessage(event); + } catch (InterruptedException e) { + System.out.println("Event bus interrupted. Exiting loop."); + break; + } + } + } + + public void routeMessage(AgentMessage msg) throws InterruptedException { + System.out.println("Event bus routing message"); + executors.get(msg.getDestinationModule()).addMessage(msg); + } + + public void addMessage(AgentMessage msg) throws InterruptedException { + this.events.put(msg); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index 0651a9b..f3cf2ea 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -11,16 +11,20 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; public class Executor implements Runnable { private Module module; private LinkedBlockingQueue<AgentMessage> events; + private EventBus eventBus; public Executor(Module module) { this.module = module; + this.module.setExecutor(this); this.events = new LinkedBlockingQueue<AgentMessage>(); } public void run() { + System.out.println("Executor " + this.module.toString() + " running"); while (!Thread.currentThread().interrupted()) { try { AgentMessage event = events.take(); + System.out.println("Executor " + this.module.toString() + " passed message to handle"); module.handle(event); } catch (InterruptedException e) { System.out.println("Executor interrupted. Exiting loop."); @@ -32,4 +36,12 @@ public class Executor implements Runnable { public void addMessage(AgentMessage event) throws InterruptedException { events.put(event); } + + public void passMessage(AgentMessage event) throws InterruptedException { + eventBus.addMessage(event); + } + + public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java index 46df31b..167f3b7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java @@ -6,5 +6,25 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; * A Module is a (potentially stateful) event handler. */ public abstract class Module { - public abstract void handle(AgentMessage event); + private AgentMessage.AgentModule moduleType; + private Executor executor; + + Module(AgentMessage.AgentModule moduleType) { + this.moduleType = moduleType; + } + + public abstract void handle(AgentMessage event) throws InterruptedException ; + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void sendMessage(AgentMessage event) throws InterruptedException { + this.executor.passMessage(event); + } + + @Override + public String toString() { + return moduleType.toString(); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java index ab24f48..6e8f51c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java @@ -1,5 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.message; +import pl.edu.mimuw.cloudatlas.agent.Agent; + public class AgentMessage { public enum AgentModule { @@ -23,6 +25,12 @@ public class AgentMessage { this.timestamp = timestamp; } + public AgentMessage(String messageId, AgentModule destinationModule) { + this.messageId = messageId; + this.destinationModule = destinationModule; + this.timestamp = System.currentTimeMillis() / 1000L; + } + public String getMessageId() { return messageId; } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java new file mode 100644 index 0000000..fd7c244 --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java @@ -0,0 +1,93 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import org.junit.Test; +import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; + +import java.util.ArrayList; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class EventBusTest { + public abstract class MessageCounterModule extends Module { + public int counter = 0; + + MessageCounterModule(AgentMessage.AgentModule moduleType) { + super(moduleType); + } + } + + public HashMap<AgentMessage.AgentModule, Module> initializeTwoModules() { + HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); + modules.put(AgentMessage.AgentModule.RMI, new MessageCounterModule(AgentMessage.AgentModule.RMI) { + @Override + public void handle(AgentMessage event) throws InterruptedException { + System.out.println("Module 1 handle called"); + sendMessage(new AgentMessage("1", AgentMessage.AgentModule.UDP)); + counter ++; + } + }); + + modules.put(AgentMessage.AgentModule.UDP, new MessageCounterModule(AgentMessage.AgentModule.UDP) { + @Override + public void handle(AgentMessage event) { + System.out.println("Module 2 handle called"); + counter++; + } + }); + + return modules; + } + + public HashMap<AgentMessage.AgentModule, Module> initializeModule() { + HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); + + modules.put(AgentMessage.AgentModule.RMI, new MessageCounterModule(AgentMessage.AgentModule.RMI) { + @Override + public void handle(AgentMessage event) { + System.out.println("Module 1 handle called"); + counter++; + } + }); + + return modules; + } + + @Test + public void messageModule() throws InterruptedException { + HashMap<AgentMessage.AgentModule, Module> modules = initializeModule(); + HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); + ArrayList<Thread> executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + + eventBusThread.start(); + eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI)); + Thread.sleep(1000); + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.RMI)).counter); + } + + @Test + public void messagingBetweenModules() throws InterruptedException { + HashMap<AgentMessage.AgentModule, Module> modules = initializeTwoModules(); + HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); + ArrayList<Thread> executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + eventBusThread.start(); + + eventBus.addMessage(new AgentMessage( + "0", + AgentMessage.AgentModule.RMI, + System.currentTimeMillis() / 1000L)); + + Thread.sleep(1000); + + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.RMI)).counter); + assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.UDP)).counter); + } +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java index 445a5b4..2a6ef49 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java @@ -9,6 +9,11 @@ import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage.AgentModule; public class ExecutorTest { public class MessageCounterModule extends Module { public int counter = 0; + + MessageCounterModule(AgentModule moduleType) { + super(moduleType); + } + public void handle(AgentMessage m) { counter++; } @@ -16,7 +21,7 @@ public class ExecutorTest { @Test public void testDoesntExecuteWhenNoMessages() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); Thread thread = new Thread(executor); thread.start(); @@ -27,7 +32,7 @@ public class ExecutorTest { @Test public void testExecutesHandlerOnce() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); Thread thread = new Thread(executor); @@ -39,7 +44,7 @@ public class ExecutorTest { @Test public void testExecutesHandlerMultipleTimes() throws Exception { - MessageCounterModule module = new MessageCounterModule(); + MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); |