diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
5 files changed, 101 insertions, 61 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 690198b..a7d2d55 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,19 +1,55 @@ package pl.edu.mimuw.cloudatlas.agent; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.interpreter.Main; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ZMI; + public class Agent { + private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) { + try { + UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); + eventBus.addMessage(message); + for (ZMI son : zmi.getSons()) { + addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus); + } + } catch (Exception e) { + System.out.println("ERROR: failed to add zone"); + } + } + + public static void initZones(EventBus eventBus) { + try { + ZMI root = Main.createTestHierarchy2(); + addZoneAndChildren(root, new PathName(""), eventBus); + System.out.println("Initialized with test hierarchy"); + } catch (Exception e) { + System.out.println("ERROR: failed to create test hierarchy"); + } + } + public static void main(String[] args) { AgentConfig agentConfig = new AgentConfig(); agentConfig.runModulesAsThreads(); - agentConfig.runRegistry(); - HierarchyConfig hierarchyConfig = new HierarchyConfig(agentConfig.getEventBus()); - hierarchyConfig.initZones(); + EventBus eventBus = new EventBus(agentConfig.getExecutors()); + agentConfig.runRegistry(eventBus); + agentConfig.startNonModuleThreads(eventBus); + + initZones(eventBus); + // TODO: make query period confiurable with config file and from tests + + // TODO config setup + String zonePath = System.getProperty("zone_path"); + String selectionStrategy = System.getProperty("Gossip.zone_strategy"); Long queryPeriod = Long.getLong("query_period"); - hierarchyConfig.startQueries(queryPeriod); Long gossipPeriod = Long.getLong("gossip_period"); + + HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy); + hierarchyConfig.startQueries(queryPeriod); hierarchyConfig.startGossip(gossipPeriod); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java index b49525e..16bd99a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -15,15 +15,14 @@ import java.util.HashMap; import java.util.Map; public class AgentConfig { - private EventBus eventBus; + private HashMap<ModuleType, Executor> executors; + HashMap<ModuleType, Module> modules; - AgentConfig() {} - - public EventBus getEventBus() { - return eventBus; + public HashMap<ModuleType, Executor> getExecutors() { + return executors; } - public void runRegistry() { + public void runRegistry(EventBus eventBus) { try { NewApiImplementation api = new NewApiImplementation(eventBus); Api apiStub = @@ -38,18 +37,20 @@ public class AgentConfig { } private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { + // TODO config setup + Long freshnessPeriod = Long.getLong("freshness_period"); + Integer port = Integer.getInteger("UDUPServer.port"); + Integer timeout = Integer.getInteger("UDUPServer.timeout"); + Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); + InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); + HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - Long freshnessPeriod = Long.getLong("freshness_period"); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); modules.put(ModuleType.GOSSIP, new GossipGirl()); - Integer port = Integer.getInteger("UDUPServer.port"); - Integer timeout = Integer.getInteger("UDUPServer.timeout"); - Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); - InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); UDUPServer server = new UDUPServer(serverAddr, port, bufsize); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); return modules; @@ -89,8 +90,6 @@ public class AgentConfig { } public void runModulesAsThreads() { - HashMap<ModuleType, Module> modules = null; - try { modules = initializeModules(); } catch (UnknownHostException | SocketException e) { @@ -99,15 +98,15 @@ public class AgentConfig { return; } - HashMap<ModuleType, Executor> executors = initializeExecutors(modules); + executors = initializeExecutors(modules); ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); - eventBus = new EventBus(executors); + } + + void startNonModuleThreads(EventBus eventBus) { Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); Thread eventBusThread = new Thread(eventBus); System.out.println("Initializing event bus"); eventBusThread.start(); UDUPServerThread.start(); } - - } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java new file mode 100644 index 0000000..62d4bcd --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java @@ -0,0 +1,19 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; + +import java.util.function.Supplier; + +public class AgentUtils { + + public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) { + TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); + try { + eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); + } catch (InterruptedException e) { + System.out.println("Interrupted while starting queries"); + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index a6d3b2d..a9b25ed 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -22,6 +22,7 @@ public class EventBus implements Runnable { // Allows for testing with a mock EventBus protected EventBus() { + this.events = new LinkedBlockingQueue<AgentMessage>(); } EventBus(HashMap<ModuleType, Executor> executors) { @@ -30,6 +31,10 @@ public class EventBus implements Runnable { this.events = new LinkedBlockingQueue<AgentMessage>(); } + public void setEvents(LinkedBlockingQueue<AgentMessage> events) { + this.events = events; + } + public void run() { System.out.println("Event bus running"); while (!Thread.currentThread().interrupted()) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java index 1ee9ac2..04b9b5b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -1,13 +1,11 @@ package pl.edu.mimuw.cloudatlas.agent; import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirl; import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies; -import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; -import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.*; +import java.lang.reflect.UndeclaredThrowableException; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -18,12 +16,27 @@ public class HierarchyConfig { private Random random = new Random(); private EventBus eventBus; - HierarchyConfig(EventBus eventBus) { - zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; - gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); + HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) { + zoneSelectionStrategy = parseStrategy(zoneStrategy); + gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath)); this.eventBus = eventBus; } + private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) { + switch (selectionStrategy) { + case "RoundRobinExp": + return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ; + case "RoundRobinUniform": + return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; + case "RandomExp": + return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP; + case "RandomUniform": + return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM; + default: + throw new UnsupportedOperationException("Selection strategy doesnt exist"); + } + } + public void startGossip(long gossipPeriod) { Supplier<TimerScheduledTask> taskSupplier = () -> new TimerScheduledTask() { @@ -47,7 +60,7 @@ public class HierarchyConfig { } }; - startRecursiveTask(taskSupplier, gossipPeriod); + AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus); } private ValueContact selectContactFromLevel(PathName path) throws Exception { @@ -122,38 +135,6 @@ public class HierarchyConfig { } } - public void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { - TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); - - try { - this.eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); - } catch (InterruptedException e) { - System.out.println("Interrupted while starting queries"); - } - } - - private void addZoneAndChildren(ZMI zmi, PathName pathName) { - try { - UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); - this.eventBus.addMessage(message); - for (ZMI son : zmi.getSons()) { - addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); - } - } catch (Exception e) { - System.out.println("ERROR: failed to add zone"); - } - } - - public void initZones() { - try { - ZMI root = Main.createTestHierarchy2(); - addZoneAndChildren(root, new PathName("")); - System.out.println("Initialized with test hierarchy"); - } catch (Exception e) { - System.out.println("ERROR: failed to create test hierarchy"); - } - } - public void startQueries(long queriesPeriod) { Supplier<TimerScheduledTask> taskSupplier = () -> new TimerScheduledTask() { @@ -166,6 +147,6 @@ public class HierarchyConfig { } }; - startRecursiveTask(taskSupplier, queriesPeriod); + AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus); } } |