diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
10 files changed, 95 insertions, 60 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 4e75a51..1b078cf 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -5,29 +5,29 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import pl.edu.mimuw.cloudatlas.agent.modules.RMI; import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler; public class Agent { - public static HashMap<AgentMessage.AgentModule, Module> initializeModules() { - HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); - modules.put(AgentMessage.AgentModule.TIMER_SCHEDULER, new TimerScheduler(AgentMessage.AgentModule.TIMER_SCHEDULER)); - modules.put(AgentMessage.AgentModule.RMI, new RMI(AgentMessage.AgentModule.RMI)); + public static HashMap<ModuleType, Module> initializeModules() { + HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); + modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); + modules.put(ModuleType.RMI, new RMI(ModuleType.RMI)); // TODO add modules as we implement them return modules; } - public static HashMap<AgentMessage.AgentModule, Executor> initializeExecutors( - HashMap<AgentMessage.AgentModule, Module> modules) { - HashMap<AgentMessage.AgentModule, Executor> executors = new HashMap<AgentMessage.AgentModule, Executor>(); + public static HashMap<ModuleType, Executor> initializeExecutors( + HashMap<ModuleType, Module> modules) { + HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); Iterator it = modules.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<AgentMessage.AgentModule, Module> moduleEntry = - (Map.Entry<AgentMessage.AgentModule, Module>) it.next(); + Map.Entry<ModuleType, Module> moduleEntry = + (Map.Entry<ModuleType, Module>) it.next(); Module module = moduleEntry.getValue(); Executor executor = new Executor(module); executors.put(moduleEntry.getKey(), executor); @@ -36,13 +36,13 @@ public class Agent { return executors; } - public static ArrayList<Thread> initializeExecutorThreads(HashMap<AgentMessage.AgentModule, Executor> executors) { + public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { ArrayList<Thread> executorThreads = new ArrayList<Thread>(); Iterator it = executors.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = - (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); + Map.Entry<ModuleType, Executor> executorEntry = + (Map.Entry<ModuleType, Executor>) it.next(); Thread thread = new Thread(executorEntry.getValue()); thread.setDaemon(true); System.out.println("Initializing executor " + executorEntry.getKey()); @@ -60,8 +60,8 @@ public class Agent { } public static void runModulesAsThreads() { - HashMap<AgentMessage.AgentModule, Module> modules = initializeModules(); - HashMap<AgentMessage.AgentModule, Executor> executors = initializeExecutors(modules); + HashMap<ModuleType, Module> modules = initializeModules(); + HashMap<ModuleType, Executor> executors = initializeExecutors(modules); ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); Thread eventBusThread = new Thread(new EventBus(executors)); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 37c125a..4f939b6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,6 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import java.util.HashMap; import java.util.Iterator; @@ -12,18 +13,18 @@ import java.util.concurrent.LinkedBlockingQueue; */ public class EventBus implements Runnable { private LinkedBlockingQueue<AgentMessage> events; - private HashMap<AgentMessage.AgentModule, Executor> executors; + private HashMap<ModuleType, Executor> executors; void setEventBusReference() { Iterator it = this.executors.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = - (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); + Map.Entry<ModuleType, Executor> executorEntry = + (Map.Entry<ModuleType, Executor>) it.next(); executorEntry.getValue().setEventBus(this); } } - EventBus(HashMap<AgentMessage.AgentModule, Executor> executors) { + EventBus(HashMap<ModuleType, Executor> executors) { this.executors = executors; setEventBusReference(); this.events = new LinkedBlockingQueue<AgentMessage>(); @@ -43,13 +44,11 @@ public class EventBus implements Runnable { } public void routeMessage(AgentMessage msg) throws InterruptedException { - assert msg.getCorrectMessageType() == msg.getDestinationModule(); System.out.println("Event bus routing message"); executors.get(msg.getDestinationModule()).addMessage(msg); } public void addMessage(AgentMessage msg) throws InterruptedException { - assert msg.getCorrectMessageType() == msg.getDestinationModule(); this.events.put(msg); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index 6ebf6a6..d648fac 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -30,6 +30,10 @@ public class Executor implements Runnable { } catch (InterruptedException e) { System.out.println("Executor interrupted. Exiting loop."); break; + } catch (Module.InvalidMessageType e) { + System.out.println("Executor got a message with type it can't handle"); + System.out.println(e); + break; } } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java index b71831f..f343e0f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java @@ -1,36 +1,25 @@ package pl.edu.mimuw.cloudatlas.agent.messages; -public abstract class AgentMessage { - - public enum AgentModule { - TIMER_SCHEDULER, - TIMER_GTP, - RMI, - UDP, - GOSSIP_IN, - GOSSIP_OUT, - STATE, - QUERY - } +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +public abstract class AgentMessage { private String messageId; - private AgentModule destinationModule; + private ModuleType destinationModule; private long timestamp; - public AgentMessage(String messageId, AgentModule destinationModule, long timestamp) { + public AgentMessage(String messageId, ModuleType destinationModule, long timestamp) { this.messageId = messageId; this.destinationModule = destinationModule; this.timestamp = timestamp; } - public AgentMessage(String messageId, AgentModule destinationModule) { + public AgentMessage(String messageId, ModuleType destinationModule) { this.messageId = messageId; this.destinationModule = destinationModule; this.timestamp = System.currentTimeMillis() / 1000L; } - public abstract AgentModule getCorrectMessageType(); - public String getMessageId() { return messageId; } @@ -39,14 +28,16 @@ public abstract class AgentMessage { this.messageId = messageId; } - public AgentModule getDestinationModule() { + public ModuleType getDestinationModule() { return destinationModule; } - public void setDestinationModule(AgentModule destinationModule) { + public void setDestinationModule(ModuleType destinationModule) { this.destinationModule = destinationModule; } + public abstract void callMe(Module module) throws InterruptedException, Module.InvalidMessageType; + public long getTimestamp() { return timestamp; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java new file mode 100644 index 0000000..a8e3afb --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java @@ -0,0 +1,14 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; + +public class RMIMessage extends AgentMessage { + public RMIMessage(String messageId, long timestamp) { + super(messageId, ModuleType.RMI, timestamp); + } + + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java index 333b381..8566d67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java @@ -1,6 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; public class TimerSchedulerMessage extends AgentMessage { private String requestId; @@ -9,13 +11,12 @@ public class TimerSchedulerMessage extends AgentMessage { private TimerScheduledTask task; public TimerSchedulerMessage(String messageId, - AgentModule destinationModule, long timestamp, String requestId, long delay, long baseTime, TimerScheduledTask task) { - super(messageId, destinationModule, timestamp); + super(messageId, ModuleType.TIMER_SCHEDULER, timestamp); this.requestId = requestId; this.delay = delay; this.baseTime = baseTime; @@ -48,10 +49,9 @@ public class TimerSchedulerMessage extends AgentMessage { public String getRequestId() { return requestId; } - public void setRequestId(String requestId) { this.requestId = requestId; } - - @Override - public AgentModule getCorrectMessageType() { - return AgentModule.TIMER_SCHEDULER; + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); } + + public void setRequestId(String requestId) { this.requestId = requestId; } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 9744971..a50a95f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -2,19 +2,37 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.Executor; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage; /* * A Module is a (potentially stateful) event handler. */ public abstract class Module { - protected AgentMessage.AgentModule moduleType; + public class InvalidMessageType extends Exception { + public InvalidMessageType(String message) { + super(message); + } + } + + protected ModuleType moduleType; protected Executor executor; - public Module(AgentMessage.AgentModule moduleType) { + public Module(ModuleType moduleType) { this.moduleType = moduleType; } - public abstract void handle(AgentMessage event) throws InterruptedException ; + public void handle(AgentMessage event) throws InterruptedException, InvalidMessageType { + event.callMe(this); + } + + public void handleTyped(TimerSchedulerMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a TimerSchedulerMessage in module " + moduleType.toString()); + } + + public void handleTyped(RMIMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got an RMIMessage in module " + moduleType.toString()); + } public void setExecutor(Executor executor) { this.executor = executor; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java new file mode 100644 index 0000000..ff4a92e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java @@ -0,0 +1,12 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +public enum ModuleType { + TIMER_SCHEDULER, + TIMER_GTP, + RMI, + UDP, + GOSSIP_IN, + GOSSIP_OUT, + STATE, + QUERY +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java index 452b214..1a86fc7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java @@ -2,7 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.ApiImplementation; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.ZMI; @@ -12,7 +12,7 @@ import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; public class RMI extends Module { - public RMI(AgentMessage.AgentModule moduleType) { + public RMI(ModuleType moduleType) { super(moduleType); runRegistry(); } @@ -32,8 +32,6 @@ public class RMI extends Module { } } - @Override - public void handle(AgentMessage event) throws InterruptedException { - assert event.getDestinationModule() == event.getCorrectMessageType(); + public void handleTyped(RMIMessage event) throws InterruptedException { } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java index 9390940..68a9326 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import java.util.Timer; @@ -17,17 +18,15 @@ import java.util.Timer; public class TimerScheduler extends Module { private Timer timer; - public TimerScheduler(AgentMessage.AgentModule moduleType) { + public TimerScheduler(ModuleType moduleType) { super(moduleType); - assert moduleType == AgentMessage.AgentModule.TIMER_SCHEDULER; + assert moduleType == ModuleType.TIMER_SCHEDULER; this.timer = new Timer(); System.out.println("TimerScheduler instance initialized"); } @Override - public void handle(AgentMessage event) throws InterruptedException { - assert event.getDestinationModule() == event.getCorrectMessageType(); - TimerSchedulerMessage timerEvent = (TimerSchedulerMessage) event; + public void handleTyped(TimerSchedulerMessage timerEvent) throws InterruptedException { addTask(timerEvent); } |