diff options
12 files changed, 393 insertions, 33 deletions
@@ -3,3 +3,7 @@ # Ignore Gradle build output directory build + +# Ignore IntelliJ directories +.idea +scratch
\ No newline at end of file diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 0cfdfd6..4bd522d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,6 +1,5 @@ package pl.edu.mimuw.cloudatlas.agent; -import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; @@ -9,7 +8,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.ZMI; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index afb4e27..37c125a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,6 +1,6 @@ package pl.edu.mimuw.cloudatlas.agent; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; import java.util.HashMap; import java.util.Iterator; @@ -43,11 +43,13 @@ public class EventBus implements Runnable { } public void routeMessage(AgentMessage msg) throws InterruptedException { + assert msg.getCorrectMessageType() == msg.getDestinationModule(); System.out.println("Event bus routing message"); executors.get(msg.getDestinationModule()).addMessage(msg); } public void addMessage(AgentMessage msg) throws InterruptedException { + assert msg.getCorrectMessageType() == msg.getDestinationModule(); this.events.put(msg); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index f3cf2ea..6ebf6a6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -2,7 +2,8 @@ package pl.edu.mimuw.cloudatlas.agent; import java.util.concurrent.LinkedBlockingQueue; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; /* * Queues messages sent to a particular module and ensures they are eventually diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java index 6e8f51c..b71831f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java @@ -1,8 +1,6 @@ -package pl.edu.mimuw.cloudatlas.agent.message; +package pl.edu.mimuw.cloudatlas.agent.messages; -import pl.edu.mimuw.cloudatlas.agent.Agent; - -public class AgentMessage { +public abstract class AgentMessage { public enum AgentModule { TIMER_SCHEDULER, @@ -31,6 +29,8 @@ public class AgentMessage { this.timestamp = System.currentTimeMillis() / 1000L; } + public abstract AgentModule getCorrectMessageType(); + public String getMessageId() { return messageId; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java index 9743533..333b381 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/message/TimerSchedulerMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java @@ -1,14 +1,20 @@ -package pl.edu.mimuw.cloudatlas.agent.message; +package pl.edu.mimuw.cloudatlas.agent.messages; -import java.util.TimerTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; public class TimerSchedulerMessage extends AgentMessage { private String requestId; private long delay; private long baseTime; - private TimerTask task; - - public TimerSchedulerMessage(String messageId, AgentModule destinationModule, long timestamp, String requestId, long delay, long baseTime, TimerTask task) { + private TimerScheduledTask task; + + public TimerSchedulerMessage(String messageId, + AgentModule destinationModule, + long timestamp, + String requestId, + long delay, + long baseTime, + TimerScheduledTask task) { super(messageId, destinationModule, timestamp); this.requestId = requestId; this.delay = delay; @@ -32,15 +38,20 @@ public class TimerSchedulerMessage extends AgentMessage { this.baseTime = baseTime; } - public TimerTask getTask() { + public TimerScheduledTask getTask() { return task; } - public void setTask(TimerTask task) { + public void setTask(TimerScheduledTask task) { this.task = task; } public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } + + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.TIMER_SCHEDULER; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 167f3b7..9744971 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -1,15 +1,16 @@ -package pl.edu.mimuw.cloudatlas.agent; +package pl.edu.mimuw.cloudatlas.agent.modules; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.Executor; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; /* * A Module is a (potentially stateful) event handler. */ public abstract class Module { - private AgentMessage.AgentModule moduleType; - private Executor executor; + protected AgentMessage.AgentModule moduleType; + protected Executor executor; - Module(AgentMessage.AgentModule moduleType) { + public Module(AgentMessage.AgentModule moduleType) { this.moduleType = moduleType; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduledTask.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduledTask.java new file mode 100644 index 0000000..7a0da70 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduledTask.java @@ -0,0 +1,19 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; + +import java.util.TimerTask; + +public abstract class TimerScheduledTask extends TimerTask { + private TimerScheduler scheduler; + + public abstract void run(); + + public void setScheduler(TimerScheduler scheduler) { + this.scheduler = scheduler; + } + + protected void sendMessage(AgentMessage msg) throws InterruptedException { + this.scheduler.passMessageFromTask(msg); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java new file mode 100644 index 0000000..9390940 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java @@ -0,0 +1,47 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; + +import java.util.Timer; + +/** + * Initializes a timer within a constructor during its attachment to the executor + * Runs in a thread separate from executor - maybe refactor so that it's attached to executor's thread + * + * Handle used to attach tasks to schedule + * Tasks declared as inherited from TimerTask + * + * TODO: add request id and custom time + */ +public class TimerScheduler extends Module { + private Timer timer; + + public TimerScheduler(AgentMessage.AgentModule moduleType) { + super(moduleType); + assert moduleType == AgentMessage.AgentModule.TIMER_SCHEDULER; + this.timer = new Timer(); + System.out.println("TimerScheduler instance initialized"); + } + + @Override + public void handle(AgentMessage event) throws InterruptedException { + assert event.getDestinationModule() == event.getCorrectMessageType(); + TimerSchedulerMessage timerEvent = (TimerSchedulerMessage) event; + addTask(timerEvent); + } + + public void addTask(TimerSchedulerMessage msg) { + TimerScheduledTask task = msg.getTask(); + task.setScheduler(this); + this.timer.schedule(task, msg.getDelay()); + System.out.println("Task with delay " + msg.getDelay() + " scheduled"); + } + + // TODO + public void removeTask(String requestId) {} + + public void passMessageFromTask(AgentMessage msg) throws InterruptedException { + this.executor.passMessage(msg); + } +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java index fd7c244..f3df887 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/EventBusTest.java @@ -1,7 +1,13 @@ package pl.edu.mimuw.cloudatlas.agent; +import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; +import org.junit.runners.JUnit4; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; import java.util.ArrayList; import java.util.HashMap; @@ -23,7 +29,13 @@ public class EventBusTest { @Override public void handle(AgentMessage event) throws InterruptedException { System.out.println("Module 1 handle called"); - sendMessage(new AgentMessage("1", AgentMessage.AgentModule.UDP)); + // TODO correct message subclass + sendMessage(new AgentMessage("1", AgentMessage.AgentModule.UDP) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.UDP; + } + }); counter ++; } }); @@ -54,6 +66,8 @@ public class EventBusTest { } @Test + @Ignore + // TODO correct message subclass public void messageModule() throws InterruptedException { HashMap<AgentMessage.AgentModule, Module> modules = initializeModule(); HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); @@ -62,7 +76,12 @@ public class EventBusTest { Thread eventBusThread = new Thread(eventBus); eventBusThread.start(); - eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI)); + eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.RMI; + } + }); Thread.sleep(1000); eventBusThread.interrupt(); Agent.closeExecutors(executorThreads); @@ -70,6 +89,8 @@ public class EventBusTest { } @Test + @Ignore + // TODO correct message subclass public void messagingBetweenModules() throws InterruptedException { HashMap<AgentMessage.AgentModule, Module> modules = initializeTwoModules(); HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); @@ -78,10 +99,12 @@ public class EventBusTest { Thread eventBusThread = new Thread(eventBus); eventBusThread.start(); - eventBus.addMessage(new AgentMessage( - "0", - AgentMessage.AgentModule.RMI, - System.currentTimeMillis() / 1000L)); + eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.RMI; + } + }); Thread.sleep(1000); @@ -90,4 +113,73 @@ public class EventBusTest { assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.RMI)).counter); assertEquals(1, ((MessageCounterModule) modules.get(AgentMessage.AgentModule.UDP)).counter); } + + @Test + public void sendWrongMessageType1() throws InterruptedException { + HashMap<AgentMessage.AgentModule, Module> modules = initializeModule(); + HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); + ArrayList<Thread> executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + Boolean routingErrorCaught = false; + eventBusThread.start(); + + try { + eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.RMI, + System.currentTimeMillis() / 1000L, + "1", + 10, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task executed"); + } + })); + Thread.sleep(1000); + } catch (AssertionError e) { + System.out.println("Wrong timer-scheduler message type error caught"); + routingErrorCaught = true; + } finally { + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + } + + if (!routingErrorCaught) { + Assert.fail("Routing not detected as faulty"); + } + } + + @Test + public void sendWrongMessageType2() throws InterruptedException { + HashMap<AgentMessage.AgentModule, Module> modules = initializeModule(); + HashMap<AgentMessage.AgentModule, Executor> executors = Agent.initializeExecutors(modules); + ArrayList<Thread> executorThreads = Agent.initializeExecutorThreads(executors); + EventBus eventBus = new EventBus(executors); + Thread eventBusThread = new Thread(eventBus); + Boolean routingErrorCaught = false; + eventBusThread.start(); + + try { + eventBus.addMessage(new AgentMessage("0", AgentMessage.AgentModule.RMI) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.QUERY; + } + }); + Thread.sleep(1000); + } catch (AssertionError e) { + System.out.println("Wrong timer-scheduler message type error caught"); + routingErrorCaught = true; + } finally { + eventBusThread.interrupt(); + Agent.closeExecutors(executorThreads); + } + + if (!routingErrorCaught) { + Assert.fail("Routing not detected as faulty"); + } + } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java index 2a6ef49..8653258 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/ExecutorTest.java @@ -3,8 +3,11 @@ package pl.edu.mimuw.cloudatlas.agent; import org.junit.Test; import static org.junit.Assert.*; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.message.AgentMessage.AgentModule; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage.AgentModule; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; + +// TODO make agent messages specific subclass public class ExecutorTest { public class MessageCounterModule extends Module { @@ -34,7 +37,12 @@ public class ExecutorTest { public void testExecutesHandlerOnce() throws Exception { MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); - executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); + executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.UDP; + } + }); Thread thread = new Thread(executor); thread.start(); Thread.sleep(100); @@ -46,12 +54,27 @@ public class ExecutorTest { public void testExecutesHandlerMultipleTimes() throws Exception { MessageCounterModule module = new MessageCounterModule(AgentModule.UDP); Executor executor = new Executor(module); - executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); - executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); + executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.UDP; + } + }); + executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.UDP; + } + }); Thread thread = new Thread(executor); thread.start(); Thread.sleep(100); - executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) {}); + executor.addMessage(new AgentMessage("", AgentModule.UDP, 0) { + @Override + public AgentModule getCorrectMessageType() { + return AgentModule.UDP; + } + }); Thread.sleep(100); thread.interrupt(); assertEquals(3, module.counter); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java new file mode 100644 index 0000000..133b3b9 --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java @@ -0,0 +1,160 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import org.junit.Test; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler; + +import java.util.ArrayList; +import java.util.HashMap; + +// TODO better task tests with counters after enabling messaging from tasks +// TODO add wrong message test with switched types + +public class SchedulerTest { + private HashMap<AgentMessage.AgentModule, Module> modules; + private HashMap<AgentMessage.AgentModule, Executor> executors; + private ArrayList<Thread> executorThreads; + private EventBus eventBus; + private Thread eventBusThread; + + public SchedulerTest() { + this.modules = initializeModule(); + this.executors = Agent.initializeExecutors(modules); + this.executorThreads = Agent.initializeExecutorThreads(executors); + this.eventBus = new EventBus(executors); + this.eventBusThread = new Thread(eventBus); + eventBusThread.start(); + } + + public HashMap<AgentMessage.AgentModule, Module> initializeModule() { + HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); + modules.put(AgentMessage.AgentModule.TIMER_SCHEDULER, new TimerScheduler(AgentMessage.AgentModule.TIMER_SCHEDULER)); + return modules; + } + + @Test + public void initializeWrongModuleType() { + try { + Module timer = new TimerScheduler(AgentMessage.AgentModule.RMI); + } catch (AssertionError e) { + System.out.println("Wrong timer type during init error caught"); + } + } + + @Test + public void sendWrongMessageType() throws InterruptedException { + try { + this.eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.UDP, + System.currentTimeMillis() / 1000L, + "1", + 10, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task executed"); + } + })); + Thread.sleep(1000); + } catch (AssertionError e) { + System.out.println("Wrong timer-scheduler message type error caught"); + } + } + + @Test + public void scheduleTask() throws InterruptedException { + this.eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.TIMER_SCHEDULER, + System.currentTimeMillis() / 1000L, + "1", + 10, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task executed"); + } + })); + + Thread.sleep(1000); + } + + @Test + public void scheduleTwoTasks() throws InterruptedException { + this.eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.TIMER_SCHEDULER, + System.currentTimeMillis() / 1000L, + "1", + 10, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task 1 executed"); + } + })); + + this.eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.TIMER_SCHEDULER, + System.currentTimeMillis() / 1000L, + "1", + 20, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task 2 executed"); + } + })); + + + Thread.sleep(1000); + } + + @Test + public void scheduleTwoMessagingTasks() throws InterruptedException { + TimerSchedulerMessage messageToSend = new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.TIMER_SCHEDULER, + System.currentTimeMillis() / 1000L, + "1", + 20, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + System.out.println("Task 2 executed"); + } + }); + + this.eventBus.addMessage(new TimerSchedulerMessage( + "0", + AgentMessage.AgentModule.TIMER_SCHEDULER, + System.currentTimeMillis() / 1000L, + "1", + 10, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + try { + this.sendMessage(messageToSend); + } catch (InterruptedException e) { + System.out.println("Task 1 message interrupted"); + e.printStackTrace(); + } + System.out.println("Task 1 executed"); + } + })); + + Thread.sleep(1000); + } +} |