package pl.edu.mimuw.cloudatlas.agent; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ZMI; public class Agent { private static EventBus eventBus; public static void runRegistry() { try { NewApiImplementation api = new NewApiImplementation(eventBus); Api apiStub = (Api) UnicastRemoteObject.exportObject(api, 0); Registry registry = LocateRegistry.getRegistry(); registry.rebind("Api", apiStub); System.out.println("Agent: api bound"); } catch (Exception e) { System.err.println("Agent registry initialization exception:"); e.printStackTrace(); } } public static HashMap initializeModules() { HashMap modules = new HashMap(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); Long freshnessPeriod = new Long(System.getProperty("freshness_period")); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); try { modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); } catch (UnknownHostException e) { e.printStackTrace(); } // TODO add modules as we implement them return modules; } public static HashMap initializeExecutors( HashMap modules) { HashMap executors = new HashMap(); for (Map.Entry moduleEntry : modules.entrySet()) { Module module = moduleEntry.getValue(); Executor executor = new Executor(module); executors.put(moduleEntry.getKey(), executor); } return executors; } public static ArrayList initializeExecutorThreads(HashMap executors) { ArrayList executorThreads = new ArrayList(); for (Map.Entry executorEntry : executors.entrySet()) { Thread thread = new Thread(executorEntry.getValue()); thread.setDaemon(true); System.out.println("Initializing executor " + executorEntry.getKey()); thread.start(); executorThreads.add(thread); } return executorThreads; } public static void closeExecutors(ArrayList executorThreads) { for (Thread executorThread : executorThreads) { executorThread.interrupt(); } } public static void runModulesAsThreads() { HashMap modules = initializeModules(); HashMap executors = initializeExecutors(modules); ArrayList executorThreads = initializeExecutorThreads(executors); eventBus = new EventBus(executors); Thread eventBusThread = new Thread(eventBus); System.out.println("Initializing event bus"); eventBusThread.start(); } private static void initZones() { try { ZMI root = Main.createTestHierarchy2(); addZoneAndChildren(root, new PathName("")); System.out.println("Initialized with test hierarchy"); } catch (Exception e) { System.out.println("ERROR: failed to create test hierarchy"); } } private static void startQueries(long queriesPeriod) { Supplier taskSupplier = () -> new TimerScheduledTask() { public void run() { try { sendMessage(new RunQueriesMessage("", 0)); } catch (InterruptedException e) { System.out.println("Interrupted while triggering queries"); } } }; TimerScheduledTask timerTask = new RecursiveScheduledTask(queriesPeriod, taskSupplier); try { eventBus.addMessage(new TimerSchedulerMessage("", 0, "", queriesPeriod, 0, timerTask)); } catch (InterruptedException e) { System.out.println("Interrupted while starting queries"); } } private static void addZoneAndChildren(ZMI zmi, PathName pathName) { try { UpdateAttributesMessage message = new UpdateAttributesMessage("", 0, pathName.toString(), zmi.getAttributes()); eventBus.addMessage(message); for (ZMI son : zmi.getSons()) { addZoneAndChildren(son, pathName.levelDown(son.getAttributes().getOrNull("name").toString())); } } catch (Exception e) { System.out.println("ERROR: failed to add zone"); } } public static void main(String[] args) { runModulesAsThreads(); runRegistry(); initZones(); // TODO: make query period confiurable with config file and from tests startQueries(100l); } }