diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
7 files changed, 331 insertions, 253 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 58b55da..0cbda2d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,280 +1,55 @@  package pl.edu.mimuw.cloudatlas.agent; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.List; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; -import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.agent.modules.*; -import pl.edu.mimuw.cloudatlas.agent.modules.Module; -import pl.edu.mimuw.cloudatlas.api.Api; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;  import pl.edu.mimuw.cloudatlas.interpreter.Main;  import pl.edu.mimuw.cloudatlas.model.PathName; -import pl.edu.mimuw.cloudatlas.model.Value; -import pl.edu.mimuw.cloudatlas.model.ValueContact; -import pl.edu.mimuw.cloudatlas.model.ValueSet;  import pl.edu.mimuw.cloudatlas.model.ZMI;  public class Agent { -    private static EventBus eventBus; -    private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; -    private static GossipGirlStrategies gossipGirlStrategies; -    private static Random random = new Random(); -    public static void runRegistry() { +    private static void addZoneAndChildren(ZMI zmi, PathName pathName, EventBus eventBus) {          try { -            NewApiImplementation api = new NewApiImplementation(eventBus); -            Api apiStub = -                    (Api) UnicastRemoteObject.exportObject(api, 0); -            Registry registry = LocateRegistry.getRegistry(); -            registry.rebind("Api", apiStub); -            System.out.println("Agent: api bound"); +            UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); +            eventBus.addMessage(message); +            for (ZMI son : zmi.getSons()) { +                addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString()), eventBus); +            }          } catch (Exception e) { -            System.err.println("Agent registry initialization exception:"); -            e.printStackTrace(); -        } -    } - -    public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { -        HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); -        modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); -        modules.put(ModuleType.RMI, new Remik()); -        Long freshnessPeriod = Long.getLong("freshness_period"); -        modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); -        modules.put(ModuleType.QUERY, new Qurnik()); -        modules.put(ModuleType.GOSSIP, new GossipGirl()); - -        Integer port = Integer.getInteger("UDUPServer.port"); -        Integer timeout = Integer.getInteger("UDUPServer.timeout"); -        Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); -        UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); -        modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); -        return modules; -    } - -    public static HashMap<ModuleType, Executor> initializeExecutors( -            HashMap<ModuleType, Module> modules) { -        HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); - -        for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { -            Module module = moduleEntry.getValue(); -            Executor executor = new Executor(module); -            executors.put(moduleEntry.getKey(), executor); -        } - -        return executors; -    } - -    public static ArrayList<Thread>  initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { -        ArrayList<Thread> executorThreads = new ArrayList<Thread>(); - -        for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { -            Thread thread = new Thread(executorEntry.getValue()); -            thread.setDaemon(true); -            System.out.println("Initializing executor " + executorEntry.getKey()); -            thread.start(); -            executorThreads.add(thread); -        } - -        return executorThreads; -    } - -    public static void closeExecutors(ArrayList<Thread> executorThreads) { -        for (Thread executorThread : executorThreads) { -            executorThread.interrupt(); -        } -    } - -    public static void runModulesAsThreads() { -        HashMap<ModuleType, Module> modules = null; - -        try { -            modules = initializeModules(); -        } catch (UnknownHostException | SocketException e) { -            System.out.println("Module initialization failed"); -            e.printStackTrace(); -            return; +            System.out.println("ERROR: failed to add zone");          } - -        HashMap<ModuleType, Executor> executors = initializeExecutors(modules); -        ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); -        eventBus = new EventBus(executors); -        Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); -        Thread eventBusThread = new Thread(eventBus); -        System.out.println("Initializing event bus"); -        eventBusThread.start(); -        UDUPServerThread.start();      } -    private static void initZones() { +    public static void initZones(EventBus eventBus) {          try {              ZMI root = Main.createTestHierarchy2(); -            addZoneAndChildren(root, new PathName("")); +            addZoneAndChildren(root, new PathName(""), eventBus);              System.out.println("Initialized with test hierarchy");          } catch (Exception e) {              System.out.println("ERROR: failed to create test hierarchy");          }      } -    private static void startQueries(long queriesPeriod) { -        Supplier<TimerScheduledTask> taskSupplier = () -> -            new TimerScheduledTask() { -                public void run() { -                    try { -                        sendMessage(new RunQueriesMessage("", 0)); -                    } catch (InterruptedException e) { -                        System.out.println("Interrupted while triggering queries"); -                    } -                } -            }; - -        startRecursiveTask(taskSupplier, queriesPeriod); -    } - -    private static void startGossip(long gossipPeriod) { -        Supplier<TimerScheduledTask> taskSupplier = () -> -            new TimerScheduledTask() { -                public void run() { -                    try { -                        System.out.println("INFO: initiating gossip"); -                        PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); -                        ValueContact contact = selectContactFromLevel(gossipLevel); -                        if (contact != null) { -                            System.out.println("INFO: found a contact " + contact.toString()); -                            InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); -                            sendMessage(message); -                        } else { -                            System.out.println("DEBUG: couldn't find contact for gossip"); -                        } -                    } catch (InterruptedException e) { -                        System.out.println("Interrupted while initiating gossip"); -                    } catch (Exception e) { -                        System.out.println("ERROR: something happened " + e.toString()); -                    } -                } -            }; - -        startRecursiveTask(taskSupplier, gossipPeriod); -    } - -    private static ValueContact selectContactFromLevel(PathName path) throws Exception { -        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); -        eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); -        StateMessage response = (StateMessage) responseFuture.get(); -        ZMI root = response.getZMI(); -        List<ZMI> siblings = getSiblings(root, path); -        filterEmptyContacts(siblings); -        if (siblings.isEmpty()) { -            return selectFallbackContact(); -        } -        ZMI zmi = selectZMI(siblings); -        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); -        Set<Value> valueSet = contactsValue.getValue(); -        return selectContactFromSet(valueSet); -    } - -    private static ValueContact selectFallbackContact() throws Exception { -        return null; -    } - -    private static ZMI selectZMI(List<ZMI> zmis) throws Exception { -        int i = random.nextInt(zmis.size()); -        for (ZMI zmi : zmis) { -            if (i == 0) { -                return zmi; -            } -            i--; -        } -        System.out.println("ERROR: empty list passed to selectZMI"); -        throw new Exception("empty list passed to selectZMI"); -    } - -    private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception { -        int i = random.nextInt(contacts.size()); -        for (Value contact : contacts) { -            if (i == 0) { -                return (ValueContact) contact; -            } -            i--; -        } -        System.out.println("ERROR: empty list passed to selectContactFromSet"); -        throw new Exception("empty list passed to selectContactFromSet"); -    } - -    private static List<ZMI> getSiblings(ZMI root, PathName path) { -        try { -            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); -            List<ZMI> siblings = new ArrayList(); -            for (ZMI siblingOrI : siblingsAndI) { -                if (!siblingOrI.getPathName().equals(path)) { -                    siblings.add(siblingOrI); -                } -            } -            return siblings; -        } catch (ZMI.NoSuchZoneException e) { -            System.out.println("ERROR: didn't find path when looking for siblings"); -            return new ArrayList(); -        } -    } - -    private static void filterEmptyContacts(List<ZMI> zmis) { -        Iterator<ZMI> iterator = zmis.iterator(); -        while (iterator.hasNext()) { -            ZMI zmi = iterator.next(); -            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); -            if (contacts == null || contacts.isNull() || contacts.isEmpty()) { -                iterator.remove(); -            } -        } -    } +    public static void main(String[] args) { +        AgentConfig agentConfig = new AgentConfig(); -    private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { -        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); +        agentConfig.runModulesAsThreads(); -        try { -            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); -        } catch (InterruptedException e) { -            System.out.println("Interrupted while starting queries"); -        } -    } +        EventBus eventBus = new EventBus(agentConfig.getExecutors()); +        agentConfig.runRegistry(eventBus); +        agentConfig.startNonModuleThreads(eventBus); -    private static void addZoneAndChildren(ZMI zmi, PathName pathName) { -        try { -            UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); -            eventBus.addMessage(message); -            for (ZMI son : zmi.getSons()) { -                addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); -            } -        } catch (Exception e) { -            System.out.println("ERROR: failed to add zone"); -        } -    } +        // initZones(eventBus); -    public static void main(String[] args) { -        zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; -        gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); -        runModulesAsThreads(); -        runRegistry(); -        initZones();          // TODO: make query period confiurable with config file and from tests + +        // TODO config setup +        String zonePath = System.getProperty("zone_path"); +        String selectionStrategy = System.getProperty("Gossip.zone_strategy");          Long queryPeriod = Long.getLong("query_period"); -        startQueries(queryPeriod);          Long gossipPeriod = Long.getLong("gossip_period"); -        startGossip(gossipPeriod); + +        HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy); +        hierarchyConfig.startQueries(queryPeriod); +        hierarchyConfig.startGossip(gossipPeriod);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java new file mode 100644 index 0000000..38d764a --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -0,0 +1,112 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.modules.*; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.api.Api; + +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class AgentConfig { +    private HashMap<ModuleType, Executor> executors; +    HashMap<ModuleType, Module> modules; + +    public HashMap<ModuleType, Executor> getExecutors() { +        return executors; +    } + +    public void runRegistry(EventBus eventBus) { +        try { +            NewApiImplementation api = new NewApiImplementation(eventBus); +            Api apiStub = +                    (Api) UnicastRemoteObject.exportObject(api, 0); +            Registry registry = LocateRegistry.getRegistry(); +            registry.rebind("Api", apiStub); +            System.out.println("Agent: api bound"); +        } catch (Exception e) { +            System.err.println("Agent registry initialization exception:"); +            e.printStackTrace(); +        } +    } + +    private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { +        // TODO config setup +        Long freshnessPeriod = Long.getLong("freshness_period"); +        Integer port = Integer.getInteger("UDUPServer.port"); +        Integer timeout = Integer.getInteger("UDUPServer.timeout"); +        Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); +        InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); + +        HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); +        modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); +        modules.put(ModuleType.RMI, new Remik()); +        modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); +        modules.put(ModuleType.QUERY, new Qurnik()); +        modules.put(ModuleType.GOSSIP, new GossipGirl()); + +        UDUPServer server = new UDUPServer(serverAddr, port, bufsize); +        modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); +        return modules; +    } + +    public static HashMap<ModuleType, Executor> initializeExecutors( +            HashMap<ModuleType, Module> modules) { +        HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); + +        for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { +            Module module = moduleEntry.getValue(); +            Executor executor = new Executor(module); +            executors.put(moduleEntry.getKey(), executor); +        } + +        return executors; +    } + +    public static ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { +        ArrayList<Thread> executorThreads = new ArrayList<Thread>(); + +        for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { +            Thread thread = new Thread(executorEntry.getValue()); +            thread.setDaemon(true); +            System.out.println("Initializing executor " + executorEntry.getKey()); +            thread.start(); +            executorThreads.add(thread); +        } + +        return executorThreads; +    } + +    public void closeExecutors(ArrayList<Thread> executorThreads) { +        for (Thread executorThread : executorThreads) { +            executorThread.interrupt(); +        } +    } + +    public void runModulesAsThreads() { +        try { +            modules = initializeModules(); +        } catch (UnknownHostException | SocketException e) { +            System.out.println("Module initialization failed"); +            e.printStackTrace(); +            return; +        } + +        executors = initializeExecutors(modules); +        ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); +    } + +    void startNonModuleThreads(EventBus eventBus) { +        Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); +        Thread eventBusThread = new Thread(eventBus); +        System.out.println("Initializing event bus"); +        eventBusThread.start(); +        UDUPServerThread.start(); +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java new file mode 100644 index 0000000..62d4bcd --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentUtils.java @@ -0,0 +1,19 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; + +import java.util.function.Supplier; + +public class AgentUtils { + +    public static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period, EventBus eventBus) { +        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); +        try { +            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); +        } catch (InterruptedException e) { +            System.out.println("Interrupted while starting queries"); +        } +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java index a6d3b2d..0e972f4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java @@ -22,6 +22,7 @@ public class EventBus implements Runnable {      // Allows for testing with a mock EventBus      protected EventBus() { +        this.events = new LinkedBlockingQueue<AgentMessage>();      }      EventBus(HashMap<ModuleType, Executor> executors) { @@ -30,6 +31,10 @@ public class EventBus implements Runnable {          this.events = new LinkedBlockingQueue<AgentMessage>();      } +    public void setEvents(LinkedBlockingQueue<AgentMessage> events) { +        this.events = events; +    } +      public void run() {          System.out.println("Event bus running");          while (!Thread.currentThread().interrupted()) { @@ -49,6 +54,7 @@ public class EventBus implements Runnable {      }      public void addMessage(AgentMessage msg) throws InterruptedException { +        System.out.println("INFO: message added for " + msg.getDestinationModule().toString());          this.events.put(msg);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java new file mode 100644 index 0000000..dc5241d --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -0,0 +1,156 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.model.*; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public class HierarchyConfig { +    private GossipGirlStrategies gossipGirlStrategies; +    private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; +    private Random random = new Random(); +    private EventBus eventBus; + +    HierarchyConfig(EventBus eventBus, String zonePath, String zoneStrategy) { +        zoneSelectionStrategy = parseStrategy(zoneStrategy); +        gossipGirlStrategies = new GossipGirlStrategies(new PathName(zonePath)); +        this.eventBus = eventBus; +    } + +    private GossipGirlStrategies.ZoneSelectionStrategy parseStrategy(String selectionStrategy) { +        switch (selectionStrategy) { +            case "RoundRobinExp": +                return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_EXP_FREQ; +            case "RoundRobinUniform": +                return GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; +            case "RandomExp": +                return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_DECR_EXP; +            case "RandomUniform": +                return GossipGirlStrategies.ZoneSelectionStrategy.RANDOM_UNFIORM; +            default: +                throw new UnsupportedOperationException("Selection strategy doesnt exist"); +        } +    } + +    public void startGossip(long gossipPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            System.out.println("INFO: initiating gossip"); +                            PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); +                            ValueContact contact = selectContactFromLevel(gossipLevel); +                            if (contact != null) { +                                System.out.println("INFO: found a contact " + contact.toString()); +                                InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); +                                sendMessage(message); +                            } else { +                                System.out.println("DEBUG: couldn't find contact for gossip"); +                            } +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while initiating gossip"); +                        } catch (Exception e) { +                            System.out.println("ERROR: something happened " + e.toString()); +                        } +                    } +                }; + +        AgentUtils.startRecursiveTask(taskSupplier, gossipPeriod, eventBus); +    } + +    private ValueContact selectContactFromLevel(PathName path) throws Exception { +        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); +        this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); +        StateMessage response = (StateMessage) responseFuture.get(); +        ZMI root = response.getZMI(); +        List<ZMI> siblings = getSiblings(root, path); +        filterEmptyContacts(siblings); +        if (siblings.isEmpty()) { +            return selectFallbackContact(response.getContacts()); +        } +        ZMI zmi = selectZMI(siblings); +        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +        Set<Value> valueSet = contactsValue.getValue(); +        return selectContactFromSet(valueSet); +    } + +    // TODO +    private ValueContact selectFallbackContact(Set<ValueContact> contacts) throws Exception { +        if (contacts.isEmpty()) { +            return null; +        } else { +            return selectContactFromSet(contacts); +        } +    } + +    private ZMI selectZMI(List<ZMI> zmis) throws Exception { +        int i = random.nextInt(zmis.size()); +        for (ZMI zmi : zmis) { +            if (i == 0) { +                return zmi; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectZMI"); +        throw new Exception("empty list passed to selectZMI"); +    } + +    private <T> ValueContact selectContactFromSet(Set<T> contacts) throws Exception { +        int i = random.nextInt(contacts.size()); +        for (T contact : contacts) { +            if (i == 0) { +                return (ValueContact) contact; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectContactFromSet"); +        throw new Exception("empty list passed to selectContactFromSet"); +    } + +    private List<ZMI> getSiblings(ZMI root, PathName path) { +        try { +            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); +            List<ZMI> siblings = new ArrayList(); +            for (ZMI siblingOrI : siblingsAndI) { +                if (!siblingOrI.getPathName().equals(path)) { +                    siblings.add(siblingOrI); +                } +            } +            return siblings; +        } catch (ZMI.NoSuchZoneException e) { +            System.out.println("ERROR: didn't find path when looking for siblings"); +            return new ArrayList(); +        } +    } + +    private void filterEmptyContacts(List<ZMI> zmis) { +        Iterator<ZMI> iterator = zmis.iterator(); +        while (iterator.hasNext()) { +            ZMI zmi = iterator.next(); +            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +            if (contacts == null || contacts.isNull() || contacts.isEmpty()) { +                iterator.remove(); +            } +        } +    } + +    public void startQueries(long queriesPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            sendMessage(new RunQueriesMessage("", 0)); +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while triggering queries"); +                        } +                    } +                }; + +        AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus); +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java index 759723b..806d41f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java @@ -2,9 +2,11 @@ package pl.edu.mimuw.cloudatlas.agent.messages;  import java.util.Map;  import java.util.Map.Entry; +import java.util.Set;  import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueContact;  import pl.edu.mimuw.cloudatlas.model.ValueQuery;  import pl.edu.mimuw.cloudatlas.model.ValueTime;  import pl.edu.mimuw.cloudatlas.model.ZMI; @@ -12,11 +14,13 @@ import pl.edu.mimuw.cloudatlas.model.ZMI;  public class StateMessage extends ResponseMessage {      private ZMI zmi;      private Map<Attribute, Entry<ValueQuery, ValueTime>> queries; +    private Set<ValueContact> contacts; -    public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { +    public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries, Set<ValueContact> contacts) {          super(messageId, destinationModule, timestamp, Type.STATE, requestId);          this.zmi = zmi;          this.queries = queries; +        this.contacts = contacts;      }      public StateMessage() {} @@ -28,4 +32,8 @@ public class StateMessage extends ResponseMessage {      public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {          return queries;      } + +    public Set<ValueContact> getContacts() { +        return contacts; +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index c713827..999c193 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -53,8 +53,9 @@ public class Stanik extends Module {                  break;              case UPDATE_CONTACTS:                  handleUpdateContacts((UpdateContactsMessage) message); +                break;              default: -                throw new InvalidMessageType("This type of message cannot be handled by Stanik"); +                throw new InvalidMessageType("This type of message cannot be handled by Stanik" + message.getType().toString());          }      } @@ -66,7 +67,8 @@ public class Stanik extends Module {              0,              message.getRequestId(),              hierarchy.clone(), -            (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone() +            (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone(), +            contacts          );          sendMessage(response);      } |