m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-11 16:30:20 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-11 16:30:20 +0100
commitaccbba812813ee29de1a33afe20f4536b85f8a91 (patch)
tree1c6324ef9a5c00c7c00cf377b39f8a1b4e5c2acd /src/main/java
parentc125370268544a1321aaa2152e200b5a74664880 (diff)
Create repeating task for initiating gossip
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java127
1 files changed, 120 insertions, 7 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 62cd544..f79684e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -7,24 +7,34 @@ import java.net.UnknownHostException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
+import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.api.Api;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.Value;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueSet;
import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
private static EventBus eventBus;
+ private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+ private static GossipGirlStrategies gossipGirlStrategies;
+ private static Random random = new Random();
public static void runRegistry() {
try {
@@ -47,13 +57,13 @@ public class Agent {
Long freshnessPeriod = Long.getLong("freshness_period");
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
+ modules.put(ModuleType.GOSSIP, new GossipGirl());
Integer port = Integer.getInteger("UDUPServer.port");
Integer timeout = Integer.getInteger("UDUPServer.timeout");
Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
- // TODO add modules as we implement them
return modules;
}
@@ -133,10 +143,110 @@ public class Agent {
}
};
- TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier);
+ startRecursiveTask(taskSupplier, queriesPeriod);
+ }
+
+ private static void startGossip(long gossipPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: initiating gossip");
+ PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+ ValueContact contact = selectContactFromLevel(gossipLevel);
+ if (contact != null) {
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ sendMessage(message);
+ } else {
+ System.out.println("DEBUG: couldn't find contact for gossip");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while initiating gossip");
+ } catch (Exception e) {
+ System.out.println("ERROR: something happened");
+ }
+ }
+ };
+
+ startRecursiveTask(taskSupplier, gossipPeriod);
+ }
+
+ private static ValueContact selectContactFromLevel(PathName path) throws Exception {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+ StateMessage response = (StateMessage) responseFuture.get();
+ ZMI root = response.getZMI();
+ List<ZMI> siblings = getSiblings(root, path);
+ filterEmptyContacts(siblings);
+ if (siblings.isEmpty()) {
+ return selectFallbackContact();
+ }
+ ZMI zmi = selectZMI(siblings);
+ ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ Set<Value> valueSet = contactsValue.getValue();
+ return selectContactFromSet(valueSet);
+ }
+
+ private static ValueContact selectFallbackContact() throws Exception {
+ return selectContactFromSet(new HashSet());
+ }
+
+ private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
+ int i = random.nextInt(zmis.size());
+ for (ZMI zmi : zmis) {
+ if (i == 0) {
+ return zmi;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectZMI");
+ throw new Exception("empty list passed to selectZMI");
+ }
+
+ private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
+ int i = random.nextInt(contacts.size());
+ for (Value contact : contacts) {
+ if (i == 0) {
+ return (ValueContact) contact;
+ }
+ i--;
+ }
+ System.out.println("ERROR: empty list passed to selectContactFromSet");
+ throw new Exception("empty list passed to selectContactFromSet");
+ }
+
+ private static List<ZMI> getSiblings(ZMI root, PathName path) {
+ try {
+ List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+ List<ZMI> siblings = new ArrayList();
+ for (ZMI siblingOrI : siblingsAndI) {
+ if (!siblingOrI.getPathName().equals(path)) {
+ siblings.add(siblingOrI);
+ }
+ }
+ return siblings;
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: didn't find path when looking for siblings");
+ return new ArrayList();
+ }
+ }
+
+ private static void filterEmptyContacts(List<ZMI> zmis) {
+ Iterator<ZMI> iterator = zmis.iterator();
+ while (iterator.hasNext()) {
+ ZMI zmi = iterator.next();
+ ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+ if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+ iterator.remove();
+ }
+ }
+ }
+
+ private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
+ TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
try {
- eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask));
+ eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
} catch (InterruptedException e) {
System.out.println("Interrupted while starting queries");
}
@@ -155,10 +265,13 @@ public class Agent {
}
public static void main(String[] args) {
+ zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+ gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
runModulesAsThreads();
runRegistry();
initZones();
// TODO: make query period confiurable with config file and from tests
- startQueries(100l);
+ startQueries(6000l);
+ startGossip(5000l);
}
}