diff options
| author | Martin <marcin.j.chrzanowski@gmail.com> | 2019-12-26 15:27:44 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-12-26 15:27:44 +0100 | 
| commit | 8b076ed54b692381a3c1410b704bdad33ad5ca0b (patch) | |
| tree | 004c318b28166e70d3ef8fd988d75fef136c68ff /src/main/java/pl/edu | |
| parent | cbb43ba62d11888ce73314465dcffb537b6cd295 (diff) | |
Refactor handle (#75)
* Refactor passing messages to handler
* Add RMIMessage skeleton
* Handle RMI message in RMI module
Diffstat (limited to 'src/main/java/pl/edu')
10 files changed, 95 insertions, 60 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 4e75a51..1b078cf 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -5,29 +5,29 @@ import java.util.HashMap;  import java.util.Iterator;  import java.util.Map; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;  import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  import pl.edu.mimuw.cloudatlas.agent.modules.RMI;  import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler;  public class Agent { -    public static HashMap<AgentMessage.AgentModule, Module> initializeModules() { -        HashMap<AgentMessage.AgentModule, Module> modules = new HashMap<AgentMessage.AgentModule, Module>(); -        modules.put(AgentMessage.AgentModule.TIMER_SCHEDULER, new TimerScheduler(AgentMessage.AgentModule.TIMER_SCHEDULER)); -        modules.put(AgentMessage.AgentModule.RMI, new RMI(AgentMessage.AgentModule.RMI)); +    public static HashMap<ModuleType, Module> initializeModules() { +        HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); +        modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); +        modules.put(ModuleType.RMI, new RMI(ModuleType.RMI));          // TODO add modules as we implement them          return modules;      } -    public static HashMap<AgentMessage.AgentModule, Executor> initializeExecutors( -            HashMap<AgentMessage.AgentModule, Module> modules) { -        HashMap<AgentMessage.AgentModule, Executor> executors = new HashMap<AgentMessage.AgentModule, Executor>(); +    public static HashMap<ModuleType, Executor> initializeExecutors( +            HashMap<ModuleType, Module> modules) { +        HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();          Iterator it = modules.entrySet().iterator();          while (it.hasNext()) { -            Map.Entry<AgentMessage.AgentModule, Module> moduleEntry = -                    (Map.Entry<AgentMessage.AgentModule, Module>) it.next(); +            Map.Entry<ModuleType, Module> moduleEntry = +                    (Map.Entry<ModuleType, Module>) it.next();              Module module = moduleEntry.getValue();              Executor executor = new Executor(module);              executors.put(moduleEntry.getKey(), executor); @@ -36,13 +36,13 @@ public class Agent {          return executors;      } -    public static ArrayList<Thread>  initializeExecutorThreads(HashMap<AgentMessage.AgentModule, Executor> executors) { +    public static ArrayList<Thread>  initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {          ArrayList<Thread> executorThreads = new ArrayList<Thread>();          Iterator it = executors.entrySet().iterator();          while (it.hasNext()) { -            Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = -                    (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); +            Map.Entry<ModuleType, Executor> executorEntry = +                    (Map.Entry<ModuleType, Executor>) it.next();              Thread thread = new Thread(executorEntry.getValue());              thread.setDaemon(true);              System.out.println("Initializing executor " + executorEntry.getKey()); @@ -60,8 +60,8 @@ public class Agent {      }      public static void runModulesAsThreads() { -        HashMap<AgentMessage.AgentModule, Module> modules = initializeModules(); -        HashMap<AgentMessage.AgentModule, Executor> executors = initializeExecutors(modules); +        HashMap<ModuleType, Module> modules = initializeModules(); +        HashMap<ModuleType, Executor> executors = initializeExecutors(modules);          ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);          Thread eventBusThread = new Thread(new EventBus(executors)); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index 37c125a..4f939b6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -1,6 +1,7 @@  package pl.edu.mimuw.cloudatlas.agent;  import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  import java.util.HashMap;  import java.util.Iterator; @@ -12,18 +13,18 @@ import java.util.concurrent.LinkedBlockingQueue;   */  public class EventBus implements Runnable {      private LinkedBlockingQueue<AgentMessage> events; -    private HashMap<AgentMessage.AgentModule, Executor> executors; +    private HashMap<ModuleType, Executor> executors;      void setEventBusReference() {          Iterator it = this.executors.entrySet().iterator();          while (it.hasNext()) { -            Map.Entry<AgentMessage.AgentModule, Executor> executorEntry = -                    (Map.Entry<AgentMessage.AgentModule, Executor>) it.next(); +            Map.Entry<ModuleType, Executor> executorEntry = +                    (Map.Entry<ModuleType, Executor>) it.next();              executorEntry.getValue().setEventBus(this);          }      } -    EventBus(HashMap<AgentMessage.AgentModule, Executor> executors) { +    EventBus(HashMap<ModuleType, Executor> executors) {          this.executors = executors;          setEventBusReference();          this.events = new LinkedBlockingQueue<AgentMessage>(); @@ -43,13 +44,11 @@ public class EventBus implements Runnable {      }      public void routeMessage(AgentMessage msg) throws InterruptedException { -        assert msg.getCorrectMessageType() == msg.getDestinationModule();          System.out.println("Event bus routing message");          executors.get(msg.getDestinationModule()).addMessage(msg);      }      public void addMessage(AgentMessage msg) throws InterruptedException { -        assert msg.getCorrectMessageType() == msg.getDestinationModule();          this.events.put(msg);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java index 6ebf6a6..d648fac 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Executor.java @@ -30,6 +30,10 @@ public class Executor implements Runnable {              } catch (InterruptedException e) {                  System.out.println("Executor interrupted. Exiting loop.");                  break; +            } catch (Module.InvalidMessageType e) { +                System.out.println("Executor got a message with type it can't handle"); +                System.out.println(e); +                break;              }          }      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java index b71831f..f343e0f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java @@ -1,36 +1,25 @@  package pl.edu.mimuw.cloudatlas.agent.messages; -public abstract class AgentMessage { - -    public enum AgentModule { -        TIMER_SCHEDULER, -        TIMER_GTP, -        RMI, -        UDP, -        GOSSIP_IN, -        GOSSIP_OUT, -        STATE, -        QUERY -    } +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +public abstract class AgentMessage {      private String messageId; -    private AgentModule destinationModule; +    private ModuleType destinationModule;      private long timestamp; -    public AgentMessage(String messageId, AgentModule destinationModule, long timestamp) { +    public AgentMessage(String messageId, ModuleType destinationModule, long timestamp) {          this.messageId = messageId;          this.destinationModule = destinationModule;          this.timestamp = timestamp;      } -    public AgentMessage(String messageId, AgentModule destinationModule) { +    public AgentMessage(String messageId, ModuleType destinationModule) {          this.messageId = messageId;          this.destinationModule = destinationModule;          this.timestamp = System.currentTimeMillis() / 1000L;      } -    public abstract AgentModule getCorrectMessageType(); -      public String getMessageId() {          return messageId;      } @@ -39,14 +28,16 @@ public abstract class AgentMessage {          this.messageId = messageId;      } -    public AgentModule getDestinationModule() { +    public ModuleType getDestinationModule() {          return destinationModule;      } -    public void setDestinationModule(AgentModule destinationModule) { +    public void setDestinationModule(ModuleType destinationModule) {          this.destinationModule = destinationModule;      } +    public abstract void callMe(Module module) throws InterruptedException, Module.InvalidMessageType; +      public long getTimestamp() {          return timestamp;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java new file mode 100644 index 0000000..a8e3afb --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java @@ -0,0 +1,14 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; + +public class RMIMessage extends AgentMessage { +    public RMIMessage(String messageId, long timestamp) { +        super(messageId, ModuleType.RMI, timestamp); +    } + +    public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { +        module.handleTyped(this); +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java index 333b381..8566d67 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java @@ -1,6 +1,8 @@  package pl.edu.mimuw.cloudatlas.agent.messages;  import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  public class TimerSchedulerMessage extends AgentMessage {      private String requestId; @@ -9,13 +11,12 @@ public class TimerSchedulerMessage extends AgentMessage {      private TimerScheduledTask task;      public TimerSchedulerMessage(String messageId, -                                 AgentModule destinationModule,                                   long timestamp,                                   String requestId,                                   long delay,                                   long baseTime,                                   TimerScheduledTask task) { -        super(messageId, destinationModule, timestamp); +        super(messageId, ModuleType.TIMER_SCHEDULER, timestamp);          this.requestId = requestId;          this.delay = delay;          this.baseTime = baseTime; @@ -48,10 +49,9 @@ public class TimerSchedulerMessage extends AgentMessage {      public String getRequestId() { return requestId; } -    public void setRequestId(String requestId) { this.requestId = requestId; } - -    @Override -    public AgentModule getCorrectMessageType() { -        return AgentModule.TIMER_SCHEDULER; +    public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { +        module.handleTyped(this);      } + +    public void setRequestId(String requestId) { this.requestId = requestId; }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 9744971..a50a95f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -2,19 +2,37 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.Executor;  import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage;  /*   * A Module is a (potentially stateful) event handler.   */  public abstract class Module { -    protected AgentMessage.AgentModule moduleType; +    public class InvalidMessageType extends Exception { +        public InvalidMessageType(String message) { +            super(message); +        } +    } + +    protected ModuleType moduleType;      protected Executor executor; -    public Module(AgentMessage.AgentModule moduleType) { +    public Module(ModuleType moduleType) {          this.moduleType = moduleType;      } -    public abstract void handle(AgentMessage event) throws InterruptedException ; +    public void handle(AgentMessage event) throws InterruptedException, InvalidMessageType { +        event.callMe(this); +    } + +    public void handleTyped(TimerSchedulerMessage message) throws InterruptedException, InvalidMessageType { +        throw new InvalidMessageType("Got a TimerSchedulerMessage in module " + moduleType.toString()); +    } + +    public void handleTyped(RMIMessage message) throws InterruptedException, InvalidMessageType { +        throw new InvalidMessageType("Got an RMIMessage in module " + moduleType.toString()); +    }      public void setExecutor(Executor executor) {          this.executor = executor; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java new file mode 100644 index 0000000..ff4a92e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java @@ -0,0 +1,12 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +public enum ModuleType { +    TIMER_SCHEDULER, +    TIMER_GTP, +    RMI, +    UDP, +    GOSSIP_IN, +    GOSSIP_OUT, +    STATE, +    QUERY +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java index 452b214..1a86fc7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java @@ -2,7 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.ApiImplementation;  import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage;  import pl.edu.mimuw.cloudatlas.api.Api;  import pl.edu.mimuw.cloudatlas.interpreter.Main;  import pl.edu.mimuw.cloudatlas.model.ZMI; @@ -12,7 +12,7 @@ import java.rmi.registry.Registry;  import java.rmi.server.UnicastRemoteObject;  public class RMI extends Module { -    public RMI(AgentMessage.AgentModule moduleType) { +    public RMI(ModuleType moduleType) {          super(moduleType);          runRegistry();      } @@ -32,8 +32,6 @@ public class RMI extends Module {          }      } -    @Override -    public void handle(AgentMessage event) throws InterruptedException { -        assert event.getDestinationModule() == event.getCorrectMessageType(); +    public void handleTyped(RMIMessage event) throws InterruptedException {      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java index 9390940..68a9326 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/TimerScheduler.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  import java.util.Timer; @@ -17,17 +18,15 @@ import java.util.Timer;  public class TimerScheduler extends Module {      private Timer timer; -    public TimerScheduler(AgentMessage.AgentModule moduleType) { +    public TimerScheduler(ModuleType moduleType) {          super(moduleType); -        assert moduleType == AgentMessage.AgentModule.TIMER_SCHEDULER; +        assert moduleType == ModuleType.TIMER_SCHEDULER;          this.timer = new Timer();          System.out.println("TimerScheduler instance initialized");      }      @Override -    public void handle(AgentMessage event) throws InterruptedException { -        assert event.getDestinationModule() == event.getCorrectMessageType(); -        TimerSchedulerMessage timerEvent = (TimerSchedulerMessage) event; +    public void handleTyped(TimerSchedulerMessage timerEvent) throws InterruptedException {          addTask(timerEvent);      } |