diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas')
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 127 | 
1 files changed, 120 insertions, 7 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 62cd544..f79684e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -7,24 +7,34 @@ import java.net.UnknownHostException;  import java.rmi.registry.LocateRegistry;  import java.rmi.registry.Registry;  import java.rmi.server.UnicastRemoteObject; +import java.util.List;  import java.util.ArrayList;  import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator;  import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture;  import java.util.function.Supplier;  import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; -import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.*;  import pl.edu.mimuw.cloudatlas.agent.modules.*;  import pl.edu.mimuw.cloudatlas.agent.modules.Module;  import pl.edu.mimuw.cloudatlas.api.Api;  import pl.edu.mimuw.cloudatlas.interpreter.Main;  import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.Value; +import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueSet;  import pl.edu.mimuw.cloudatlas.model.ZMI;  public class Agent {      private static EventBus eventBus; +    private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; +    private static GossipGirlStrategies gossipGirlStrategies; +    private static Random random = new Random();      public static void runRegistry() {          try { @@ -47,13 +57,13 @@ public class Agent {          Long freshnessPeriod = Long.getLong("freshness_period");          modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));          modules.put(ModuleType.QUERY, new Qurnik()); +        modules.put(ModuleType.GOSSIP, new GossipGirl());          Integer port = Integer.getInteger("UDUPServer.port");          Integer timeout = Integer.getInteger("UDUPServer.timeout");          Integer bufsize = Integer.getInteger("UDUPServer.bufsize");          UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize);          modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); -        // TODO add modules as we implement them          return modules;      } @@ -133,10 +143,110 @@ public class Agent {                  }              }; -        TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier); +        startRecursiveTask(taskSupplier, queriesPeriod); +    } + +    private static void startGossip(long gossipPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +            new TimerScheduledTask() { +                public void run() { +                    try { +                        System.out.println("INFO: initiating gossip"); +                        PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); +                        ValueContact contact = selectContactFromLevel(gossipLevel); +                        if (contact != null) { +                            InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); +                            sendMessage(message); +                        } else { +                            System.out.println("DEBUG: couldn't find contact for gossip"); +                        } +                    } catch (InterruptedException e) { +                        System.out.println("Interrupted while initiating gossip"); +                    } catch (Exception e) { +                        System.out.println("ERROR: something happened"); +                    } +                } +            }; + +        startRecursiveTask(taskSupplier, gossipPeriod); +    } + +    private static ValueContact selectContactFromLevel(PathName path) throws Exception { +        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); +        eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); +        StateMessage response = (StateMessage) responseFuture.get(); +        ZMI root = response.getZMI(); +        List<ZMI> siblings = getSiblings(root, path); +        filterEmptyContacts(siblings); +        if (siblings.isEmpty()) { +            return selectFallbackContact(); +        } +        ZMI zmi = selectZMI(siblings); +        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +        Set<Value> valueSet = contactsValue.getValue(); +        return selectContactFromSet(valueSet); +    } + +    private static ValueContact selectFallbackContact() throws Exception { +        return selectContactFromSet(new HashSet()); +    } + +    private static ZMI selectZMI(List<ZMI> zmis) throws Exception { +        int i = random.nextInt(zmis.size()); +        for (ZMI zmi : zmis) { +            if (i == 0) { +                return zmi; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectZMI"); +        throw new Exception("empty list passed to selectZMI"); +    } + +    private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception { +        int i = random.nextInt(contacts.size()); +        for (Value contact : contacts) { +            if (i == 0) { +                return (ValueContact) contact; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectContactFromSet"); +        throw new Exception("empty list passed to selectContactFromSet"); +    } + +    private static List<ZMI> getSiblings(ZMI root, PathName path) { +        try { +            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); +            List<ZMI> siblings = new ArrayList(); +            for (ZMI siblingOrI : siblingsAndI) { +                if (!siblingOrI.getPathName().equals(path)) { +                    siblings.add(siblingOrI); +                } +            } +            return siblings; +        } catch (ZMI.NoSuchZoneException e) { +            System.out.println("ERROR: didn't find path when looking for siblings"); +            return new ArrayList(); +        } +    } + +    private static void filterEmptyContacts(List<ZMI> zmis) { +        Iterator<ZMI> iterator = zmis.iterator(); +        while (iterator.hasNext()) { +            ZMI zmi = iterator.next(); +            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +            if (contacts == null || contacts.isNull() || contacts.isEmpty()) { +                iterator.remove(); +            } +        } +    } + +    private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { +        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier);          try { -            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask)); +            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask));          } catch (InterruptedException e) {              System.out.println("Interrupted while starting queries");          } @@ -155,10 +265,13 @@ public class Agent {      }      public static void main(String[] args) { +        zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; +        gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07"));          runModulesAsThreads();          runRegistry();          initZones();          // TODO: make query period confiurable with config file and from tests -        startQueries(100l); +        startQueries(6000l); +        startGossip(5000l);      }  }  |