m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 16:06:34 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 16:06:34 +0100
commit6710ea855694dfc6486604d06f046a90f69f1373 (patch)
treee6751ace086a53631109f2b8f5d458c2bed45ac5
parentfd554f89eeff6ffb3dcd80447c4284c976090e9c (diff)
parent100e3d23b47d9d772d64dd0c7e596cd20de218b9 (diff)
Merge branch 'master' into gossip-gc
-rw-r--r--build.gradle33
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java275
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java112
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java19
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java156
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java10
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java6
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java2
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java6
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java4
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java3
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java7
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java3
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java3
15 files changed, 374 insertions, 271 deletions
diff --git a/build.gradle b/build.gradle
index 66c50d1..13be045 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,22 +30,37 @@ ext.gossipPeriod = {
return System.getProperty("gossipPeriod") ?: 5 * 1000
}
-ext.UDUPHostname = {
+ext.UDUPServerHostname = {
return System.getProperty("hostname") ?: "localhost"
}
-ext.port = {
+ext.UDUPServerPort = {
return System.getProperty("port") ?: 5999;
}
-ext.timeout = {
+ext.UDUPServerTimeout = {
return System.getProperty("timeout") ?: 5000;
}
-ext.bufsize = {
+ext.UDUPServerBufsize = {
return System.getProperty("bufsize") ?: 512;
}
+/*
+Possible options:
+RoundRobinExp
+RoundRobinUniform
+RandomExp
+RandomUniform
+ */
+ext.zoneSelectionStrategy = {
+ return System.getProperty("zoneStrategy") ?: "RandomUniform"
+}
+
+ext.zonePath = {
+ return System.getProperty("zonePath") ?: "/uw/violet07"
+}
+
repositories {
// Use jcenter for resolving dependencies.
// You can declare any Maven/Ivy/file repository here.
@@ -89,10 +104,12 @@ task runAgent(type: JavaExec) {
systemProperty 'freshness_period', freshnessPeriod()
systemProperty 'query_period', queryPeriod()
systemProperty 'gossip_period', gossipPeriod()
- systemProperty 'UDUPServer.hostname', UDUPHostname()
- systemProperty 'UDUPServer.port', port()
- systemProperty 'UDUPServer.timeout', port()
- systemProperty 'UDUPServer.bufsize', port()
+ systemProperty 'UDUPServer.hostname', UDUPServerHostname()
+ systemProperty 'UDUPServer.port', UDUPServerPort()
+ systemProperty 'UDUPServer.timeout', UDUPServerTimeout()
+ systemProperty 'UDUPServer.bufsize', UDUPServerBufsize()
+ systemProperty 'Gossip.zone_strategy', zoneSelectionStrategy()
+ systemProperty 'zone_path', zonePath()
}
task runClient(type: JavaExec) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 58b55da..0cbda2d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,280 +1,55 @@
package pl.edu.mimuw.cloudatlas.agent;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.*;
-import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
-import pl.edu.mimuw.cloudatlas.model.Value;
-import pl.edu.mimuw.cloudatlas.model.ValueContact;
-import pl.edu.mimuw.cloudatlas.model.ValueSet;
import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
- private static EventBus eventBus;
- private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
- private static GossipGirlStrategies gossipGirlStrategies;
- private static Random random = new Random();
- public static void runRegistry() {
+ private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) {
try {
- NewApiImplementation api = new NewApiImplementation(eventBus);
- Api apiStub =
- (Api) UnicastRemoteObject.exportObject(api, 0);
- Registry registry = LocateRegistry.getRegistry();
- registry.rebind("Api", apiStub);
- System.out.println("Agent: api bound");
+ UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
+ eventBus.addMessage(message);
+ for (ZMI son : zmi.getSons()) {
+ addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus);
+ }
} catch (Exception e) {
- System.err.println("Agent registry initialization exception:");
- e.printStackTrace();
- }
- }
-
- public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
- HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
- modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
- modules.put(ModuleType.RMI, new Remik());
- Long freshnessPeriod = Long.getLong("freshness_period");
- modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
- modules.put(ModuleType.QUERY, new Qurnik());
- modules.put(ModuleType.GOSSIP, new GossipGirl());
-
- Integer port = Integer.getInteger("UDUPServer.port");
- Integer timeout = Integer.getInteger("UDUPServer.timeout");
- Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
- UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
- modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
- return modules;
- }
-
- public static HashMap<ModuleType, Executor> initializeExecutors(
- HashMap<ModuleType, Module> modules) {
- HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
-
- for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
- Module module = moduleEntry.getValue();
- Executor executor = new Executor(module);
- executors.put(moduleEntry.getKey(), executor);
- }
-
- return executors;
- }
-
- public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
- ArrayList<Thread> executorThreads = new ArrayList<Thread>();
-
- for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
- Thread thread = new Thread(executorEntry.getValue());
- thread.setDaemon(true);
- System.out.println("Initializing executor " + executorEntry.getKey());
- thread.start();
- executorThreads.add(thread);
- }
-
- return executorThreads;
- }
-
- public static void closeExecutors(ArrayList<Thread> executorThreads) {
- for (Thread executorThread : executorThreads) {
- executorThread.interrupt();
- }
- }
-
- public static void runModulesAsThreads() {
- HashMap<ModuleType, Module> modules = null;
-
- try {
- modules = initializeModules();
- } catch (UnknownHostException | SocketException e) {
- System.out.println("Module initialization failed");
- e.printStackTrace();
- return;
+ System.out.println("ERROR: failed to add zone");
}
-
- HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
- ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
- eventBus = new EventBus(executors);
- Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
- Thread eventBusThread = new Thread(eventBus);
- System.out.println("Initializing event bus");
- eventBusThread.start();
- UDUPServerThread.start();
}
- private static void initZones() {
+ public static void initZones(EventBus eventBus) {
try {
ZMI root = Main.createTestHierarchy2();
- addZoneAndChildren(root, new PathName(""));
+ addZoneAndChildren(root, new PathName(""), eventBus);
System.out.println("Initialized with test hierarchy");
} catch (Exception e) {
System.out.println("ERROR: failed to create test hierarchy");
}
}
- private static void startQueries(long queriesPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- sendMessage(new RunQueriesMessage("", 0));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while triggering queries");
- }
- }
- };
-
- startRecursiveTask(taskSupplier, queriesPeriod);
- }
-
- private static void startGossip(long gossipPeriod) {
- Supplier<TimerScheduledTask> taskSupplier = () ->
- new TimerScheduledTask() {
- public void run() {
- try {
- System.out.println("INFO: initiating gossip");
- PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
- ValueContact contact = selectContactFromLevel(gossipLevel);
- if (contact != null) {
- System.out.println("INFO: found a contact " + contact.toString());
- InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
- sendMessage(message);
- } else {
- System.out.println("DEBUG: couldn't find contact for gossip");
- }
- } catch (InterruptedException e) {
- System.out.println("Interrupted while initiating gossip");
- } catch (Exception e) {
- System.out.println("ERROR: something happened " + e.toString());
- }
- }
- };
-
- startRecursiveTask(taskSupplier, gossipPeriod);
- }
-
- private static ValueContact selectContactFromLevel(PathName path) throws Exception {
- CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
- eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
- StateMessage response = (StateMessage) responseFuture.get();
- ZMI root = response.getZMI();
- List<ZMI> siblings = getSiblings(root, path);
- filterEmptyContacts(siblings);
- if (siblings.isEmpty()) {
- return selectFallbackContact();
- }
- ZMI zmi = selectZMI(siblings);
- ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- Set<Value> valueSet = contactsValue.getValue();
- return selectContactFromSet(valueSet);
- }
-
- private static ValueContact selectFallbackContact() throws Exception {
- return null;
- }
-
- private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
- int i = random.nextInt(zmis.size());
- for (ZMI zmi : zmis) {
- if (i == 0) {
- return zmi;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectZMI");
- throw new Exception("empty list passed to selectZMI");
- }
-
- private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
- int i = random.nextInt(contacts.size());
- for (Value contact : contacts) {
- if (i == 0) {
- return (ValueContact) contact;
- }
- i--;
- }
- System.out.println("ERROR: empty list passed to selectContactFromSet");
- throw new Exception("empty list passed to selectContactFromSet");
- }
-
- private static List<ZMI> getSiblings(ZMI root, PathName path) {
- try {
- List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
- List<ZMI> siblings = new ArrayList();
- for (ZMI siblingOrI : siblingsAndI) {
- if (!siblingOrI.getPathName().equals(path)) {
- siblings.add(siblingOrI);
- }
- }
- return siblings;
- } catch (ZMI.NoSuchZoneException e) {
- System.out.println("ERROR: didn't find path when looking for siblings");
- return new ArrayList();
- }
- }
-
- private static void filterEmptyContacts(List<ZMI> zmis) {
- Iterator<ZMI> iterator = zmis.iterator();
- while (iterator.hasNext()) {
- ZMI zmi = iterator.next();
- ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
- if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
- iterator.remove();
- }
- }
- }
+ public static void main(String[] args) {
+ AgentConfig agentConfig = new AgentConfig();
- private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
- TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ agentConfig.runModulesAsThreads();
- try {
- eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
- } catch (InterruptedException e) {
- System.out.println("Interrupted while starting queries");
- }
- }
+ EventBus eventBus = new EventBus(agentConfig.getExecutors());
+ agentConfig.runRegistry(eventBus);
+ agentConfig.startNonModuleThreads(eventBus);
- private static void addZoneAndChildren(ZMI zmi, PathName pathName) {
- try {
- UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes());
- eventBus.addMessage(message);
- for (ZMI son : zmi.getSons()) {
- addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()));
- }
- } catch (Exception e) {
- System.out.println("ERROR: failed to add zone");
- }
- }
+ // initZones(eventBus);
- public static void main(String[] args) {
- zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
- gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
- runModulesAsThreads();
- runRegistry();
- initZones();
// TODO: make query period confiurable with config file and from tests
+
+ // TODO config setup
+ String zonePath = System.getProperty("zone_path");
+ String selectionStrategy = System.getProperty("Gossip.zone_strategy");
Long queryPeriod = Long.getLong("query_period");
- startQueries(queryPeriod);
Long gossipPeriod = Long.getLong("gossip_period");
- startGossip(gossipPeriod);
+
+ HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);
+ hierarchyConfig.startQueries(queryPeriod);
+ hierarchyConfig.startGossip(gossipPeriod);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
new file mode 100644
index 0000000..38d764a
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java
@@ -0,0 +1,112 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.api.Api;
+
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AgentConfig {
+ private HashMap<ModuleType, Executor> executors;
+ HashMap<ModuleType, Module> modules;
+
+ public HashMap<ModuleType, Executor> getExecutors() {
+ return executors;
+ }
+
+ public void runRegistry(EventBus eventBus) {
+ try {
+ NewApiImplementation api = new NewApiImplementation(eventBus);
+ Api apiStub =
+ (Api) UnicastRemoteObject.exportObject(api, 0);
+ Registry registry = LocateRegistry.getRegistry();
+ registry.rebind("Api", apiStub);
+ System.out.println("Agent: api bound");
+ } catch (Exception e) {
+ System.err.println("Agent registry initialization exception:");
+ e.printStackTrace();
+ }
+ }
+
+ private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {
+ // TODO config setup
+ Long freshnessPeriod = Long.getLong("freshness_period");
+ Integer port = Integer.getInteger("UDUPServer.port");
+ Integer timeout = Integer.getInteger("UDUPServer.timeout");
+ Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
+ InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname"));
+
+ HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
+ modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
+ modules.put(ModuleType.RMI, new Remik());
+ modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
+ modules.put(ModuleType.QUERY, new Qurnik());
+ modules.put(ModuleType.GOSSIP, new GossipGirl());
+
+ UDUPServer server = new UDUPServer(serverAddr, port, bufsize);
+ modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
+ return modules;
+ }
+
+ public static HashMap<ModuleType, Executor> initializeExecutors(
+ HashMap<ModuleType, Module> modules) {
+ HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>();
+
+ for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) {
+ Module module = moduleEntry.getValue();
+ Executor executor = new Executor(module);
+ executors.put(moduleEntry.getKey(), executor);
+ }
+
+ return executors;
+ }
+
+ public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) {
+ ArrayList<Thread> executorThreads = new ArrayList<Thread>();
+
+ for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) {
+ Thread thread = new Thread(executorEntry.getValue());
+ thread.setDaemon(true);
+ System.out.println("Initializing executor " + executorEntry.getKey());
+ thread.start();
+ executorThreads.add(thread);
+ }
+
+ return executorThreads;
+ }
+
+ public void closeExecutors(ArrayList<Thread> executorThreads) {
+ for (Thread executorThread : executorThreads) {
+ executorThread.interrupt();
+ }
+ }
+
+ public void runModulesAsThreads() {
+ try {
+ modules = initializeModules();
+ } catch (UnknownHostException | SocketException e) {
+ System.out.println("Module initialization failed");
+ e.printStackTrace();
+ return;
+ }
+
+ executors = initializeExecutors(modules);
+ ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
+ }
+
+ void startNonModuleThreads(EventBus eventBus) {
+ Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());
+ Thread eventBusThread = new Thread(eventBus);
+ System.out.println("Initializing event bus");
+ eventBusThread.start();
+ UDUPServerThread.start();
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
new file mode 100644
index 0000000..62d4bcd
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java
@@ -0,0 +1,19 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+
+import java.util.function.Supplier;
+
+public class AgentUtils {
+
+ public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
+ try {
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while starting queries");
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index a6d3b2d..0e972f4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -22,6 +22,7 @@ public class EventBus implements Runnable {
// Allows for testing with a mock EventBus
protected EventBus() {
+ this.events = new LinkedBlockingQueue<AgentMessage>();
}
EventBus(HashMap<ModuleType, Executor> executors) {
@@ -30,6 +31,10 @@ public class EventBus implements Runnable {
this.events = new LinkedBlockingQueue<AgentMessage>();
}
+ public void setEvents(LinkedBlockingQueue<AgentMessage> events) {
+ this.events = events;
+ }
+
public void run() {
System.out.println("Event bus running");
while (!Thread.currentThread().interrupted()) {
@@ -49,6 +54,7 @@ public class EventBus implements Runnable {
}
public void addMessage(AgentMessage msg) throws InterruptedException {
+ System.out.println("INFO: message added for " + msg.getDestinationModule().toString());
this.events.put(msg);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
new file mode 100644
index 0000000..dc5241d
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
@@ -0,0 +1,156 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies;
+import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
+import pl.edu.mimuw.cloudatlas.model.*;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public class HierarchyConfig {
+ private GossipGirlStrategies gossipGirlStrategies;
+ private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+ private Random random = new Random();
+ private EventBus eventBus;
+
+ HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) {
+ zoneSelectionStrategy = parseStrategy(zoneStrategy);
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath));
+ this.eventBus = eventBus;
+ }
+
+ private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) {
+ switch (selectionStrategy) {
+ case "RoundRobinExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ;
+ case "RoundRobinUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ case "RandomExp":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP;
+ case "RandomUniform":
+ return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM;
+ default:
+ throw new UnsupportedOperationException("Selection strategy doesnt exist");
+ }
+ }
+
+ public void startGossip(long gossipPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: initiating gossip");
+ PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+ ValueContact contact = selectContactFromLevel(gossipLevel);
+ if (contact != null) {
+ System.out.println("INFO: found a contact " + contact.toString());
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ sendMessage(message);
+ } else {
+ System.out.println("DEBUG: couldn't find contact for gossip");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while initiating gossip");
+ } catch (Exception e) {
+ System.out.println("ERROR: something happened " + e.toString());
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus);
+ }
+
+ private ValueContact selectContactFromLevel(PathName path) throws Exception {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+ StateMessage response = (StateMessage) responseFuture.get();
+ ZMI root = response.getZMI();
+ List<ZMI> siblings = getSiblings(root, path);
+ filterEmptyContacts(siblings);
+ if (siblings.isEmpty()) {
+ return selectFallbackContact(response.getContacts());
+ }
+ ZMI zmi = selectZMI(siblings);
+ ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ Set<Value> valueSet = contactsValue.getValue();
+ return selectContactFromSet(valueSet);
+ }
+
+ // TODO
+ private ValueContact selectFallbackContact(Set<ValueContact> contacts) throws Exception {
+ if (contacts.isEmpty()) {
+ return null;
+ } else {
+ return selectContactFromSet(contacts);
+ }
+ }
+
+ private ZMI selectZMI(List<ZMI> zmis) throws Exception {
+ int i = random.nextInt(zmis.size());
+ for (ZMI zmi : zmis) {
+ if (i == 0) {
+ return zmi;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectZMI");
+ throw new Exception("empty list passed to selectZMI");
+ }
+
+ private <T> ValueContact selectContactFromSet(Set<T> contacts) throws Exception {
+ int i = random.nextInt(contacts.size());
+ for (T contact : contacts) {
+ if (i == 0) {
+ return (ValueContact) contact;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectContactFromSet");
+ throw new Exception("empty list passed to selectContactFromSet");
+ }
+
+ private List<ZMI> getSiblings(ZMI root, PathName path) {
+ try {
+ List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+ List<ZMI> siblings = new ArrayList();
+ for (ZMI siblingOrI : siblingsAndI) {
+ if (!siblingOrI.getPathName().equals(path)) {
+ siblings.add(siblingOrI);
+ }
+ }
+ return siblings;
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find path when looking for siblings");
+ return new ArrayList();
+ }
+ }
+
+ private void filterEmptyContacts(List<ZMI> zmis) {
+ Iterator<ZMI> iterator = zmis.iterator();
+ while (iterator.hasNext()) {
+ ZMI zmi = iterator.next();
+ ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public void startQueries(long queriesPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ sendMessage(new RunQueriesMessage("", 0));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while triggering queries");
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
index 759723b..806d41f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
@@ -2,9 +2,11 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ZMI;
@@ -12,11 +14,13 @@ import pl.edu.mimuw.cloudatlas.model.ZMI;
public class StateMessage extends ResponseMessage {
private ZMI zmi;
private Map<Attribute, Entry<ValueQuery, ValueTime>> queries;
+ private Set<ValueContact> contacts;
- public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) {
+ public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries, Set<ValueContact> contacts) {
super(messageId, destinationModule, timestamp, Type.STATE, requestId);
this.zmi = zmi;
this.queries = queries;
+ this.contacts = contacts;
}
public StateMessage() {}
@@ -28,4 +32,8 @@ public class StateMessage extends ResponseMessage {
public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
return queries;
}
+
+ public Set<ValueContact> getContacts() {
+ return contacts;
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index c713827..999c193 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -53,8 +53,9 @@ public class Stanik extends Module {
break;
case UPDATE_CONTACTS:
handleUpdateContacts((UpdateContactsMessage) message);
+ break;
default:
- throw new InvalidMessageType("This type of message cannot be handled by Stanik");
+ throw new InvalidMessageType("This type of message cannot be handled by Stanik" + message.getType().toString());
}
}
@@ -66,7 +67,8 @@ public class Stanik extends Module {
0,
message.getRequestId(),
hierarchy.clone(),
- (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone()
+ (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone(),
+ contacts
);
sendMessage(response);
}
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java
index a3e0898..e2c12d6 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java
@@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.Ignore;
import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import static org.hamcrest.CoreMatchers.hasItems;
@@ -34,6 +35,7 @@ import pl.edu.mimuw.cloudatlas.model.ValueQuery;
import pl.edu.mimuw.cloudatlas.model.ValueString;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
+@Ignore
public class AgentIntegrationTest {
private static Process registryProcess;
private static Process agentProcess;
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java
index 6330648..a5ecddd 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java
@@ -58,7 +58,7 @@ public class NewApiImplementationTests {
RequestStateMessage requestMessage = (RequestStateMessage) message;
ZMI root = new ZMI();
- StateMessage responseMessage = new StateMessage("", ModuleType.RMI, 0, 0, root, null);
+ StateMessage responseMessage = new StateMessage("", ModuleType.RMI, 0, 0, root, null, null);
requestMessage.getFuture().complete(responseMessage);
apiThread.join(100);
@@ -88,7 +88,7 @@ public class NewApiImplementationTests {
ZMI zmi = new ZMI();
zmi.getAttributes().add("timestamp", new ValueTime(42l));
- StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap());
+ StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), null);
requestMessage.getFuture().complete(response);
apiThread.join(100);
@@ -122,7 +122,7 @@ public class NewApiImplementationTests {
zmi.addSon(son);
son.getAttributes().add("name", new ValueString("son"));
son.getAttributes().add("timestamp", new ValueTime(43l));
- StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap());
+ StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), null);
requestMessage.getFuture().complete(response);
apiThread.join(100);
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java
index ccb4c8d..4a9eb87 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java
@@ -23,8 +23,8 @@ public class SchedulerTest {
public SchedulerTest() {
this.modules = initializeModule();
- this.executors = Agent.initializeExecutors(modules);
- this.executorThreads = Agent.initializeExecutorThreads(executors);
+ this.executors = AgentConfig.initializeExecutors(modules);
+ this.executorThreads = AgentConfig.initializeExecutorThreads(executors);
this.eventBus = new EventBus(executors);
this.eventBusThread = new Thread(eventBus);
eventBusThread.start();
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
index fb3c5f7..2d7b15f 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java
@@ -8,6 +8,7 @@ import static org.hamcrest.CoreMatchers.hasItems;
import java.net.InetAddress;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -78,7 +79,7 @@ public class GossipGirlTest {
testTime = ValueUtils.currentTime();
setupHierarchy();
setupQueries();
- stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries);
+ stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries, new HashSet());
Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps();
Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps();
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java
index 1e4fbda..a8d0ecb 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -63,7 +64,7 @@ public class QurnikTest {
new ValueTime(0l)
)
);
- StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries);
+ StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet());
long timeBefore = System.currentTimeMillis();
qurnik.handleTyped(message);
long timeAfter = System.currentTimeMillis();
@@ -168,7 +169,7 @@ public class QurnikTest {
ZMI root = setupSampleHierarchy();
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = setupSampleQueries();
- StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries);
+ StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet());
long timeBefore = System.currentTimeMillis();
qurnik.handleTyped(message);
long timeAfter = System.currentTimeMillis();
@@ -224,7 +225,7 @@ public class QurnikTest {
new ValueTime(44l)
)
);
- StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries);
+ StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet());
long timeBefore = System.currentTimeMillis();
qurnik.handleTyped(message);
long timeAfter = System.currentTimeMillis();
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java
index 8ac8bed..8dec10b 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java
@@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
+import java.util.HashSet;
import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
@@ -49,7 +50,7 @@ public class RemikTest {
ZMI zmi = new ZMI();
zmi.getAttributes().add("timestamp", new ValueTime(42l));
- StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap());
+ StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), new HashSet());
remik.handleTyped(response);
ResponseMessage passedResponse = future.get(100, TimeUnit.MILLISECONDS);
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java
index 2bd63f6..85b0949 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.client;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
+import org.junit.Ignore;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -126,6 +127,7 @@ public class ClientTest {
.andExpect(content().string(CoreMatchers.containsString("Attribute submitted successfully")));
}
+ @Ignore
@Test
public void numericalRESTApiCheck() throws Exception {
Thread.sleep(10000);
@@ -135,6 +137,7 @@ public class ClientTest {
.andExpect(content().string(CoreMatchers.containsString("num_processes")));
}
+ @Ignore
@Test
public void allValuesRESTApiCheck() throws Exception {
Thread.sleep(10000);