m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build.gradle33
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java275
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java112
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java19
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java5
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java152
6 files changed, 338 insertions, 258 deletions
diff --git a/build.gradle b/build.gradle
index 66c50d1..13be045 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,22 +30,37 @@ ext.gossipPeriod = {
return System.getProperty("gossipPeriod") ?: 5 * 1000
}
-ext.UDUPHostname = {
+ext.UDUPServerHostname = {
return System.getProperty("hostname") ?: "localhost"
}
-ext.port = {
+ext.UDUPServerPort = {
return System.getProperty("port") ?: 5999;
}
-ext.timeout = {
+ext.UDUPServerTimeout = {
return System.getProperty("timeout") ?: 5000;
}
-ext.bufsize = {
+ext.UDUPServerBufsize = {
return System.getProperty("bufsize") ?: 512;
}
+/*
+Possible options:
+RoundRobinExp
+RoundRobinUniform
+RandomExp
+RandomUniform
+ */
+ext.zoneSelectionStrategy = {
+ return System.getProperty("zoneStrategy") ?: "RandomUniform"
+}
+
+ext.zonePath = {
+ return System.getProperty("zonePath") ?: "/uw/violet07"
+}
+
repositories {
// Use jcenter for resolving dependencies.
// You can declare any Maven/Ivy/file repository here.
@@ -89,10 +104,12 @@ task runAgent(type: JavaExec) {
systemProperty 'freshness_period', freshnessPeriod()
systemProperty 'query_period', queryPeriod()
systemProperty 'gossip_period', gossipPeriod()
- systemProperty 'UDUPServer.hostname', UDUPHostname()
- systemProperty 'UDUPServer.port', port()
- systemProperty 'UDUPServer.timeout', port()
- systemProperty 'UDUPServer.bufsize', port()
+ systemProperty 'UDUPServer.hostname', UDUPServerHostname()
+ systemProperty 'UDUPServer.port', UDUPServerPort()
+ systemProperty 'UDUPServer.timeout', UDUPServerTimeout()
+ systemProperty 'UDUPServer.bufsize', UDUPServerBufsize()
+ systemProperty 'Gossip.zone_strategy', zoneSelectionStrategy()
+ systemProperty 'zone_path', zonePath()
}
task runClient(type: JavaExec) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 58b55da..a7d2d55 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,280 +1,55 @@
package pl.edu.mimuw.cloudatlas.agent;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
-import pl.edu.mimuw.cloudatlas.model.Value;
-import pl.edu.mimuw.cloudatlas.model.ValueContact;
-import pl.edu.mimuw.cloudatlas.model.ValueSet;
import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
- private static EventBus eventBus;
- private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
- private static GossipGirlStrategies gossipGirlStrategies;
- private static Random random = new Random();
- public static void runRegistry() {
+ private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) {
try {
- NewApiImplementation api = new NewApiImplementation(eventBus);
- Api apiStub =
- (Api) UnicastRemoteObject.exportObject(api, 0);
- Registry registry = LocateRegistry.getRegistry();
- registry.rebind("Api", apiStub);
- System.out.println("Agent: api bound");
+ UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
+ eventBus.addMessage(message);
+ for (ZMI son : zmi.getSons()) {
+ addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus);
+ }
} catch (Exception e) {
- System.err.println("Agent registry initialization exception:");
- e.printStackTrace();
- }
- }
-
- public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
- HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
- modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
- modules.put(ModuleType.RMI, new Remik());
- Long freshnessPeriod = Long.getLong("freshness_period");
- modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
- modules.put(ModuleType.QUERY, new Qurnik());
- modules.put(ModuleType.GOSSIP, new GossipGirl());
-
- Integer port = Integer.getInteger("UDUPServer.port");
- Integer timeout = Integer.getInteger("UDUPServer.timeout");
- Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
- UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
- modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
- return modules;
- }
-
- public static HashMap<ModuleType, Executor> initializeExecutors(
- HashMap<ModuleType, Module> modules) {
- HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
-
- for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
- Module module = moduleEntry.getValue();
- Executor executor = new Executor(module);
- executors.put(moduleEntry.getKey(), executor);
- }
-
- return executors;
- }
-
- public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
- ArrayList<Thread> executorThreads = new ArrayList<Thread>();
-
- for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
- Thread thread = new Thread(executorEntry.getValue());
- thread.setDaemon(true);
- System.out.println("Initializing executor " + executorEntry.getKey());
- thread.start();
- executorThreads.add(thread);
- }
-
- return executorThreads;
- }
-
- public static void closeExecutors(ArrayList<Thread> executorThreads) {
- for (Thread executorThread : executorThreads) {
- executorThread.interrupt();
- }
- }
-
- public static void runModulesAsThreads() {
- HashMap<ModuleType, Module> modules = null;
-
- try {
- modules = initializeModules();
- } catch (UnknownHostException | SocketException e) {
- System.out.println("Module initialization failed");
- e.printStackTrace();
- return;
+ System.out.println("ERROR: failed to add zone");
}
-
- HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
- ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
- eventBus = new EventBus(executors);
- Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
- Thread eventBusThread = new Thread(eventBus);
- System.out.println("Initializing event bus");
- eventBusThread.start();
- UDUPServerThread.start();
}
- private static void initZones() {
+ public static void initZones(EventBus eventBus) {
try {
ZMI root = Main.createTestHierarchy2();
- addZoneAndChildren(root, new PathName(""));
+ addZoneAndChildren(root, new PathName(""), eventBus);
System.out.println("Initialized with test hierarchy");
} catch (Exception e) {
System.out.println("ERROR: failed to create test hierarchy");
}
}
- private static void startQueries(long queriesPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- sendMessage(new RunQueriesMessage("", 0));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while triggering queries");
- }
- }
- };
-
- startRecursiveTask(taskSupplier, queriesPeriod);
- }
-
- private static void startGossip(long gossipPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- System.out.println("INFO: initiating gossip");
- PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
- ValueContact contact = selectContactFromLevel(gossipLevel);
- if (contact != null) {
- System.out.println("INFO: found a contact " + contact.toString());
- InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
- sendMessage(message);
- } else {
- System.out.println("DEBUG: couldn't find contact for gossip");
- }
- } catch (InterruptedException e) {
- System.out.println("Interrupted while initiating gossip");
- } catch (Exception e) {
- System.out.println("ERROR: something happened " + e.toString());
- }
- }
- };
-
- startRecursiveTask(taskSupplier, gossipPeriod);
- }
-
- private static ValueContact selectContactFromLevel(PathName path) throws Exception {
- CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
- eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
- StateMessage response = (StateMessage) responseFuture.get();
- ZMI root = response.getZMI();
- List<ZMI> siblings = getSiblings(root, path);
- filterEmptyContacts(siblings);
- if (siblings.isEmpty()) {
- return selectFallbackContact();
- }
- ZMI zmi = selectZMI(siblings);
- ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- Set<Value> valueSet = contactsValue.getValue();
- return selectContactFromSet(valueSet);
- }
-
- private static ValueContact selectFallbackContact() throws Exception {
- return null;
- }
-
- private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
- int i = random.nextInt(zmis.size());
- for (ZMI zmi : zmis) {
- if (i == 0) {
- return zmi;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectZMI");
- throw new Exception("empty list passed to selectZMI");
- }
-
- private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
- int i = random.nextInt(contacts.size());
- for (Value contact : contacts) {
- if (i == 0) {
- return (ValueContact) contact;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectContactFromSet");
- throw new Exception("empty list passed to selectContactFromSet");
- }
-
- private static List<ZMI> getSiblings(ZMI root, PathName path) {
- try {
- List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
- List<ZMI> siblings = new ArrayList();
- for (ZMI siblingOrI : siblingsAndI) {
- if (!siblingOrI.getPathName().equals(path)) {
- siblings.add(siblingOrI);
- }
- }
- return siblings;
- } catch (ZMI.NoSuchZoneException e) {
- System.out.println("ERROR: didn't find path when looking for siblings");
- return new ArrayList();
- }
- }
-
- private static void filterEmptyContacts(List<ZMI> zmis) {
- Iterator<ZMI> iterator = zmis.iterator();
- while (iterator.hasNext()) {
- ZMI zmi = iterator.next();
- ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
- iterator.remove();
- }
- }
- }
+ public static void main(String[] args) {
+ AgentConfig agentConfig = new AgentConfig();
- private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
- TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ agentConfig.runModulesAsThreads();
- try {
- eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while starting queries");
- }
- }
+ EventBus eventBus = new EventBus(agentConfig.getExecutors());
+ agentConfig.runRegistry(eventBus);
+ agentConfig.startNonModuleThreads(eventBus);
- private static void addZoneAndChildren(ZMI zmi, PathName pathName) {
- try {
- UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
- eventBus.addMessage(message);
- for (ZMI son : zmi.getSons()) {
- addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()));
- }
- } catch (Exception e) {
- System.out.println("ERROR: failed to add zone");
- }
- }
+ initZones(eventBus);
- public static void main(String[] args) {
- zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
- gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
- runModulesAsThreads();
- runRegistry();
- initZones();
// TODO: make query period confiurable with config file and from tests
+
+ // TODO config setup
+ String zonePath = System.getProperty("zone_path");
+ String selectionStrategy = System.getProperty("Gossip.zone_strategy");
Long queryPeriod = Long.getLong("query_period");
- startQueries(queryPeriod);
Long gossipPeriod = Long.getLong("gossip_period");
- startGossip(gossipPeriod);
+
+ HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);
+ hierarchyConfig.startQueries(queryPeriod);
+ hierarchyConfig.startGossip(gossipPeriod);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
new file mode 100644
index 0000000..16bd99a
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -0,0 +1,112 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.api.Api;
+
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AgentConfig {
+ private HashMap<ModuleType, Executor> executors;
+ HashMap<ModuleType, Module> modules;
+
+ public HashMap<ModuleType, Executor> getExecutors() {
+ return executors;
+ }
+
+ public void runRegistry(EventBus eventBus) {
+ try {
+ NewApiImplementation api = new NewApiImplementation(eventBus);
+ Api apiStub =
+ (Api) UnicastRemoteObject.exportObject(api, 0);
+ Registry registry = LocateRegistry.getRegistry();
+ registry.rebind("Api", apiStub);
+ System.out.println("Agent: api bound");
+ } catch (Exception e) {
+ System.err.println("Agent registry initialization exception:");
+ e.printStackTrace();
+ }
+ }
+
+ private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
+ // TODO config setup
+ Long freshnessPeriod = Long.getLong("freshness_period");
+ Integer port = Integer.getInteger("UDUPServer.port");
+ Integer timeout = Integer.getInteger("UDUPServer.timeout");
+ Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
+ InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+
+ HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
+ modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
+ modules.put(ModuleType.RMI, new Remik());
+ modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
+ modules.put(ModuleType.QUERY, new Qurnik());
+ modules.put(ModuleType.GOSSIP, new GossipGirl());
+
+ UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
+ modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
+ return modules;
+ }
+
+ private HashMap<ModuleType, Executor> initializeExecutors(
+ HashMap<ModuleType, Module> modules) {
+ HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
+
+ for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
+ Module module = moduleEntry.getValue();
+ Executor executor = new Executor(module);
+ executors.put(moduleEntry.getKey(), executor);
+ }
+
+ return executors;
+ }
+
+ private ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
+ ArrayList<Thread> executorThreads = new ArrayList<Thread>();
+
+ for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
+ Thread thread = new Thread(executorEntry.getValue());
+ thread.setDaemon(true);
+ System.out.println("Initializing executor " + executorEntry.getKey());
+ thread.start();
+ executorThreads.add(thread);
+ }
+
+ return executorThreads;
+ }
+
+ public void closeExecutors(ArrayList<Thread> executorThreads) {
+ for (Thread executorThread : executorThreads) {
+ executorThread.interrupt();
+ }
+ }
+
+ public void runModulesAsThreads() {
+ try {
+ modules = initializeModules();
+ } catch (UnknownHostException | SocketException e) {
+ System.out.println("Module initialization failed");
+ e.printStackTrace();
+ return;
+ }
+
+ executors = initializeExecutors(modules);
+ ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
+ }
+
+ void startNonModuleThreads(EventBus eventBus) {
+ Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
+ Thread eventBusThread = new Thread(eventBus);
+ System.out.println("Initializing event bus");
+ eventBusThread.start();
+ UDUPServerThread.start();
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
new file mode 100644
index 0000000..62d4bcd
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
@@ -0,0 +1,19 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+
+import java.util.function.Supplier;
+
+public class AgentUtils {
+
+ public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ try {
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while starting queries");
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index a6d3b2d..a9b25ed 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -22,6 +22,7 @@ public class EventBus implements Runnable {
// Allows for testing with a mock EventBus
protected EventBus() {
+ this.events = new LinkedBlockingQueue<AgentMessage>();
}
EventBus(HashMap<ModuleType, Executor> executors) {
@@ -30,6 +31,10 @@ public class EventBus implements Runnable {
this.events = new LinkedBlockingQueue<AgentMessage>();
}
+ public void setEvents(LinkedBlockingQueue<AgentMessage> events) {
+ this.events = events;
+ }
+
public void run() {
System.out.println("Event bus running");
while (!Thread.currentThread().interrupted()) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
new file mode 100644
index 0000000..04b9b5b
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
@@ -0,0 +1,152 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+import pl.edu.mimuw.cloudatlas.model.*;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public class HierarchyConfig {
+ private GossipGirlStrategies gossipGirlStrategies;
+ private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+ private Random random = new Random();
+ private EventBus eventBus;
+
+ HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) {
+ zoneSelectionStrategy = parseStrategy(zoneStrategy);
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath));
+ this.eventBus = eventBus;
+ }
+
+ private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) {
+ switch (selectionStrategy) {
+ case "RoundRobinExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ;
+ case "RoundRobinUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ case "RandomExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP;
+ case "RandomUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM;
+ default:
+ throw new UnsupportedOperationException("Selection strategy doesnt exist");
+ }
+ }
+
+ public void startGossip(long gossipPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: initiating gossip");
+ PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+ ValueContact contact = selectContactFromLevel(gossipLevel);
+ if (contact != null) {
+ System.out.println("INFO: found a contact " + contact.toString());
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ sendMessage(message);
+ } else {
+ System.out.println("DEBUG: couldn't find contact for gossip");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while initiating gossip");
+ } catch (Exception e) {
+ System.out.println("ERROR: something happened " + e.toString());
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus);
+ }
+
+ private ValueContact selectContactFromLevel(PathName path) throws Exception {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+ StateMessage response = (StateMessage) responseFuture.get();
+ ZMI root = response.getZMI();
+ List<ZMI> siblings = getSiblings(root, path);
+ filterEmptyContacts(siblings);
+ if (siblings.isEmpty()) {
+ return selectFallbackContact();
+ }
+ ZMI zmi = selectZMI(siblings);
+ ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ Set<Value> valueSet = contactsValue.getValue();
+ return selectContactFromSet(valueSet);
+ }
+
+ // TODO
+ private ValueContact selectFallbackContact() throws Exception {
+ return null;
+ }
+
+ private ZMI selectZMI(List<ZMI> zmis) throws Exception {
+ int i = random.nextInt(zmis.size());
+ for (ZMI zmi : zmis) {
+ if (i == 0) {
+ return zmi;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectZMI");
+ throw new Exception("empty list passed to selectZMI");
+ }
+
+ private ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
+ int i = random.nextInt(contacts.size());
+ for (Value contact : contacts) {
+ if (i == 0) {
+ return (ValueContact) contact;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectContactFromSet");
+ throw new Exception("empty list passed to selectContactFromSet");
+ }
+
+ private List<ZMI> getSiblings(ZMI root, PathName path) {
+ try {
+ List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+ List<ZMI> siblings = new ArrayList();
+ for (ZMI siblingOrI : siblingsAndI) {
+ if (!siblingOrI.getPathName().equals(path)) {
+ siblings.add(siblingOrI);
+ }
+ }
+ return siblings;
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find path when looking for siblings");
+ return new ArrayList();
+ }
+ }
+
+ private void filterEmptyContacts(List<ZMI> zmis) {
+ Iterator<ZMI> iterator = zmis.iterator();
+ while (iterator.hasNext()) {
+ ZMI zmi = iterator.next();
+ ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public void startQueries(long queriesPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ sendMessage(new RunQueriesMessage("", 0));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while triggering queries");
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);
+ }
+}