m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas
diff options
context:
space:
mode:
authorMartin <marcin.j.chrzanowski@gmail.com>2020-01-11 18:25:10 +0100
committerGitHub <noreply@github.com>2020-01-11 18:25:10 +0100
commit0c94ea210579436f5a1723117f83ced974517b55 (patch)
tree6af826662de612f5b91644845845630753dac856 /src/main/java/pl/edu/mimuw/cloudatlas
parent2b8b0560346fe7e55c293550efb003d9bd25db98 (diff)
parentb4e25a3cb8a9c54cd1d4ec4cbe78dd44f2873eeb (diff)
Merge pull request #96 from m-chrzan/start-gossip-girl
Create repeating task for initiating gossip
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java129
1 files changed, 122 insertions, 7 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 62cd544..b478f1f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -7,24 +7,34 @@ import java.net.UnknownHostException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
+import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.api.Api;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.Value;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueSet;
import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
private static EventBus eventBus;
+ private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+ private static GossipGirlStrategies gossipGirlStrategies;
+ private static Random random = new Random();
public static void runRegistry() {
try {
@@ -47,13 +57,13 @@ public class Agent {
Long freshnessPeriod = Long.getLong("freshness_period");
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
+ modules.put(ModuleType.GOSSIP, new GossipGirl());
Integer port = Integer.getInteger("UDUPServer.port");
Integer timeout = Integer.getInteger("UDUPServer.timeout");
Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
- // TODO add modules as we implement them
return modules;
}
@@ -133,10 +143,110 @@ public class Agent {
}
};
- TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier);
+ startRecursiveTask(taskSupplier, queriesPeriod);
+ }
+
+ private static void startGossip(long gossipPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: initiating gossip");
+ PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+ ValueContact contact = selectContactFromLevel(gossipLevel);
+ if (contact != null) {
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ sendMessage(message);
+ } else {
+ System.out.println("DEBUG: couldn't find contact for gossip");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while initiating gossip");
+ } catch (Exception e) {
+ System.out.println("ERROR: something happened " + e.toString());
+ }
+ }
+ };
+
+ startRecursiveTask(taskSupplier, gossipPeriod);
+ }
+
+ private static ValueContact selectContactFromLevel(PathName path) throws Exception {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+ StateMessage response = (StateMessage) responseFuture.get();
+ ZMI root = response.getZMI();
+ List<ZMI> siblings = getSiblings(root, path);
+ filterEmptyContacts(siblings);
+ if (siblings.isEmpty()) {
+ return selectFallbackContact();
+ }
+ ZMI zmi = selectZMI(siblings);
+ ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ Set<Value> valueSet = contactsValue.getValue();
+ return selectContactFromSet(valueSet);
+ }
+
+ private static ValueContact selectFallbackContact() throws Exception {
+ return selectContactFromSet(new HashSet());
+ }
+
+ private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
+ int i = random.nextInt(zmis.size());
+ for (ZMI zmi : zmis) {
+ if (i == 0) {
+ return zmi;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectZMI");
+ throw new Exception("empty list passed to selectZMI");
+ }
+
+ private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
+ int i = random.nextInt(contacts.size());
+ for (Value contact : contacts) {
+ if (i == 0) {
+ return (ValueContact) contact;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectContactFromSet");
+ throw new Exception("empty list passed to selectContactFromSet");
+ }
+
+ private static List<ZMI> getSiblings(ZMI root, PathName path) {
+ try {
+ List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+ List<ZMI> siblings = new ArrayList();
+ for (ZMI siblingOrI : siblingsAndI) {
+ if (!siblingOrI.getPathName().equals(path)) {
+ siblings.add(siblingOrI);
+ }
+ }
+ return siblings;
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find path when looking for siblings");
+ return new ArrayList();
+ }
+ }
+
+ private static void filterEmptyContacts(List<ZMI> zmis) {
+ Iterator<ZMI> iterator = zmis.iterator();
+ while (iterator.hasNext()) {
+ ZMI zmi = iterator.next();
+ ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
try {
- eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask));
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
} catch (InterruptedException e) {
System.out.println("Interrupted while starting queries");
}
@@ -155,10 +265,15 @@ public class Agent {
}
public static void main(String[] args) {
+ zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
runModulesAsThreads();
runRegistry();
initZones();
// TODO: make query period confiurable with config file and from tests
- startQueries(100l);
+ Long queryPeriod = Long.getLong("query_period");
+ startQueries(queryPeriod);
+ Long gossipPeriod = Long.getLong("gossip_period");
+ startGossip(gossipPeriod);
}
}