m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 16:06:34 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 16:06:34 +0100
commit6710ea855694dfc6486604d06f046a90f69f1373 (patch)
treee6751ace086a53631109f2b8f5d458c2bed45ac5 /src/main/java/pl
parentfd554f89eeff6ffb3dcd80447c4284c976090e9c (diff)
parent100e3d23b47d9d772d64dd0c7e596cd20de218b9 (diff)
Merge branch 'master' into gossip-gc
Diffstat (limited to 'src/main/java/pl')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java275
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java112
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java19
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java156
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java10
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java6
7 files changed, 331 insertions, 253 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 58b55da..0cbda2d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,280 +1,55 @@
package pl.edu.mimuw.cloudatlas.agent;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
-import pl.edu.mimuw.cloudatlas.model.Value;
-import pl.edu.mimuw.cloudatlas.model.ValueContact;
-import pl.edu.mimuw.cloudatlas.model.ValueSet;
import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
- private static EventBus eventBus;
- private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
- private static GossipGirlStrategies gossipGirlStrategies;
- private static Random random = new Random();
- public static void runRegistry() {
+ private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) {
try {
- NewApiImplementation api = new NewApiImplementation(eventBus);
- Api apiStub =
- (Api) UnicastRemoteObject.exportObject(api, 0);
- Registry registry = LocateRegistry.getRegistry();
- registry.rebind("Api", apiStub);
- System.out.println("Agent: api bound");
+ UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
+ eventBus.addMessage(message);
+ for (ZMI son : zmi.getSons()) {
+ addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus);
+ }
} catch (Exception e) {
- System.err.println("Agent registry initialization exception:");
- e.printStackTrace();
- }
- }
-
- public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
- HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
- modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
- modules.put(ModuleType.RMI, new Remik());
- Long freshnessPeriod = Long.getLong("freshness_period");
- modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
- modules.put(ModuleType.QUERY, new Qurnik());
- modules.put(ModuleType.GOSSIP, new GossipGirl());
-
- Integer port = Integer.getInteger("UDUPServer.port");
- Integer timeout = Integer.getInteger("UDUPServer.timeout");
- Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
- UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
- modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
- return modules;
- }
-
- public static HashMap<ModuleType, Executor> initializeExecutors(
- HashMap<ModuleType, Module> modules) {
- HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
-
- for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
- Module module = moduleEntry.getValue();
- Executor executor = new Executor(module);
- executors.put(moduleEntry.getKey(), executor);
- }
-
- return executors;
- }
-
- public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
- ArrayList<Thread> executorThreads = new ArrayList<Thread>();
-
- for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
- Thread thread = new Thread(executorEntry.getValue());
- thread.setDaemon(true);
- System.out.println("Initializing executor " + executorEntry.getKey());
- thread.start();
- executorThreads.add(thread);
- }
-
- return executorThreads;
- }
-
- public static void closeExecutors(ArrayList<Thread> executorThreads) {
- for (Thread executorThread : executorThreads) {
- executorThread.interrupt();
- }
- }
-
- public static void runModulesAsThreads() {
- HashMap<ModuleType, Module> modules = null;
-
- try {
- modules = initializeModules();
- } catch (UnknownHostException | SocketException e) {
- System.out.println("Module initialization failed");
- e.printStackTrace();
- return;
+ System.out.println("ERROR: failed to add zone");
}
-
- HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
- ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
- eventBus = new EventBus(executors);
- Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
- Thread eventBusThread = new Thread(eventBus);
- System.out.println("Initializing event bus");
- eventBusThread.start();
- UDUPServerThread.start();
}
- private static void initZones() {
+ public static void initZones(EventBus eventBus) {
try {
ZMI root = Main.createTestHierarchy2();
- addZoneAndChildren(root, new PathName(""));
+ addZoneAndChildren(root, new PathName(""), eventBus);
System.out.println("Initialized with test hierarchy");
} catch (Exception e) {
System.out.println("ERROR: failed to create test hierarchy");
}
}
- private static void startQueries(long queriesPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- sendMessage(new RunQueriesMessage("", 0));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while triggering queries");
- }
- }
- };
-
- startRecursiveTask(taskSupplier, queriesPeriod);
- }
-
- private static void startGossip(long gossipPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- System.out.println("INFO: initiating gossip");
- PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
- ValueContact contact = selectContactFromLevel(gossipLevel);
- if (contact != null) {
- System.out.println("INFO: found a contact " + contact.toString());
- InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
- sendMessage(message);
- } else {
- System.out.println("DEBUG: couldn't find contact for gossip");
- }
- } catch (InterruptedException e) {
- System.out.println("Interrupted while initiating gossip");
- } catch (Exception e) {
- System.out.println("ERROR: something happened " + e.toString());
- }
- }
- };
-
- startRecursiveTask(taskSupplier, gossipPeriod);
- }
-
- private static ValueContact selectContactFromLevel(PathName path) throws Exception {
- CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
- eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
- StateMessage response = (StateMessage) responseFuture.get();
- ZMI root = response.getZMI();
- List<ZMI> siblings = getSiblings(root, path);
- filterEmptyContacts(siblings);
- if (siblings.isEmpty()) {
- return selectFallbackContact();
- }
- ZMI zmi = selectZMI(siblings);
- ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- Set<Value> valueSet = contactsValue.getValue();
- return selectContactFromSet(valueSet);
- }
-
- private static ValueContact selectFallbackContact() throws Exception {
- return null;
- }
-
- private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
- int i = random.nextInt(zmis.size());
- for (ZMI zmi : zmis) {
- if (i == 0) {
- return zmi;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectZMI");
- throw new Exception("empty list passed to selectZMI");
- }
-
- private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
- int i = random.nextInt(contacts.size());
- for (Value contact : contacts) {
- if (i == 0) {
- return (ValueContact) contact;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectContactFromSet");
- throw new Exception("empty list passed to selectContactFromSet");
- }
-
- private static List<ZMI> getSiblings(ZMI root, PathName path) {
- try {
- List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
- List<ZMI> siblings = new ArrayList();
- for (ZMI siblingOrI : siblingsAndI) {
- if (!siblingOrI.getPathName().equals(path)) {
- siblings.add(siblingOrI);
- }
- }
- return siblings;
- } catch (ZMI.NoSuchZoneException e) {
- System.out.println("ERROR: didn't find path when looking for siblings");
- return new ArrayList();
- }
- }
-
- private static void filterEmptyContacts(List<ZMI> zmis) {
- Iterator<ZMI> iterator = zmis.iterator();
- while (iterator.hasNext()) {
- ZMI zmi = iterator.next();
- ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
- iterator.remove();
- }
- }
- }
+ public static void main(String[] args) {
+ AgentConfig agentConfig = new AgentConfig();
- private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
- TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ agentConfig.runModulesAsThreads();
- try {
- eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while starting queries");
- }
- }
+ EventBus eventBus = new EventBus(agentConfig.getExecutors());
+ agentConfig.runRegistry(eventBus);
+ agentConfig.startNonModuleThreads(eventBus);
- private static void addZoneAndChildren(ZMI zmi, PathName pathName) {
- try {
- UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
- eventBus.addMessage(message);
- for (ZMI son : zmi.getSons()) {
- addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()));
- }
- } catch (Exception e) {
- System.out.println("ERROR: failed to add zone");
- }
- }
+ // initZones(eventBus);
- public static void main(String[] args) {
- zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
- gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
- runModulesAsThreads();
- runRegistry();
- initZones();
// TODO: make query period confiurable with config file and from tests
+
+ // TODO config setup
+ String zonePath = System.getProperty("zone_path");
+ String selectionStrategy = System.getProperty("Gossip.zone_strategy");
Long queryPeriod = Long.getLong("query_period");
- startQueries(queryPeriod);
Long gossipPeriod = Long.getLong("gossip_period");
- startGossip(gossipPeriod);
+
+ HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);
+ hierarchyConfig.startQueries(queryPeriod);
+ hierarchyConfig.startGossip(gossipPeriod);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
new file mode 100644
index 0000000..38d764a
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -0,0 +1,112 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.api.Api;
+
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AgentConfig {
+ private HashMap<ModuleType, Executor> executors;
+ HashMap<ModuleType, Module> modules;
+
+ public HashMap<ModuleType, Executor> getExecutors() {
+ return executors;
+ }
+
+ public void runRegistry(EventBus eventBus) {
+ try {
+ NewApiImplementation api = new NewApiImplementation(eventBus);
+ Api apiStub =
+ (Api) UnicastRemoteObject.exportObject(api, 0);
+ Registry registry = LocateRegistry.getRegistry();
+ registry.rebind("Api", apiStub);
+ System.out.println("Agent: api bound");
+ } catch (Exception e) {
+ System.err.println("Agent registry initialization exception:");
+ e.printStackTrace();
+ }
+ }
+
+ private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
+ // TODO config setup
+ Long freshnessPeriod = Long.getLong("freshness_period");
+ Integer port = Integer.getInteger("UDUPServer.port");
+ Integer timeout = Integer.getInteger("UDUPServer.timeout");
+ Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
+ InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+
+ HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
+ modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
+ modules.put(ModuleType.RMI, new Remik());
+ modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
+ modules.put(ModuleType.QUERY, new Qurnik());
+ modules.put(ModuleType.GOSSIP, new GossipGirl());
+
+ UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
+ modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
+ return modules;
+ }
+
+ public static HashMap<ModuleType, Executor> initializeExecutors(
+ HashMap<ModuleType, Module> modules) {
+ HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
+
+ for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
+ Module module = moduleEntry.getValue();
+ Executor executor = new Executor(module);
+ executors.put(moduleEntry.getKey(), executor);
+ }
+
+ return executors;
+ }
+
+ public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
+ ArrayList<Thread> executorThreads = new ArrayList<Thread>();
+
+ for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
+ Thread thread = new Thread(executorEntry.getValue());
+ thread.setDaemon(true);
+ System.out.println("Initializing executor " + executorEntry.getKey());
+ thread.start();
+ executorThreads.add(thread);
+ }
+
+ return executorThreads;
+ }
+
+ public void closeExecutors(ArrayList<Thread> executorThreads) {
+ for (Thread executorThread : executorThreads) {
+ executorThread.interrupt();
+ }
+ }
+
+ public void runModulesAsThreads() {
+ try {
+ modules = initializeModules();
+ } catch (UnknownHostException | SocketException e) {
+ System.out.println("Module initialization failed");
+ e.printStackTrace();
+ return;
+ }
+
+ executors = initializeExecutors(modules);
+ ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
+ }
+
+ void startNonModuleThreads(EventBus eventBus) {
+ Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
+ Thread eventBusThread = new Thread(eventBus);
+ System.out.println("Initializing event bus");
+ eventBusThread.start();
+ UDUPServerThread.start();
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
new file mode 100644
index 0000000..62d4bcd
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
@@ -0,0 +1,19 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+
+import java.util.function.Supplier;
+
+public class AgentUtils {
+
+ public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ try {
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while starting queries");
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index a6d3b2d..0e972f4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -22,6 +22,7 @@ public class EventBus implements Runnable {
// Allows for testing with a mock EventBus
protected EventBus() {
+ this.events = new LinkedBlockingQueue<AgentMessage>();
}
EventBus(HashMap<ModuleType, Executor> executors) {
@@ -30,6 +31,10 @@ public class EventBus implements Runnable {
this.events = new LinkedBlockingQueue<AgentMessage>();
}
+ public void setEvents(LinkedBlockingQueue<AgentMessage> events) {
+ this.events = events;
+ }
+
public void run() {
System.out.println("Event bus running");
while (!Thread.currentThread().interrupted()) {
@@ -49,6 +54,7 @@ public class EventBus implements Runnable {
}
public void addMessage(AgentMessage msg) throws InterruptedException {
+ System.out.println("INFO: message added for " + msg.getDestinationModule().toString());
this.events.put(msg);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
new file mode 100644
index 0000000..dc5241d
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
@@ -0,0 +1,156 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+import pl.edu.mimuw.cloudatlas.model.*;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public class HierarchyConfig {
+ private GossipGirlStrategies gossipGirlStrategies;
+ private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+ private Random random = new Random();
+ private EventBus eventBus;
+
+ HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) {
+ zoneSelectionStrategy = parseStrategy(zoneStrategy);
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath));
+ this.eventBus = eventBus;
+ }
+
+ private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) {
+ switch (selectionStrategy) {
+ case "RoundRobinExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ;
+ case "RoundRobinUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ case "RandomExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP;
+ case "RandomUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM;
+ default:
+ throw new UnsupportedOperationException("Selection strategy doesnt exist");
+ }
+ }
+
+ public void startGossip(long gossipPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: initiating gossip");
+ PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+ ValueContact contact = selectContactFromLevel(gossipLevel);
+ if (contact != null) {
+ System.out.println("INFO: found a contact " + contact.toString());
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ sendMessage(message);
+ } else {
+ System.out.println("DEBUG: couldn't find contact for gossip");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while initiating gossip");
+ } catch (Exception e) {
+ System.out.println("ERROR: something happened " + e.toString());
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus);
+ }
+
+ private ValueContact selectContactFromLevel(PathName path) throws Exception {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+ StateMessage response = (StateMessage) responseFuture.get();
+ ZMI root = response.getZMI();
+ List<ZMI> siblings = getSiblings(root, path);
+ filterEmptyContacts(siblings);
+ if (siblings.isEmpty()) {
+ return selectFallbackContact(response.getContacts());
+ }
+ ZMI zmi = selectZMI(siblings);
+ ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ Set<Value> valueSet = contactsValue.getValue();
+ return selectContactFromSet(valueSet);
+ }
+
+ // TODO
+ private ValueContact selectFallbackContact(Set<ValueContact> contacts) throws Exception {
+ if (contacts.isEmpty()) {
+ return null;
+ } else {
+ return selectContactFromSet(contacts);
+ }
+ }
+
+ private ZMI selectZMI(List<ZMI> zmis) throws Exception {
+ int i = random.nextInt(zmis.size());
+ for (ZMI zmi : zmis) {
+ if (i == 0) {
+ return zmi;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectZMI");
+ throw new Exception("empty list passed to selectZMI");
+ }
+
+ private <T> ValueContact selectContactFromSet(Set<T> contacts) throws Exception {
+ int i = random.nextInt(contacts.size());
+ for (T contact : contacts) {
+ if (i == 0) {
+ return (ValueContact) contact;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectContactFromSet");
+ throw new Exception("empty list passed to selectContactFromSet");
+ }
+
+ private List<ZMI> getSiblings(ZMI root, PathName path) {
+ try {
+ List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+ List<ZMI> siblings = new ArrayList();
+ for (ZMI siblingOrI : siblingsAndI) {
+ if (!siblingOrI.getPathName().equals(path)) {
+ siblings.add(siblingOrI);
+ }
+ }
+ return siblings;
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find path when looking for siblings");
+ return new ArrayList();
+ }
+ }
+
+ private void filterEmptyContacts(List<ZMI> zmis) {
+ Iterator<ZMI> iterator = zmis.iterator();
+ while (iterator.hasNext()) {
+ ZMI zmi = iterator.next();
+ ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public void startQueries(long queriesPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ sendMessage(new RunQueriesMessage("", 0));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while triggering queries");
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
index 759723b..806d41f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
@@ -2,9 +2,11 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ZMI;
@@ -12,11 +14,13 @@ import pl.edu.mimuw.cloudatlas.model.ZMI;
public class StateMessage extends ResponseMessage {
private ZMI zmi;
private Map<Attribute, Entry<ValueQuery, ValueTime>> queries;
+ private Set<ValueContact> contacts;
- public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) {
+ public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries, Set<ValueContact> contacts) {
super(messageId, destinationModule, timestamp, Type.STATE, requestId);
this.zmi = zmi;
this.queries = queries;
+ this.contacts = contacts;
}
public StateMessage() {}
@@ -28,4 +32,8 @@ public class StateMessage extends ResponseMessage {
public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
return queries;
}
+
+ public Set<ValueContact> getContacts() {
+ return contacts;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index c713827..999c193 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -53,8 +53,9 @@ public class Stanik extends Module {
break;
case UPDATE_CONTACTS:
handleUpdateContacts((UpdateContactsMessage) message);
+ break;
default:
- throw new InvalidMessageType("This type of message cannot be handled by Stanik");
+ throw new InvalidMessageType("This type of message cannot be handled by Stanik" + message.getType().toString());
}
}
@@ -66,7 +67,8 @@ public class Stanik extends Module {
0,
message.getRequestId(),
hierarchy.clone(),
- (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone()
+ (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone(),
+ contacts
);
sendMessage(response);
}