diff options
Diffstat (limited to 'src/main/java')
3 files changed, 292 insertions, 270 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 7f2ee1f..690198b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,281 +1,19 @@  package pl.edu.mimuw.cloudatlas.agent; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.List; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; -import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.agent.modules.*; -import pl.edu.mimuw.cloudatlas.agent.modules.Module; -import pl.edu.mimuw.cloudatlas.api.Api; -import pl.edu.mimuw.cloudatlas.interpreter.Main; -import pl.edu.mimuw.cloudatlas.model.PathName; -import pl.edu.mimuw.cloudatlas.model.Value; -import pl.edu.mimuw.cloudatlas.model.ValueContact; -import pl.edu.mimuw.cloudatlas.model.ValueSet; -import pl.edu.mimuw.cloudatlas.model.ZMI; -  public class Agent { -    private static EventBus eventBus; -    private static GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; -    private static GossipGirlStrategies gossipGirlStrategies; -    private static Random random = new Random(); - -    public static void runRegistry() { -        try { -            NewApiImplementation api = new NewApiImplementation(eventBus); -            Api apiStub = -                    (Api) UnicastRemoteObject.exportObject(api, 0); -            Registry registry = LocateRegistry.getRegistry(); -            registry.rebind("Api", apiStub); -            System.out.println("Agent: api bound"); -        } catch (Exception e) { -            System.err.println("Agent registry initialization exception:"); -            e.printStackTrace(); -        } -    } - -    public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { -        HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); -        modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); -        modules.put(ModuleType.RMI, new Remik()); -        Long freshnessPeriod = Long.getLong("freshness_period"); -        modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); -        modules.put(ModuleType.QUERY, new Qurnik()); -        modules.put(ModuleType.GOSSIP, new GossipGirl()); - -        Integer port = Integer.getInteger("UDUPServer.port"); -        Integer timeout = Integer.getInteger("UDUPServer.timeout"); -        Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); -        InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); -        UDUPServer server = new UDUPServer(serverAddr, port, bufsize); -        modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); -        return modules; -    } - -    public static HashMap<ModuleType, Executor> initializeExecutors( -            HashMap<ModuleType, Module> modules) { -        HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); - -        for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { -            Module module = moduleEntry.getValue(); -            Executor executor = new Executor(module); -            executors.put(moduleEntry.getKey(), executor); -        } - -        return executors; -    } - -    public static ArrayList<Thread>  initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { -        ArrayList<Thread> executorThreads = new ArrayList<Thread>(); - -        for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { -            Thread thread = new Thread(executorEntry.getValue()); -            thread.setDaemon(true); -            System.out.println("Initializing executor " + executorEntry.getKey()); -            thread.start(); -            executorThreads.add(thread); -        } - -        return executorThreads; -    } - -    public static void closeExecutors(ArrayList<Thread> executorThreads) { -        for (Thread executorThread : executorThreads) { -            executorThread.interrupt(); -        } -    } - -    public static void runModulesAsThreads() { -        HashMap<ModuleType, Module> modules = null; - -        try { -            modules = initializeModules(); -        } catch (UnknownHostException | SocketException e) { -            System.out.println("Module initialization failed"); -            e.printStackTrace(); -            return; -        } - -        HashMap<ModuleType, Executor> executors = initializeExecutors(modules); -        ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); -        eventBus = new EventBus(executors); -        Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); -        Thread eventBusThread = new Thread(eventBus); -        System.out.println("Initializing event bus"); -        eventBusThread.start(); -        UDUPServerThread.start(); -    } - -    private static void initZones() { -        try { -            ZMI root = Main.createTestHierarchy2(); -            addZoneAndChildren(root, new PathName("")); -            System.out.println("Initialized with test hierarchy"); -        } catch (Exception e) { -            System.out.println("ERROR: failed to create test hierarchy"); -        } -    } -    private static void startQueries(long queriesPeriod) { -        Supplier<TimerScheduledTask> taskSupplier = () -> -            new TimerScheduledTask() { -                public void run() { -                    try { -                        sendMessage(new RunQueriesMessage("", 0)); -                    } catch (InterruptedException e) { -                        System.out.println("Interrupted while triggering queries"); -                    } -                } -            }; - -        startRecursiveTask(taskSupplier, queriesPeriod); -    } - -    private static void startGossip(long gossipPeriod) { -        Supplier<TimerScheduledTask> taskSupplier = () -> -            new TimerScheduledTask() { -                public void run() { -                    try { -                        System.out.println("INFO: initiating gossip"); -                        PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); -                        ValueContact contact = selectContactFromLevel(gossipLevel); -                        if (contact != null) { -                            System.out.println("INFO: found a contact " + contact.toString()); -                            InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); -                            sendMessage(message); -                        } else { -                            System.out.println("DEBUG: couldn't find contact for gossip"); -                        } -                    } catch (InterruptedException e) { -                        System.out.println("Interrupted while initiating gossip"); -                    } catch (Exception e) { -                        System.out.println("ERROR: something happened " + e.toString()); -                    } -                } -            }; - -        startRecursiveTask(taskSupplier, gossipPeriod); -    } - -    private static ValueContact selectContactFromLevel(PathName path) throws Exception { -        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); -        eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); -        StateMessage response = (StateMessage) responseFuture.get(); -        ZMI root = response.getZMI(); -        List<ZMI> siblings = getSiblings(root, path); -        filterEmptyContacts(siblings); -        if (siblings.isEmpty()) { -            return selectFallbackContact(); -        } -        ZMI zmi = selectZMI(siblings); -        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); -        Set<Value> valueSet = contactsValue.getValue(); -        return selectContactFromSet(valueSet); -    } - -    private static ValueContact selectFallbackContact() throws Exception { -        return null; -    } - -    private static ZMI selectZMI(List<ZMI> zmis) throws Exception { -        int i = random.nextInt(zmis.size()); -        for (ZMI zmi : zmis) { -            if (i == 0) { -                return zmi; -            } -            i--; -        } -        System.out.println("ERROR: empty list passed to selectZMI"); -        throw new Exception("empty list passed to selectZMI"); -    } - -    private static ValueContact selectContactFromSet(Set<Value> contacts) throws Exception { -        int i = random.nextInt(contacts.size()); -        for (Value contact : contacts) { -            if (i == 0) { -                return (ValueContact) contact; -            } -            i--; -        } -        System.out.println("ERROR: empty list passed to selectContactFromSet"); -        throw new Exception("empty list passed to selectContactFromSet"); -    } - -    private static List<ZMI> getSiblings(ZMI root, PathName path) { -        try { -            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); -            List<ZMI> siblings = new ArrayList(); -            for (ZMI siblingOrI : siblingsAndI) { -                if (!siblingOrI.getPathName().equals(path)) { -                    siblings.add(siblingOrI); -                } -            } -            return siblings; -        } catch (ZMI.NoSuchZoneException e) { -            System.out.println("ERROR: didn't find path when looking for siblings"); -            return new ArrayList(); -        } -    } - -    private static void filterEmptyContacts(List<ZMI> zmis) { -        Iterator<ZMI> iterator = zmis.iterator(); -        while (iterator.hasNext()) { -            ZMI zmi = iterator.next(); -            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); -            if (contacts == null || contacts.isNull() || contacts.isEmpty()) { -                iterator.remove(); -            } -        } -    } - -    private static void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { -        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); +    public static void main(String[] args) { +        AgentConfig agentConfig = new AgentConfig(); -        try { -            eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); -        } catch (InterruptedException e) { -            System.out.println("Interrupted while starting queries"); -        } -    } +        agentConfig.runModulesAsThreads(); +        agentConfig.runRegistry(); -    private static void addZoneAndChildren(ZMI zmi, PathName pathName) { -        try { -            UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); -            eventBus.addMessage(message); -            for (ZMI son : zmi.getSons()) { -                addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); -            } -        } catch (Exception e) { -            System.out.println("ERROR: failed to add zone"); -        } -    } - -    public static void main(String[] args) { -        zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; -        gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); -        runModulesAsThreads(); -        runRegistry(); -        initZones(); +        HierarchyConfig hierarchyConfig = new HierarchyConfig(agentConfig.getEventBus()); +        hierarchyConfig.initZones();          // TODO: make query period confiurable with config file and from tests          Long queryPeriod = Long.getLong("query_period"); -        startQueries(queryPeriod); +        hierarchyConfig.startQueries(queryPeriod);          Long gossipPeriod = Long.getLong("gossip_period"); -        startGossip(gossipPeriod); +        hierarchyConfig.startGossip(gossipPeriod);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java new file mode 100644 index 0000000..b49525e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/AgentConfig.java @@ -0,0 +1,113 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.modules.*; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.api.Api; + +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class AgentConfig { +    private EventBus eventBus; + +    AgentConfig() {} + +    public EventBus getEventBus() { +        return eventBus; +    } + +    public void runRegistry() { +        try { +            NewApiImplementation api = new NewApiImplementation(eventBus); +            Api apiStub = +                    (Api) UnicastRemoteObject.exportObject(api, 0); +            Registry registry = LocateRegistry.getRegistry(); +            registry.rebind("Api", apiStub); +            System.out.println("Agent: api bound"); +        } catch (Exception e) { +            System.err.println("Agent registry initialization exception:"); +            e.printStackTrace(); +        } +    } + +    private HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException { +        HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>(); +        modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); +        modules.put(ModuleType.RMI, new Remik()); +        Long freshnessPeriod = Long.getLong("freshness_period"); +        modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); +        modules.put(ModuleType.QUERY, new Qurnik()); +        modules.put(ModuleType.GOSSIP, new GossipGirl()); + +        Integer port = Integer.getInteger("UDUPServer.port"); +        Integer timeout = Integer.getInteger("UDUPServer.timeout"); +        Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); +        InetAddress serverAddr = InetAddress.getByName(System.getProperty("UDUPServer.hostname")); +        UDUPServer server = new UDUPServer(serverAddr, port, bufsize); +        modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); +        return modules; +    } + +    private HashMap<ModuleType, Executor> initializeExecutors( +            HashMap<ModuleType, Module> modules) { +        HashMap<ModuleType, Executor> executors = new HashMap<ModuleType, Executor>(); + +        for (Map.Entry<ModuleType, Module> moduleEntry : modules.entrySet()) { +            Module module = moduleEntry.getValue(); +            Executor executor = new Executor(module); +            executors.put(moduleEntry.getKey(), executor); +        } + +        return executors; +    } + +    private ArrayList<Thread> initializeExecutorThreads(HashMap<ModuleType, Executor> executors) { +        ArrayList<Thread> executorThreads = new ArrayList<Thread>(); + +        for (Map.Entry<ModuleType, Executor> executorEntry : executors.entrySet()) { +            Thread thread = new Thread(executorEntry.getValue()); +            thread.setDaemon(true); +            System.out.println("Initializing executor " + executorEntry.getKey()); +            thread.start(); +            executorThreads.add(thread); +        } + +        return executorThreads; +    } + +    public void closeExecutors(ArrayList<Thread> executorThreads) { +        for (Thread executorThread : executorThreads) { +            executorThread.interrupt(); +        } +    } + +    public void runModulesAsThreads() { +        HashMap<ModuleType, Module> modules = null; + +        try { +            modules = initializeModules(); +        } catch (UnknownHostException | SocketException e) { +            System.out.println("Module initialization failed"); +            e.printStackTrace(); +            return; +        } + +        HashMap<ModuleType, Executor> executors = initializeExecutors(modules); +        ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); +        eventBus = new EventBus(executors); +        Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); +        Thread eventBusThread = new Thread(eventBus); +        System.out.println("Initializing event bus"); +        eventBusThread.start(); +        UDUPServerThread.start(); +    } + + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java new file mode 100644 index 0000000..1ee9ac2 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -0,0 +1,171 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirl; +import pl.edu.mimuw.cloudatlas.agent.modules.GossipGirlStrategies; +import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; +import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; +import pl.edu.mimuw.cloudatlas.interpreter.Main; +import pl.edu.mimuw.cloudatlas.model.*; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public class HierarchyConfig { +    private GossipGirlStrategies gossipGirlStrategies; +    private GossipGirlStrategies.ZoneSelectionStrategy zoneSelectionStrategy; +    private Random random = new Random(); +    private EventBus eventBus; + +    HierarchyConfig(EventBus eventBus) { +        zoneSelectionStrategy = GossipGirlStrategies.ZoneSelectionStrategy.ROUND_ROBIN_SAME_FREQ; +        gossipGirlStrategies = new GossipGirlStrategies(new PathName("/uw/violet07")); +        this.eventBus = eventBus; +    } + +    public void startGossip(long gossipPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            System.out.println("INFO: initiating gossip"); +                            PathName gossipLevel = gossipGirlStrategies.selectStrategy(zoneSelectionStrategy); +                            ValueContact contact = selectContactFromLevel(gossipLevel); +                            if (contact != null) { +                                System.out.println("INFO: found a contact " + contact.toString()); +                                InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); +                                sendMessage(message); +                            } else { +                                System.out.println("DEBUG: couldn't find contact for gossip"); +                            } +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while initiating gossip"); +                        } catch (Exception e) { +                            System.out.println("ERROR: something happened " + e.toString()); +                        } +                    } +                }; + +        startRecursiveTask(taskSupplier, gossipPeriod); +    } + +    private ValueContact selectContactFromLevel(PathName path) throws Exception { +        CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture(); +        this.eventBus.addMessage(new RequestStateMessage("", 0, responseFuture)); +        StateMessage response = (StateMessage) responseFuture.get(); +        ZMI root = response.getZMI(); +        List<ZMI> siblings = getSiblings(root, path); +        filterEmptyContacts(siblings); +        if (siblings.isEmpty()) { +            return selectFallbackContact(); +        } +        ZMI zmi = selectZMI(siblings); +        ValueSet contactsValue = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +        Set<Value> valueSet = contactsValue.getValue(); +        return selectContactFromSet(valueSet); +    } + +    // TODO +    private ValueContact selectFallbackContact() throws Exception { +        return null; +    } + +    private ZMI selectZMI(List<ZMI> zmis) throws Exception { +        int i = random.nextInt(zmis.size()); +        for (ZMI zmi : zmis) { +            if (i == 0) { +                return zmi; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectZMI"); +        throw new Exception("empty list passed to selectZMI"); +    } + +    private ValueContact selectContactFromSet(Set<Value> contacts) throws Exception { +        int i = random.nextInt(contacts.size()); +        for (Value contact : contacts) { +            if (i == 0) { +                return (ValueContact) contact; +            } +            i--; +        } +        System.out.println("ERROR: empty list passed to selectContactFromSet"); +        throw new Exception("empty list passed to selectContactFromSet"); +    } + +    private List<ZMI> getSiblings(ZMI root, PathName path) { +        try { +            List<ZMI> siblingsAndI = root.findDescendant(path).getFather().getSons(); +            List<ZMI> siblings = new ArrayList(); +            for (ZMI siblingOrI : siblingsAndI) { +                if (!siblingOrI.getPathName().equals(path)) { +                    siblings.add(siblingOrI); +                } +            } +            return siblings; +        } catch (ZMI.NoSuchZoneException e) { +            System.out.println("ERROR: didn't find path when looking for siblings"); +            return new ArrayList(); +        } +    } + +    private void filterEmptyContacts(List<ZMI> zmis) { +        Iterator<ZMI> iterator = zmis.iterator(); +        while (iterator.hasNext()) { +            ZMI zmi = iterator.next(); +            ValueSet contacts = (ValueSet) zmi.getAttributes().getOrNull("contacts"); +            if (contacts == null || contacts.isNull() || contacts.isEmpty()) { +                iterator.remove(); +            } +        } +    } + +    public void startRecursiveTask(Supplier<TimerScheduledTask> taskSupplier, long period) { +        TimerScheduledTask timerTask = new RecursiveScheduledTask(period, taskSupplier); + +        try { +            this.eventBus.addMessage(new TimerSchedulerMessage("", 0, "", period, 0, timerTask)); +        } catch (InterruptedException e) { +            System.out.println("Interrupted while starting queries"); +        } +    } + +    private void addZoneAndChildren(ZMI zmi, PathName pathName) { +        try { +            UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); +            this.eventBus.addMessage(message); +            for (ZMI son : zmi.getSons()) { +                addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); +            } +        } catch (Exception e) { +            System.out.println("ERROR: failed to add zone"); +        } +    } + +    public void initZones() { +        try { +            ZMI root = Main.createTestHierarchy2(); +            addZoneAndChildren(root, new PathName("")); +            System.out.println("Initialized with test hierarchy"); +        } catch (Exception e) { +            System.out.println("ERROR: failed to create test hierarchy"); +        } +    } + +    public void startQueries(long queriesPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            sendMessage(new RunQueriesMessage("", 0)); +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while triggering queries"); +                        } +                    } +                }; + +        startRecursiveTask(taskSupplier, queriesPeriod); +    } +} |