From accbba812813ee29de1a33afe20f4536b85f8a91 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 16:30:20 +0100 Subject: Create repeating task for initiating gossip --- .../java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 127 +++++++++++++++++++-- 1 file changed, 120 insertions(+), 7 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 62cd544..f79684e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -7,24 +7,34 @@ import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; +import java.util.List; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; -import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.*; import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.Value; +import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueSet; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { private static EventBus eventBus; + private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; + private static GossipGirlStrategies gossipGirlStrategies; + private static Random random = new Random(); public static void runRegistry() { try { @@ -47,13 +57,13 @@ public class Agent { Long freshnessPeriod = Long.getLong("freshness_period"); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); + modules.put(ModuleType.GOSSIP, new GossipGirl()); Integer port = Integer.getInteger("UDUPServer.port"); Integer timeout = Integer.getInteger("UDUPServer.timeout"); Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); - // TODO add modules as we implement them return modules; } @@ -133,10 +143,110 @@ public class Agent { } }; - TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier); + startRecursiveTask(taskSupplier, queriesPeriod); + } + + private static void startGossip(long gossipPeriod) { + Supplier taskSupplier = () -> + new TimerScheduledTask() { + public void run() { + try { + System.out.println("INFO: initiating gossip"); + PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); + ValueContact contact = selectContactFromLevel(gossipLevel); + if (contact != null) { + InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); + sendMessage(message); + } else { + System.out.println("DEBUG: couldn't find contact for gossip"); + } + } catch (InterruptedException e) { + System.out.println("Interrupted while initiating gossip"); + } catch (Exception e) { + System.out.println("ERROR: something happened"); + } + } + }; + + startRecursiveTask(taskSupplier, gossipPeriod); + } + + private static ValueContact selectContactFromLevel(PathName path) throws Exception { + CompletableFuture responseFuture = new CompletableFuture(); + eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); + StateMessage response = (StateMessage) responseFuture.get(); + ZMI root = response.getZMI(); + List siblings = getSiblings(root, path); + filterEmptyContacts(siblings); + if (siblings.isEmpty()) { + return selectFallbackContact(); + } + ZMI zmi = selectZMI(siblings); + ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); + Set valueSet = contactsValue.getValue(); + return selectContactFromSet(valueSet); + } + + private static ValueContact selectFallbackContact() throws Exception { + return selectContactFromSet(new HashSet()); + } + + private static ZMI selectZMI(List zmis) throws Exception { + int i = random.nextInt(zmis.size()); + for (ZMI zmi : zmis) { + if (i == 0) { + return zmi; + } + i--; + } + System.out.println("ERROR: empty list passed to selectZMI"); + throw new Exception("empty list passed to selectZMI"); + } + + private static ValueContact selectContactFromSet(Set contacts) throws Exception { + int i = random.nextInt(contacts.size()); + for (Value contact : contacts) { + if (i == 0) { + return (ValueContact) contact; + } + i--; + } + System.out.println("ERROR: empty list passed to selectContactFromSet"); + throw new Exception("empty list passed to selectContactFromSet"); + } + + private static List getSiblings(ZMI root, PathName path) { + try { + List siblingsAndI = root.findDescendant(path).getFather().getSons(); + List siblings = new ArrayList(); + for (ZMI siblingOrI : siblingsAndI) { + if (!siblingOrI.getPathName().equals(path)) { + siblings.add(siblingOrI); + } + } + return siblings; + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find path when looking for siblings"); + return new ArrayList(); + } + } + + private static void filterEmptyContacts(List zmis) { + Iterator iterator = zmis.iterator(); + while (iterator.hasNext()) { + ZMI zmi = iterator.next(); + ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); + if (contacts == null || contacts.isNull() || contacts.isEmpty()) { + iterator.remove(); + } + } + } + + private static void startRecursiveTask(Supplier taskSupplier, long period) { + TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); try { - eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask)); + eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); } catch (InterruptedException e) { System.out.println("Interrupted while starting queries"); } @@ -155,10 +265,13 @@ public class Agent { } public static void main(String[] args) { + zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; + gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); runModulesAsThreads(); runRegistry(); initZones(); // TODO: make query period confiurable with config file and from tests - startQueries(100l); + startQueries(6000l); + startGossip(5000l); } } -- cgit v1.2.3 From f055032fa683ab1cb1e92627849eb9fa7eeed903 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 17:52:41 +0100 Subject: Fix test --- src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index f79684e..8652a3d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -271,7 +271,7 @@ public class Agent { runRegistry(); initZones(); // TODO: make query period confiurable with config file and from tests - startQueries(6000l); + startQueries(100l); startGossip(5000l); } } -- cgit v1.2.3 From f4e8b8a24d3bb0a3bfb382c3b487e48817060ca1 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 18:08:33 +0100 Subject: Make query and gossip periods configurable --- build.gradle | 10 ++++++++++ src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 6 ++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index e2174a7..66c50d1 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,14 @@ ext.freshnessPeriod = { return System.getProperty("freshnessPeriod") ?: 60 * 1000 } +ext.queryPeriod = { + return System.getProperty("queryPeriod") ?: 5 * 1000 +} + +ext.gossipPeriod = { + return System.getProperty("gossipPeriod") ?: 5 * 1000 +} + ext.UDUPHostname = { return System.getProperty("hostname") ?: "localhost" } @@ -79,6 +87,8 @@ task runAgent(type: JavaExec) { main = 'pl.edu.mimuw.cloudatlas.agent.Agent' systemProperty 'java.rmi.server.hostname', hostname() systemProperty 'freshness_period', freshnessPeriod() + systemProperty 'query_period', queryPeriod() + systemProperty 'gossip_period', gossipPeriod() systemProperty 'UDUPServer.hostname', UDUPHostname() systemProperty 'UDUPServer.port', port() systemProperty 'UDUPServer.timeout', port() diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 8652a3d..85faa9c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -271,7 +271,9 @@ public class Agent { runRegistry(); initZones(); // TODO: make query period confiurable with config file and from tests - startQueries(100l); - startGossip(5000l); + Long queryPeriod = Long.getLong("query_period"); + startQueries(queryPeriod); + Long gossipPeriod = Long.getLong("gossip_period"); + startGossip(gossipPeriod); } } -- cgit v1.2.3 From 88ff9c333fce04943f57b98aa2ac8948696afbd9 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 18:12:35 +0100 Subject: Configure query period in test --- src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java index f69ed8f..a6398ed 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java @@ -47,7 +47,7 @@ public class AgentIntegrationTest { public static void bindApi() throws Exception { registryProcess = Runtime.getRuntime().exec("./scripts/registry"); Thread.sleep(10000); - agentProcess = Runtime.getRuntime().exec("./gradlew runAgent -Dhostname=localhost -DfreshnessPeriod=10000000"); + agentProcess = Runtime.getRuntime().exec("./gradlew runAgent -Dhostname=localhost -DfreshnessPeriod=10000000 -DqueryPreiod=100"); Thread.sleep(10000); registry = LocateRegistry.getRegistry("localhost"); -- cgit v1.2.3 From b4e25a3cb8a9c54cd1d4ec4cbe78dd44f2873eeb Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sat, 11 Jan 2020 18:24:48 +0100 Subject: Fix typo --- src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 2 +- src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 85faa9c..b478f1f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -163,7 +163,7 @@ public class Agent { } catch (InterruptedException e) { System.out.println("Interrupted while initiating gossip"); } catch (Exception e) { - System.out.println("ERROR: something happened"); + System.out.println("ERROR: something happened " + e.toString()); } } }; diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java index a6398ed..a3e0898 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/AgentIntegrationTest.java @@ -47,7 +47,7 @@ public class AgentIntegrationTest { public static void bindApi() throws Exception { registryProcess = Runtime.getRuntime().exec("./scripts/registry"); Thread.sleep(10000); - agentProcess = Runtime.getRuntime().exec("./gradlew runAgent -Dhostname=localhost -DfreshnessPeriod=10000000 -DqueryPreiod=100"); + agentProcess = Runtime.getRuntime().exec("./gradlew runAgent -Dhostname=localhost -DfreshnessPeriod=10000000 -DqueryPeriod=100"); Thread.sleep(10000); registry = LocateRegistry.getRegistry("localhost"); -- cgit v1.2.3