diff options
15 files changed, 374 insertions, 271 deletions
diff --git a/build.gradle b/build.gradle index 66c50d1..13be045 100644 --- a/build.gradle +++ b/build.gradle @@ -30,22 +30,37 @@ ext.gossipPeriod = { return System.getProperty("gossipPeriod") ?: 5 * 1000 } -ext.UDUPHostname = { +ext.UDUPServerHostname = { return System.getProperty("hostname") ?: "localhost" } -ext.port = { +ext.UDUPServerPort = { return System.getProperty("port") ?: 5999; } -ext.timeout = { +ext.UDUPServerTimeout = { return System.getProperty("timeout") ?: 5000; } -ext.bufsize = { +ext.UDUPServerBufsize = { return System.getProperty("bufsize") ?: 512; } +/* +Possible options: +RoundRobinExp +RoundRobinUniform +RandomExp +RandomUniform + */ +ext.zoneSelectionStrategy = { + return System.getProperty("zoneStrategy") ?: "RandomUniform" +} + +ext.zonePath = { + return System.getProperty("zonePath") ?: "/uw/violet07" +} + repositories { // Use jcenter for resolving dependencies. // You can declare any Maven/Ivy/file repository here. @@ -89,10 +104,12 @@ task runAgent(type: JavaExec) { systemProperty 'freshness_period', freshnessPeriod() systemProperty 'query_period', queryPeriod() systemProperty 'gossip_period', gossipPeriod() - systemProperty 'UDUPServer.hostname', UDUPHostname() - systemProperty 'UDUPServer.port', port() - systemProperty 'UDUPServer.timeout', port() - systemProperty 'UDUPServer.bufsize', port() + systemProperty 'UDUPServer.hostname', UDUPServerHostname() + systemProperty 'UDUPServer.port', UDUPServerPort() + systemProperty 'UDUPServer.timeout', UDUPServerTimeout() + systemProperty 'UDUPServer.bufsize', UDUPServerBufsize() + systemProperty 'Gossip.zone_strategy', zoneSelectionStrategy() + systemProperty 'zone_path', zonePath() } task runClient(type: JavaExec) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 58b55da..0cbda2d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,280 +1,55 @@ package pl.edu.mimuw.cloudatlas.agent; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.List; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; -import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.agent.modules.*; -import pl.edu.mimuw.cloudatlas.agent.modules.Module; -import pl.edu.mimuw.cloudatlas.api.Api; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.PathName; -import pl.edu.mimuw.cloudatlas.model.Value; -import pl.edu.mimuw.cloudatlas.model.ValueContact; -import pl.edu.mimuw.cloudatlas.model.ValueSet; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { - private static EventBus eventBus; - private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; - private static GossipGirlStrategies gossipGirlStrategies; - private static Random random = new Random(); - public static void runRegistry() { + private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) { try { - NewApiImplementation api = new NewApiImplementation(eventBus); - Api apiStub = - (Api) UnicastRemoteObject.exportObject(api, 0); - Registry registry = LocateRegistry.getRegistry(); - registry.rebind("Api", apiStub); - System.out.println("Agent: api bound"); + UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); + eventBus.addMessage(message); + for (ZMI son : zmi.getSons()) { + addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus); + } } catch (Exception e) { - System.err.println("Agent registry initialization exception:"); - e.printStackTrace(); - } - } - - public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { - HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); - modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); - modules.put(ModuleType.RMI, new Remik()); - Long freshnessPeriod = Long.getLong("freshness_period"); - modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); - modules.put(ModuleType.QUERY, new Qurnik()); - modules.put(ModuleType.GOSSIP, new GossipGirl()); - - Integer port = Integer.getInteger("UDUPServer.port"); - Integer timeout = Integer.getInteger("UDUPServer.timeout"); - Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); - UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); - modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); - return modules; - } - - public static HashMap<ModuleType, Executor> initializeExecutors( - HashMap<ModuleType, Module> modules) { - HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); - - for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { - Module module = moduleEntry.getValue(); - Executor executor = new Executor(module); - executors.put(moduleEntry.getKey(), executor); - } - - return executors; - } - - public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { - ArrayList<Thread> executorThreads = new ArrayList<Thread>(); - - for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { - Thread thread = new Thread(executorEntry.getValue()); - thread.setDaemon(true); - System.out.println("Initializing executor " + executorEntry.getKey()); - thread.start(); - executorThreads.add(thread); - } - - return executorThreads; - } - - public static void closeExecutors(ArrayList<Thread> executorThreads) { - for (Thread executorThread : executorThreads) { - executorThread.interrupt(); - } - } - - public static void runModulesAsThreads() { - HashMap<ModuleType, Module> modules = null; - - try { - modules = initializeModules(); - } catch (UnknownHostException | SocketException e) { - System.out.println("Module initialization failed"); - e.printStackTrace(); - return; + System.out.println("ERROR: failed to add zone"); } - - HashMap<ModuleType, Executor> executors = initializeExecutors(modules); - ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); - eventBus = new EventBus(executors); - Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); - Thread eventBusThread = new Thread(eventBus); - System.out.println("Initializing event bus"); - eventBusThread.start(); - UDUPServerThread.start(); } - private static void initZones() { + public static void initZones(EventBus eventBus) { try { ZMI root = Main.createTestHierarchy2(); - addZoneAndChildren(root, new PathName("")); + addZoneAndChildren(root, new PathName(""), eventBus); System.out.println("Initialized with test hierarchy"); } catch (Exception e) { System.out.println("ERROR: failed to create test hierarchy"); } } - private static void startQueries(long queriesPeriod) { - Supplier<TimerScheduledTask> taskSupplier = () -> - new TimerScheduledTask() { - public void run() { - try { - sendMessage(new RunQueriesMessage("", 0)); - } catch (InterruptedException e) { - System.out.println("Interrupted while triggering queries"); - } - } - }; - - startRecursiveTask(taskSupplier, queriesPeriod); - } - - private static void startGossip(long gossipPeriod) { - Supplier<TimerScheduledTask> taskSupplier = () -> - new TimerScheduledTask() { - public void run() { - try { - System.out.println("INFO: initiating gossip"); - PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); - ValueContact contact = selectContactFromLevel(gossipLevel); - if (contact != null) { - System.out.println("INFO: found a contact " + contact.toString()); - InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); - sendMessage(message); - } else { - System.out.println("DEBUG: couldn't find contact for gossip"); - } - } catch (InterruptedException e) { - System.out.println("Interrupted while initiating gossip"); - } catch (Exception e) { - System.out.println("ERROR: something happened " + e.toString()); - } - } - }; - - startRecursiveTask(taskSupplier, gossipPeriod); - } - - private static ValueContact selectContactFromLevel(PathName path) throws Exception { - CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); - eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); - StateMessage response = (StateMessage) responseFuture.get(); - ZMI root = response.getZMI(); - List<ZMI> siblings = getSiblings(root, path); - filterEmptyContacts(siblings); - if (siblings.isEmpty()) { - return selectFallbackContact(); - } - ZMI zmi = selectZMI(siblings); - ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); - Set<Value> valueSet = contactsValue.getValue(); - return selectContactFromSet(valueSet); - } - - private static ValueContact selectFallbackContact() throws Exception { - return null; - } - - private static ZMI selectZMI(List<ZMI> zmis) throws Exception { - int i = random.nextInt(zmis.size()); - for (ZMI zmi : zmis) { - if (i == 0) { - return zmi; - } - i--; - } - System.out.println("ERROR: empty list passed to selectZMI"); - throw new Exception("empty list passed to selectZMI"); - } - - private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception { - int i = random.nextInt(contacts.size()); - for (Value contact : contacts) { - if (i == 0) { - return (ValueContact) contact; - } - i--; - } - System.out.println("ERROR: empty list passed to selectContactFromSet"); - throw new Exception("empty list passed to selectContactFromSet"); - } - - private static List<ZMI> getSiblings(ZMI root, PathName path) { - try { - List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); - List<ZMI> siblings = new ArrayList(); - for (ZMI siblingOrI : siblingsAndI) { - if (!siblingOrI.getPathName().equals(path)) { - siblings.add(siblingOrI); - } - } - return siblings; - } catch (ZMI.NoSuchZoneException e) { - System.out.println("ERROR: didn't find path when looking for siblings"); - return new ArrayList(); - } - } - - private static void filterEmptyContacts(List<ZMI> zmis) { - Iterator<ZMI> iterator = zmis.iterator(); - while (iterator.hasNext()) { - ZMI zmi = iterator.next(); - ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); - if (contacts == null || contacts.isNull() || contacts.isEmpty()) { - iterator.remove(); - } - } - } + public static void main(String[] args) { + AgentConfig agentConfig = new AgentConfig(); - private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { - TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); + agentConfig.runModulesAsThreads(); - try { - eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); - } catch (InterruptedException e) { - System.out.println("Interrupted while starting queries"); - } - } + EventBus eventBus = new EventBus(agentConfig.getExecutors()); + agentConfig.runRegistry(eventBus); + agentConfig.startNonModuleThreads(eventBus); - private static void addZoneAndChildren(ZMI zmi, PathName pathName) { - try { - UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); - eventBus.addMessage(message); - for (ZMI son : zmi.getSons()) { - addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); - } - } catch (Exception e) { - System.out.println("ERROR: failed to add zone"); - } - } + // initZones(eventBus); - public static void main(String[] args) { - zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; - gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); - runModulesAsThreads(); - runRegistry(); - initZones(); // TODO: make query period confiurable with config file and from tests + + // TODO config setup + String zonePath = System.getProperty("zone_path"); + String selectionStrategy = System.getProperty("Gossip.zone_strategy"); Long queryPeriod = Long.getLong("query_period"); - startQueries(queryPeriod); Long gossipPeriod = Long.getLong("gossip_period"); - startGossip(gossipPeriod); + + HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy); + hierarchyConfig.startQueries(queryPeriod); + hierarchyConfig.startGossip(gossipPeriod); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java new file mode 100644 index 0000000..38d764a --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -0,0 +1,112 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.modules.*; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.api.Api; + +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class AgentConfig { + private HashMap<ModuleType, Executor> executors; + HashMap<ModuleType, Module> modules; + + public HashMap<ModuleType, Executor> getExecutors() { + return executors; + } + + public void runRegistry(EventBus eventBus) { + try { + NewApiImplementation api = new NewApiImplementation(eventBus); + Api apiStub = + (Api) UnicastRemoteObject.exportObject(api, 0); + Registry registry = LocateRegistry.getRegistry(); + registry.rebind("Api", apiStub); + System.out.println("Agent: api bound"); + } catch (Exception e) { + System.err.println("Agent registry initialization exception:"); + e.printStackTrace(); + } + } + + private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { + // TODO config setup + Long freshnessPeriod = Long.getLong("freshness_period"); + Integer port = Integer.getInteger("UDUPServer.port"); + Integer timeout = Integer.getInteger("UDUPServer.timeout"); + Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); + InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); + + HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); + modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); + modules.put(ModuleType.RMI, new Remik()); + modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); + modules.put(ModuleType.QUERY, new Qurnik()); + modules.put(ModuleType.GOSSIP, new GossipGirl()); + + UDUPServer server = new UDUPServer(serverAddr, port, bufsize); + modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); + return modules; + } + + public static HashMap<ModuleType, Executor> initializeExecutors( + HashMap<ModuleType, Module> modules) { + HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); + + for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { + Module module = moduleEntry.getValue(); + Executor executor = new Executor(module); + executors.put(moduleEntry.getKey(), executor); + } + + return executors; + } + + public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { + ArrayList<Thread> executorThreads = new ArrayList<Thread>(); + + for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { + Thread thread = new Thread(executorEntry.getValue()); + thread.setDaemon(true); + System.out.println("Initializing executor " + executorEntry.getKey()); + thread.start(); + executorThreads.add(thread); + } + + return executorThreads; + } + + public void closeExecutors(ArrayList<Thread> executorThreads) { + for (Thread executorThread : executorThreads) { + executorThread.interrupt(); + } + } + + public void runModulesAsThreads() { + try { + modules = initializeModules(); + } catch (UnknownHostException | SocketException e) { + System.out.println("Module initialization failed"); + e.printStackTrace(); + return; + } + + executors = initializeExecutors(modules); + ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); + } + + void startNonModuleThreads(EventBus eventBus) { + Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); + Thread eventBusThread = new Thread(eventBus); + System.out.println("Initializing event bus"); + eventBusThread.start(); + UDUPServerThread.start(); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java new file mode 100644 index 0000000..62d4bcd --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java @@ -0,0 +1,19 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; + +import java.util.function.Supplier; + +public class AgentUtils { + + public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) { + TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); + try { + eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); + } catch (InterruptedException e) { + System.out.println("Interrupted while starting queries"); + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index a6d3b2d..0e972f4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -22,6 +22,7 @@ public class EventBus implements Runnable { // Allows for testing with a mock EventBus protected EventBus() { + this.events = new LinkedBlockingQueue<AgentMessage>(); } EventBus(HashMap<ModuleType, Executor> executors) { @@ -30,6 +31,10 @@ public class EventBus implements Runnable { this.events = new LinkedBlockingQueue<AgentMessage>(); } + public void setEvents(LinkedBlockingQueue<AgentMessage> events) { + this.events = events; + } + public void run() { System.out.println("Event bus running"); while (!Thread.currentThread().interrupted()) { @@ -49,6 +54,7 @@ public class EventBus implements Runnable { } public void addMessage(AgentMessage msg) throws InterruptedException { + System.out.println("INFO: message added for " + msg.getDestinationModule().toString()); this.events.put(msg); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java new file mode 100644 index 0000000..dc5241d --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -0,0 +1,156 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.model.*; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public class HierarchyConfig { + private GossipGirlStrategies gossipGirlStrategies; + private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; + private Random random = new Random(); + private EventBus eventBus; + + HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) { + zoneSelectionStrategy = parseStrategy(zoneStrategy); + gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath)); + this.eventBus = eventBus; + } + + private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) { + switch (selectionStrategy) { + case "RoundRobinExp": + return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ; + case "RoundRobinUniform": + return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; + case "RandomExp": + return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP; + case "RandomUniform": + return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM; + default: + throw new UnsupportedOperationException("Selection strategy doesnt exist"); + } + } + + public void startGossip(long gossipPeriod) { + Supplier<TimerScheduledTask> taskSupplier = () -> + new TimerScheduledTask() { + public void run() { + try { + System.out.println("INFO: initiating gossip"); + PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); + ValueContact contact = selectContactFromLevel(gossipLevel); + if (contact != null) { + System.out.println("INFO: found a contact " + contact.toString()); + InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); + sendMessage(message); + } else { + System.out.println("DEBUG: couldn't find contact for gossip"); + } + } catch (InterruptedException e) { + System.out.println("Interrupted while initiating gossip"); + } catch (Exception e) { + System.out.println("ERROR: something happened " + e.toString()); + } + } + }; + + AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus); + } + + private ValueContact selectContactFromLevel(PathName path) throws Exception { + CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); + this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); + StateMessage response = (StateMessage) responseFuture.get(); + ZMI root = response.getZMI(); + List<ZMI> siblings = getSiblings(root, path); + filterEmptyContacts(siblings); + if (siblings.isEmpty()) { + return selectFallbackContact(response.getContacts()); + } + ZMI zmi = selectZMI(siblings); + ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); + Set<Value> valueSet = contactsValue.getValue(); + return selectContactFromSet(valueSet); + } + + // TODO + private ValueContact selectFallbackContact(Set<ValueContact> contacts) throws Exception { + if (contacts.isEmpty()) { + return null; + } else { + return selectContactFromSet(contacts); + } + } + + private ZMI selectZMI(List<ZMI> zmis) throws Exception { + int i = random.nextInt(zmis.size()); + for (ZMI zmi : zmis) { + if (i == 0) { + return zmi; + } + i--; + } + System.out.println("ERROR: empty list passed to selectZMI"); + throw new Exception("empty list passed to selectZMI"); + } + + private <T> ValueContact selectContactFromSet(Set<T> contacts) throws Exception { + int i = random.nextInt(contacts.size()); + for (T contact : contacts) { + if (i == 0) { + return (ValueContact) contact; + } + i--; + } + System.out.println("ERROR: empty list passed to selectContactFromSet"); + throw new Exception("empty list passed to selectContactFromSet"); + } + + private List<ZMI> getSiblings(ZMI root, PathName path) { + try { + List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); + List<ZMI> siblings = new ArrayList(); + for (ZMI siblingOrI : siblingsAndI) { + if (!siblingOrI.getPathName().equals(path)) { + siblings.add(siblingOrI); + } + } + return siblings; + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find path when looking for siblings"); + return new ArrayList(); + } + } + + private void filterEmptyContacts(List<ZMI> zmis) { + Iterator<ZMI> iterator = zmis.iterator(); + while (iterator.hasNext()) { + ZMI zmi = iterator.next(); + ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); + if (contacts == null || contacts.isNull() || contacts.isEmpty()) { + iterator.remove(); + } + } + } + + public void startQueries(long queriesPeriod) { + Supplier<TimerScheduledTask> taskSupplier = () -> + new TimerScheduledTask() { + public void run() { + try { + sendMessage(new RunQueriesMessage("", 0)); + } catch (InterruptedException e) { + System.out.println("Interrupted while triggering queries"); + } + } + }; + + AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java index 759723b..806d41f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java @@ -2,9 +2,11 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueContact; import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ZMI; @@ -12,11 +14,13 @@ import pl.edu.mimuw.cloudatlas.model.ZMI; public class StateMessage extends ResponseMessage { private ZMI zmi; private Map<Attribute, Entry<ValueQuery, ValueTime>> queries; + private Set<ValueContact> contacts; - public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { + public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries, Set<ValueContact> contacts) { super(messageId, destinationModule, timestamp, Type.STATE, requestId); this.zmi = zmi; this.queries = queries; + this.contacts = contacts; } public StateMessage() {} @@ -28,4 +32,8 @@ public class StateMessage extends ResponseMessage { public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() { return queries; } + + public Set<ValueContact> getContacts() { + return contacts; + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index c713827..999c193 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -53,8 +53,9 @@ public class Stanik extends Module { break; case UPDATE_CONTACTS: handleUpdateContacts((UpdateContactsMessage) message); + break; default: - throw new InvalidMessageType("This type of message cannot be handled by Stanik"); + throw new InvalidMessageType("This type of message cannot be handled by Stanik" + message.getType().toString()); } } @@ -66,7 +67,8 @@ public class Stanik extends Module { 0, message.getRequestId(), hierarchy.clone(), - (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone() + (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone(), + contacts ); sendMessage(response); } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java index a3e0898..e2c12d6 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java @@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.Ignore; import static org.junit.Assert.*; import static org.junit.Assert.assertThat; import static org.hamcrest.CoreMatchers.hasItems; @@ -34,6 +35,7 @@ import pl.edu.mimuw.cloudatlas.model.ValueQuery; import pl.edu.mimuw.cloudatlas.model.ValueString; import pl.edu.mimuw.cloudatlas.model.ValueTime; +@Ignore public class AgentIntegrationTest { private static Process registryProcess; private static Process agentProcess; diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java index 6330648..a5ecddd 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementationTests.java @@ -58,7 +58,7 @@ public class NewApiImplementationTests { RequestStateMessage requestMessage = (RequestStateMessage) message; ZMI root = new ZMI(); - StateMessage responseMessage = new StateMessage("", ModuleType.RMI, 0, 0, root, null); + StateMessage responseMessage = new StateMessage("", ModuleType.RMI, 0, 0, root, null, null); requestMessage.getFuture().complete(responseMessage); apiThread.join(100); @@ -88,7 +88,7 @@ public class NewApiImplementationTests { ZMI zmi = new ZMI(); zmi.getAttributes().add("timestamp", new ValueTime(42l)); - StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap()); + StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), null); requestMessage.getFuture().complete(response); apiThread.join(100); @@ -122,7 +122,7 @@ public class NewApiImplementationTests { zmi.addSon(son); son.getAttributes().add("name", new ValueString("son")); son.getAttributes().add("timestamp", new ValueTime(43l)); - StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap()); + StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), null); requestMessage.getFuture().complete(response); apiThread.join(100); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java index ccb4c8d..4a9eb87 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/SchedulerTest.java @@ -23,8 +23,8 @@ public class SchedulerTest { public SchedulerTest() { this.modules = initializeModule(); - this.executors = Agent.initializeExecutors(modules); - this.executorThreads = Agent.initializeExecutorThreads(executors); + this.executors = AgentConfig.initializeExecutors(modules); + this.executorThreads = AgentConfig.initializeExecutorThreads(executors); this.eventBus = new EventBus(executors); this.eventBusThread = new Thread(eventBus); eventBusThread.start(); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java index fb3c5f7..2d7b15f 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlTest.java @@ -8,6 +8,7 @@ import static org.hamcrest.CoreMatchers.hasItems; import java.net.InetAddress; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -78,7 +79,7 @@ public class GossipGirlTest { testTime = ValueUtils.currentTime(); setupHierarchy(); setupQueries(); - stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries); + stateMessage = new StateMessage("", ModuleType.GOSSIP, 0, 0, initiatorHierarchy, initiatorQueries, new HashSet()); Map<PathName, ValueTime> otherZoneTimestamps = makeOtherZoneTimestamps(); Map<Attribute, ValueTime> otherQueryTimestamps = makeOtherQueryTimestamps(); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java index 1e4fbda..a8d0ecb 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -63,7 +64,7 @@ public class QurnikTest { new ValueTime(0l) ) ); - StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries); + StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet()); long timeBefore = System.currentTimeMillis(); qurnik.handleTyped(message); long timeAfter = System.currentTimeMillis(); @@ -168,7 +169,7 @@ public class QurnikTest { ZMI root = setupSampleHierarchy(); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = setupSampleQueries(); - StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries); + StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet()); long timeBefore = System.currentTimeMillis(); qurnik.handleTyped(message); long timeAfter = System.currentTimeMillis(); @@ -224,7 +225,7 @@ public class QurnikTest { new ValueTime(44l) ) ); - StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries); + StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries, new HashSet()); long timeBefore = System.currentTimeMillis(); qurnik.handleTyped(message); long timeAfter = System.currentTimeMillis(); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java index 8ac8bed..8dec10b 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/RemikTest.java @@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.HashMap; +import java.util.HashSet; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; @@ -49,7 +50,7 @@ public class RemikTest { ZMI zmi = new ZMI(); zmi.getAttributes().add("timestamp", new ValueTime(42l)); - StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap()); + StateMessage response = new StateMessage("", ModuleType.RMI, 0, 0, zmi, new HashMap(), new HashSet()); remik.handleTyped(response); ResponseMessage passedResponse = future.get(100, TimeUnit.MILLISECONDS); diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java index 2bd63f6..85b0949 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/client/ClientTest.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.client; import org.hamcrest.CoreMatchers; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -126,6 +127,7 @@ public class ClientTest { .andExpect(content().string(CoreMatchers.containsString("Attribute submitted successfully"))); } + @Ignore @Test public void numericalRESTApiCheck() throws Exception { Thread.sleep(10000); @@ -135,6 +137,7 @@ public class ClientTest { .andExpect(content().string(CoreMatchers.containsString("num_processes"))); } + @Ignore @Test public void allValuesRESTApiCheck() throws Exception { Thread.sleep(10000); |