m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2019-12-30 22:16:46 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2019-12-30 22:16:46 +0100
commitca2f1ab4a37f0d590d5c116cd1fa7c341c77fad3 (patch)
treefb3b8ff1629e5480086f43065156625985a2e034 /src/main/java/pl/edu/mimuw/cloudatlas/agent
parentfda20e7aa496926d4f4d78921925025040414d9b (diff)
Implement modular getZoneSet
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java31
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementation.java178
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java (renamed from src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java)15
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java16
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java37
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Remik.java54
8 files changed, 296 insertions, 45 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 392e0a1..52bd395 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,22 +1,45 @@
package pl.edu.mimuw.cloudatlas.agent;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import pl.edu.mimuw.cloudatlas.agent.ApiImplementation;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
-import pl.edu.mimuw.cloudatlas.agent.modules.RMI;
+import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
import pl.edu.mimuw.cloudatlas.agent.modules.Stanik;
import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler;
+import pl.edu.mimuw.cloudatlas.api.Api;
+import pl.edu.mimuw.cloudatlas.interpreter.Main;
+import pl.edu.mimuw.cloudatlas.model.ZMI;
public class Agent {
+ private static EventBus eventBus;
+
+ public static void runRegistry() {
+ try {
+ ZMI root = Main.createTestHierarchy2();
+ ApiImplementation api = new ApiImplementation(root);
+ Api apiStub =
+ (Api) UnicastRemoteObject.exportObject(api, 0);
+ Registry registry = LocateRegistry.getRegistry();
+ registry.rebind("Api", apiStub);
+ System.out.println("Agent: api bound");
+ } catch (Exception e) {
+ System.err.println("Agent registry initialization exception:");
+ e.printStackTrace();
+ }
+ }
public static HashMap<ModuleType, Module> initializeModules() {
HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();
modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));
- modules.put(ModuleType.RMI, new RMI(ModuleType.RMI));
+ modules.put(ModuleType.RMI, new Remik());
modules.put(ModuleType.STATE, new Stanik());
modules.put(ModuleType.QUERY, new Qurnik());
// TODO add modules as we implement them
@@ -61,7 +84,8 @@ public class Agent {
HashMap<ModuleType, Executor> executors = initializeExecutors(modules);
ArrayList<Thread> executorThreads = initializeExecutorThreads(executors);
- Thread eventBusThread = new Thread(new EventBus(executors));
+ eventBus = new EventBus(executors);
+ Thread eventBusThread = new Thread(eventBus);
System.out.println("Initializing event bus");
eventBusThread.start();
@@ -71,5 +95,6 @@ public class Agent {
public static void main(String[] args) {
runModulesAsThreads();
+ runRegistry();
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
index 5b09253..a6d3b2d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/EventBus.java
@@ -20,6 +20,10 @@ public class EventBus implements Runnable {
}
}
+ // Allows for testing with a mock EventBus
+ protected EventBus() {
+ }
+
EventBus(HashMap<ModuleType, Executor> executors) {
this.executors = executors;
setEventBusReference();
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementation.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementation.java
new file mode 100644
index 0000000..4aa5148
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/NewApiImplementation.java
@@ -0,0 +1,178 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import java.io.PrintStream;
+
+import java.rmi.RemoteException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+
+import pl.edu.mimuw.cloudatlas.agent.messages.RequestStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage;
+import pl.edu.mimuw.cloudatlas.interpreter.Interpreter;
+import pl.edu.mimuw.cloudatlas.interpreter.InterpreterException;
+import pl.edu.mimuw.cloudatlas.interpreter.Main;
+import pl.edu.mimuw.cloudatlas.interpreter.QueryResult;
+import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.Value;
+import pl.edu.mimuw.cloudatlas.model.ValueQuery;
+import pl.edu.mimuw.cloudatlas.model.ValueSet;
+import pl.edu.mimuw.cloudatlas.model.ValueNull;
+import pl.edu.mimuw.cloudatlas.model.Type;
+import pl.edu.mimuw.cloudatlas.model.TypePrimitive;
+import pl.edu.mimuw.cloudatlas.model.ZMI;
+import pl.edu.mimuw.cloudatlas.api.Api;
+
+public class NewApiImplementation implements Api {
+ private EventBus eventBus;
+
+ public NewApiImplementation(EventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ public Set<String> getZoneSet() throws RemoteException {
+ CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture();
+ RequestStateMessage message = new RequestStateMessage("", 0, responseFuture);
+ try {
+ eventBus.addMessage(message);
+ ResponseMessage response = responseFuture.get();
+
+ if (response.getType() == ResponseMessage.Type.STATE) {
+ StateMessage stateMessage = (StateMessage) response;
+ Set<String> zones = new HashSet<String>();
+ collectZoneNames(stateMessage.getZMI(), zones);
+ return zones;
+ } else {
+ System.out.println("ERROR: getZoneSet didn't receive a StateMessage");
+ throw new Exception("Failed to retrieve zone set");
+ }
+ } catch (Exception e) {
+ System.out.println("ERROR: exception caught in getZoneSet");
+ throw new RemoteException(e.getMessage());
+ }
+ }
+
+ private void collectZoneNames(ZMI zone, Set<String> names) {
+ names.add(zone.getPathName().toString());
+ for (ZMI son : zone.getSons()) {
+ collectZoneNames(son, names);
+ }
+ }
+
+ public AttributesMap getZoneAttributeValues(String zoneName) throws RemoteException {
+ /*
+ try {
+ ZMI zmi = root.findDescendant(new PathName(zoneName));
+ return zmi.getAttributes();
+ } catch (ZMI.NoSuchZoneException e) {
+ throw new RemoteException("Zone not found", e);
+ }
+ */
+
+ // placeholder
+ return new AttributesMap();
+ }
+
+ public void installQuery(String name, String queryCode) throws RemoteException {
+ /*
+ Pattern queryNamePattern = Pattern.compile("&[a-zA-Z][\\w_]*");
+ Matcher matcher = queryNamePattern.matcher(name);
+ if (!matcher.matches()) {
+ throw new RemoteException("Invalid query identifier");
+ }
+ try {
+ ValueQuery query = new ValueQuery(queryCode);
+ Attribute attributeName = new Attribute(name);
+ installQueryInHierarchy(root, attributeName, query);
+ executeAllQueries(root);
+ } catch (Exception e) {
+ throw new RemoteException("Failed to install query", e);
+ }
+ */
+ }
+
+ private void installQueryInHierarchy(ZMI zmi, Attribute queryName, ValueQuery query) {
+ /*
+ if (!zmi.getSons().isEmpty()) {
+ zmi.getAttributes().addOrChange(queryName, query);
+ for (ZMI son : zmi.getSons()) {
+ installQueryInHierarchy(son, queryName, query);
+ }
+ }
+ */
+ }
+
+ public void uninstallQuery(String queryName) throws RemoteException {
+ // uninstallQueryInHierarchy(root, new Attribute(queryName));
+ }
+
+ private void uninstallQueryInHierarchy(ZMI zmi, Attribute queryName) {
+ /*
+ if (!zmi.getSons().isEmpty()) {
+ zmi.getAttributes().remove(queryName);
+ for (ZMI son : zmi.getSons()) {
+ uninstallQueryInHierarchy(son, queryName);
+ }
+ }
+ */
+ }
+
+ public void setAttributeValue(String zoneName, String attributeName, Value value) throws RemoteException {
+ /*
+ try {
+ ZMI zmi = root.findDescendant(new PathName(zoneName));
+ zmi.getAttributes().addOrChange(new Attribute(attributeName), value);
+ executeAllQueries(root);
+ } catch (ZMI.NoSuchZoneException e) {
+ throw new RemoteException("Zone not found", e);
+ }
+ */
+ }
+
+ private void executeAllQueries(ZMI zmi) {
+ /*
+ if(!zmi.getSons().isEmpty()) {
+ for(ZMI son : zmi.getSons()) {
+ executeAllQueries(son);
+ }
+
+ Interpreter interpreter = new Interpreter(zmi);
+ for (ValueQuery query : getQueries(zmi)) {
+ try {
+ List<QueryResult> result = interpreter.interpretProgram(query.getQuery());
+ for(QueryResult r : result) {
+ zmi.getAttributes().addOrChange(r.getName(), r.getValue());
+ }
+ } catch(InterpreterException exception) {}
+ }
+ }
+ */
+ }
+
+ private Set<ValueQuery> getQueries(ZMI zmi) {
+ Set<ValueQuery> querySet = new HashSet<ValueQuery>();
+ /*
+ for (Map.Entry<Attribute, Value> attribute : zmi.getAttributes()) {
+ if (attribute.getValue().getType().getPrimaryType() == Type.PrimaryType.QUERY) {
+ querySet.add((ValueQuery) attribute.getValue());
+ }
+ }
+ */
+
+ return querySet;
+ }
+
+ public void setFallbackContacts(Set<ValueContact> contacts) throws RemoteException {
+ // this.contacts = contacts;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
index a8e3afb..b0300cb 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RMIMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
@@ -3,9 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
-public class RMIMessage extends AgentMessage {
- public RMIMessage(String messageId, long timestamp) {
+public abstract class RemikMessage extends AgentMessage {
+ public enum Type {
+ REQUEST_STATE
+ }
+
+ private Type type;
+
+ public RemikMessage(String messageId, long timestamp, Type type) {
super(messageId, ModuleType.RMI, timestamp);
+ this.type = type;
+ }
+
+ public Type getType() {
+ return type;
}
public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
new file mode 100644
index 0000000..698aac7
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
@@ -0,0 +1,16 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import java.util.concurrent.CompletableFuture;
+
+public class RequestStateMessage extends RemikMessage {
+ CompletableFuture<ResponseMessage> responseFuture;
+
+ public RequestStateMessage(String messageId, long timestamp, CompletableFuture<ResponseMessage> responseFuture) {
+ super(messageId, timestamp, Type.REQUEST_STATE);
+ this.responseFuture = responseFuture;
+ }
+
+ public CompletableFuture<ResponseMessage> getFuture() {
+ return responseFuture;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index ba5e1d1..0a934cb 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -4,8 +4,8 @@ import pl.edu.mimuw.cloudatlas.agent.Executor;
import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
/*
@@ -37,8 +37,8 @@ public abstract class Module {
throw new InvalidMessageType("Got a QurnikMessage in module " + moduleType.toString());
}
- public void handleTyped(RMIMessage message) throws InterruptedException, InvalidMessageType {
- throw new InvalidMessageType("Got an RMIMessage in module " + moduleType.toString());
+ public void handleTyped(RemikMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a RemikMessage in module " + moduleType.toString());
}
public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java
deleted file mode 100644
index 1a86fc7..0000000
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/RMI.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package pl.edu.mimuw.cloudatlas.agent.modules;
-
-import pl.edu.mimuw.cloudatlas.agent.ApiImplementation;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage;
-import pl.edu.mimuw.cloudatlas.api.Api;
-import pl.edu.mimuw.cloudatlas.interpreter.Main;
-import pl.edu.mimuw.cloudatlas.model.ZMI;
-
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-
-public class RMI extends Module {
- public RMI(ModuleType moduleType) {
- super(moduleType);
- runRegistry();
- }
-
- public void runRegistry() {
- try {
- ZMI root = Main.createTestHierarchy2();
- ApiImplementation api = new ApiImplementation(root);
- Api apiStub =
- (Api) UnicastRemoteObject.exportObject(api, 0);
- Registry registry = LocateRegistry.getRegistry();
- registry.rebind("Api", apiStub);
- System.out.println("Agent: api bound");
- } catch (Exception e) {
- System.err.println("Agent registry initialization exception:");
- e.printStackTrace();
- }
- }
-
- public void handleTyped(RMIMessage event) throws InterruptedException {
- }
-}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Remik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Remik.java
new file mode 100644
index 0000000..9645d71
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Remik.java
@@ -0,0 +1,54 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.RequestStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
+
+/*
+ * Remik is a cute little module that allows RMI functions to interface with
+ * agent's asynchronous modules.
+ */
+public class Remik extends Module {
+ private Map<Long, CompletableFuture<ResponseMessage>> awaitingRequests;
+ private long nextRequestId = 0;
+
+ public Remik() {
+ super(ModuleType.RMI);
+ awaitingRequests = new HashMap();
+ }
+
+ public void handleTyped(RemikMessage message) throws InvalidMessageType, InterruptedException {
+ switch (message.getType()) {
+ case REQUEST_STATE:
+ handleRequestState((RequestStateMessage) message);
+ break;
+ default:
+ throw new InvalidMessageType("This type of message cannot be handled by Remik");
+ }
+ }
+
+ public void handleTyped(ResponseMessage message) {
+ CompletableFuture<ResponseMessage> responseFuture = awaitingRequests.get(message.getRequestId());
+
+ if (responseFuture == null) {
+ System.out.println("ERROR: Remik got response for nonexistent/finished request");
+ } else {
+ responseFuture.complete(message);
+ }
+ }
+
+ private void handleRequestState(RequestStateMessage message) throws InterruptedException {
+ awaitingRequests.put(nextRequestId, message.getFuture());
+
+ GetStateMessage getStateMessage = new GetStateMessage("", 0, ModuleType.RMI, nextRequestId);
+ nextRequestId++;
+
+ sendMessage(getStateMessage);
+ }
+}