m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java44
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java33
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java19
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java5
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java61
5 files changed, 101 insertions, 61 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 690198b..a7d2d55 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,19 +1,55 @@
package pl.edu.mimuw.cloudatlas.agent;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.interpreter.Main;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ZMI;
+
public class Agent {
+ private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) {
+ try {
+ UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
+ eventBus.addMessage(message);
+ for (ZMI son : zmi.getSons()) {
+ addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus);
+ }
+ } catch (Exception e) {
+ System.out.println("ERROR: failed to add zone");
+ }
+ }
+
+ public static void initZones(EventBus eventBus) {
+ try {
+ ZMI root = Main.createTestHierarchy2();
+ addZoneAndChildren(root, new PathName(""), eventBus);
+ System.out.println("Initialized with test hierarchy");
+ } catch (Exception e) {
+ System.out.println("ERROR: failed to create test hierarchy");
+ }
+ }
+
public static void main(String[] args) {
AgentConfig agentConfig = new AgentConfig();
agentConfig.runModulesAsThreads();
- agentConfig.runRegistry();
- HierarchyConfig hierarchyConfig = new HierarchyConfig(agentConfig.getEventBus());
- hierarchyConfig.initZones();
+ EventBus eventBus = new EventBus(agentConfig.getExecutors());
+ agentConfig.runRegistry(eventBus);
+ agentConfig.startNonModuleThreads(eventBus);
+
+ initZones(eventBus);
+
// TODO: make query period confiurable with config file and from tests
+
+ // TODO config setup
+ String zonePath = System.getProperty("zone_path");
+ String selectionStrategy = System.getProperty("Gossip.zone_strategy");
Long queryPeriod = Long.getLong("query_period");
- hierarchyConfig.startQueries(queryPeriod);
Long gossipPeriod = Long.getLong("gossip_period");
+
+ HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);
+ hierarchyConfig.startQueries(queryPeriod);
hierarchyConfig.startGossip(gossipPeriod);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
index b49525e..16bd99a 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -15,15 +15,14 @@ import java.util.HashMap;
import java.util.Map;
public class AgentConfig {
- private EventBus eventBus;
+ private HashMap<ModuleType, Executor> executors;
+ HashMap<ModuleType, Module> modules;
- AgentConfig() {}
-
- public EventBus getEventBus() {
- return eventBus;
+ public HashMap<ModuleType, Executor> getExecutors() {
+ return executors;
}
- public void runRegistry() {
+ public void runRegistry(EventBus eventBus) {
try {
NewApiImplementation api = new NewApiImplementation(eventBus);
Api apiStub =
@@ -38,18 +37,20 @@ public class AgentConfig {
}
private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
+ // TODO config setup
+ Long freshnessPeriod = Long.getLong("freshness_period");
+ Integer port = Integer.getInteger("UDUPServer.port");
+ Integer timeout = Integer.getInteger("UDUPServer.timeout");
+ Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
+ InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+
HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
modules.put(ModuleType.RMI, new Remik());
- Long freshnessPeriod = Long.getLong("freshness_period");
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
modules.put(ModuleType.GOSSIP, new GossipGirl());
- Integer port = Integer.getInteger("UDUPServer.port");
- Integer timeout = Integer.getInteger("UDUPServer.timeout");
- Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
- InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
return modules;
@@ -89,8 +90,6 @@ public class AgentConfig {
}
public void runModulesAsThreads() {
- HashMap<ModuleType, Module> modules = null;
-
try {
modules = initializeModules();
} catch (UnknownHostException | SocketException e) {
@@ -99,15 +98,15 @@ public class AgentConfig {
return;
}
- HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
+ executors = initializeExecutors(modules);
ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
- eventBus = new EventBus(executors);
+ }
+
+ void startNonModuleThreads(EventBus eventBus) {
Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
Thread eventBusThread = new Thread(eventBus);
System.out.println("Initializing event bus");
eventBusThread.start();
UDUPServerThread.start();
}
-
-
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
new file mode 100644
index 0000000..62d4bcd
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
@@ -0,0 +1,19 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+
+import java.util.function.Supplier;
+
+public class AgentUtils {
+
+ public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ try {
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while starting queries");
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index a6d3b2d..a9b25ed 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -22,6 +22,7 @@ public class EventBus implements Runnable {
// Allows for testing with a mock EventBus
protected EventBus() {
+ this.events = new LinkedBlockingQueue<AgentMessage>();
}
EventBus(HashMap<ModuleType, Executor> executors) {
@@ -30,6 +31,10 @@ public class EventBus implements Runnable {
this.events = new LinkedBlockingQueue<AgentMessage>();
}
+ public void setEvents(LinkedBlockingQueue<AgentMessage> events) {
+ this.events = events;
+ }
+
public void run() {
System.out.println("Event bus running");
while (!Thread.currentThread().interrupted()) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
index 1ee9ac2..04b9b5b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
@@ -1,13 +1,11 @@
package pl.edu.mimuw.cloudatlas.agent;
import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirl;
import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies;
-import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
-import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.*;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
@@ -18,12 +16,27 @@ public class HierarchyConfig {
private Random random = new Random();
private EventBus eventBus;
- HierarchyConfig(EventBus eventBus) {
- zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
- gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
+ HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) {
+ zoneSelectionStrategy = parseStrategy(zoneStrategy);
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath));
this.eventBus = eventBus;
}
+ private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) {
+ switch (selectionStrategy) {
+ case "RoundRobinExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ;
+ case "RoundRobinUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ case "RandomExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP;
+ case "RandomUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM;
+ default:
+ throw new UnsupportedOperationException("Selection strategy doesnt exist");
+ }
+ }
+
public void startGossip(long gossipPeriod) {
Supplier<TimerScheduledTask> taskSupplier = () ->
new TimerScheduledTask() {
@@ -47,7 +60,7 @@ public class HierarchyConfig {
}
};
- startRecursiveTask(taskSupplier, gossipPeriod);
+ AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus);
}
private ValueContact selectContactFromLevel(PathName path) throws Exception {
@@ -122,38 +135,6 @@ public class HierarchyConfig {
}
}
- public void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
- TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
-
- try {
- this.eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while starting queries");
- }
- }
-
- private void addZoneAndChildren(ZMI zmi, PathName pathName) {
- try {
- UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
- this.eventBus.addMessage(message);
- for (ZMI son : zmi.getSons()) {
- addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()));
- }
- } catch (Exception e) {
- System.out.println("ERROR: failed to add zone");
- }
- }
-
- public void initZones() {
- try {
- ZMI root = Main.createTestHierarchy2();
- addZoneAndChildren(root, new PathName(""));
- System.out.println("Initialized with test hierarchy");
- } catch (Exception e) {
- System.out.println("ERROR: failed to create test hierarchy");
- }
- }
-
public void startQueries(long queriesPeriod) {
Supplier<TimerScheduledTask> taskSupplier = () ->
new TimerScheduledTask() {
@@ -166,6 +147,6 @@ public class HierarchyConfig {
}
};
- startRecursiveTask(taskSupplier, queriesPeriod);
+ AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);
}
}