From accbba812813ee29de1a33afe20f4536b85f8a91 Mon Sep 17 00:00:00 2001
From: Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com>
Date: Sat, 11 Jan 2020 16:30:20 +0100
Subject: Create repeating task for initiating gossip

---
 .../java/pl/edu/mimuw/cloudatlas/agent/Agent.java  | 127 +++++++++++++++++++--
 1 file changed, 120 insertions(+), 7 deletions(-)

diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 62cd544..f79684e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -7,24 +7,34 @@ import java.net.UnknownHostException;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
 import pl.edu.mimuw.cloudatlas.agent.modules.*;
 import pl.edu.mimuw.cloudatlas.agent.modules.Module;
 import pl.edu.mimuw.cloudatlas.api.Api;
 import pl.edu.mimuw.cloudatlas.interpreter.Main;
 import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.Value;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueSet;
 import pl.edu.mimuw.cloudatlas.model.ZMI;
 
 public class Agent {
     private static EventBus eventBus;
+    private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy;
+    private static GossipGirlStrategies gossipGirlStrategies;
+    private static Random random = new Random();
 
     public static void runRegistry() {
         try {
@@ -47,13 +57,13 @@ public class Agent {
         Long freshnessPeriod = Long.getLong("freshness_period");
         modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
         modules.put(ModuleType.QUERY, new Qurnik());
+        modules.put(ModuleType.GOSSIP, new GossipGirl());
 
         Integer port = Integer.getInteger("UDUPServer.port");
         Integer timeout = Integer.getInteger("UDUPServer.timeout");
         Integer bufsize = Integer.getInteger("UDUPServer.bufsize");
         UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);
         modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));
-        // TODO add modules as we implement them
         return modules;
     }
 
@@ -133,10 +143,110 @@ public class Agent {
                 }
             };
 
-        TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier);
+        startRecursiveTask(taskSupplier, queriesPeriod);
+    }
+
+    private static void startGossip(long gossipPeriod) {
+        Supplier<TimerScheduledTask> taskSupplier = () ->
+            new TimerScheduledTask() {
+                public void run() {
+                    try {
+                        System.out.println("INFO: initiating gossip");
+                        PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy);
+                        ValueContact contact = selectContactFromLevel(gossipLevel);
+                        if (contact != null) {
+                            InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+                            sendMessage(message);
+                        } else {
+                            System.out.println("DEBUG: couldn't find contact for gossip");
+                        }
+                    } catch (InterruptedException e) {
+                        System.out.println("Interrupted while initiating gossip");
+                    } catch (Exception e) {
+                        System.out.println("ERROR: something happened");
+                    }
+                }
+            };
+
+        startRecursiveTask(taskSupplier, gossipPeriod);
+    }
+
+    private static ValueContact selectContactFromLevel(PathName path) throws Exception {
+        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+        eventBus.addMessage(new RequestStateMessage("", 0, responseFuture));
+        StateMessage response = (StateMessage) responseFuture.get();
+        ZMI root = response.getZMI();
+        List<ZMI> siblings = getSiblings(root, path);
+        filterEmptyContacts(siblings);
+        if (siblings.isEmpty()) {
+            return selectFallbackContact();
+        }
+        ZMI zmi = selectZMI(siblings);
+        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+        Set<Value> valueSet = contactsValue.getValue();
+        return selectContactFromSet(valueSet);
+    }
+
+    private static ValueContact selectFallbackContact() throws Exception {
+        return selectContactFromSet(new HashSet());
+    }
+
+    private static ZMI selectZMI(List<ZMI> zmis) throws Exception {
+        int i = random.nextInt(zmis.size());
+        for (ZMI zmi : zmis) {
+            if (i == 0) {
+                return zmi;
+            }
+            i--;
+        }
+        System.out.println("ERROR: empty list passed to selectZMI");
+        throw new Exception("empty list passed to selectZMI");
+    }
+
+    private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception {
+        int i = random.nextInt(contacts.size());
+        for (Value contact : contacts) {
+            if (i == 0) {
+                return (ValueContact) contact;
+            }
+            i--;
+        }
+        System.out.println("ERROR: empty list passed to selectContactFromSet");
+        throw new Exception("empty list passed to selectContactFromSet");
+    }
+
+    private static List<ZMI> getSiblings(ZMI root, PathName path) {
+        try {
+            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons();
+            List<ZMI> siblings = new ArrayList();
+            for (ZMI siblingOrI : siblingsAndI) {
+                if (!siblingOrI.getPathName().equals(path)) {
+                    siblings.add(siblingOrI);
+                }
+            }
+            return siblings;
+        } catch (ZMI.NoSuchZoneException e) {
+            System.out.println("ERROR: didn't find path when looking for siblings");
+            return new ArrayList();
+        }
+    }
+
+    private static void filterEmptyContacts(List<ZMI> zmis) {
+        Iterator<ZMI> iterator = zmis.iterator();
+        while (iterator.hasNext()) {
+            ZMI zmi = iterator.next();
+            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts");
+            if (contacts == null || contacts.isNull() || contacts.isEmpty()) {
+                iterator.remove();
+            }
+        }
+    }
+
+    private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) {
+        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);
 
         try {
-            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask));
+            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));
         } catch (InterruptedException e) {
             System.out.println("Interrupted while starting queries");
         }
@@ -155,10 +265,13 @@ public class Agent {
     }
 
     public static void main(String[] args) {
+        zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ;
+        gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));
         runModulesAsThreads();
         runRegistry();
         initZones();
         // TODO: make query period confiurable with config file and from tests
-        startQueries(100l);
+        startQueries(6000l);
+        startGossip(5000l);
     }
 }
-- 
cgit v1.2.3