m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java22
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java31
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java27
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java29
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java23
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java21
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java10
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java167
9 files changed, 333 insertions, 1 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
new file mode 100644
index 0000000..63392e8
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
@@ -0,0 +1,22 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+
+public class GetStateMessage extends StanikMessage {
+ private ModuleType requestingModule;
+ private long requestId;
+
+ public GetStateMessage(String messageId, long timestamp, ModuleType requestingModule, long requestId) {
+ super(messageId, timestamp, Type.GET_STATE);
+ this.requestingModule = requestingModule;
+ this.requestId = requestId;
+ }
+
+ public ModuleType getRequestingModule() {
+ return requestingModule;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
new file mode 100644
index 0000000..02b3337
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
@@ -0,0 +1,31 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+
+public abstract class ResponseMessage extends AgentMessage {
+ public enum Type {
+ STATE
+ }
+
+ Type type;
+ long requestId;
+
+ public ResponseMessage(String messageId, ModuleType destinationModule, long timestamp, Type type, long requestId) {
+ super(messageId, destinationModule, timestamp);
+ this.type = type;
+ this.requestId = requestId;
+ }
+
+ public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+ module.handleTyped(this);
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public Type getType() {
+ return type;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
new file mode 100644
index 0000000..b23f6e0
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
@@ -0,0 +1,27 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+
+public abstract class StanikMessage extends AgentMessage {
+ public enum Type {
+ GET_STATE,
+ UPDATE_ATTRIBUTES,
+ UPDATE_QUERIES
+ }
+
+ private Type type;
+
+ public StanikMessage(String messageId, long timestamp, Type type) {
+ super(messageId, ModuleType.STATE, timestamp);
+ this.type = type;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+ module.handleTyped(this);
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
new file mode 100644
index 0000000..f7f490b
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
@@ -0,0 +1,29 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueQuery;
+import pl.edu.mimuw.cloudatlas.model.ValueTime;
+import pl.edu.mimuw.cloudatlas.model.ZMI;
+
+public class StateMessage extends ResponseMessage {
+ private ZMI zmi;
+ private Map<Attribute, Entry<ValueQuery, ValueTime>> queries;
+
+ public StateMessage(String messageId, ModuleType destinationModule, long timestamp, long requestId, ZMI zmi, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) {
+ super(messageId, destinationModule, timestamp, Type.STATE, requestId);
+ this.zmi = zmi;
+ this.queries = queries;
+ }
+
+ public ZMI getZMI() {
+ return zmi;
+ }
+
+ public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
+ return queries;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
new file mode 100644
index 0000000..7e41631
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
@@ -0,0 +1,23 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+
+public class UpdateAttributesMessage extends StanikMessage {
+ private String pathName;
+ private AttributesMap attributes;
+
+ public UpdateAttributesMessage(String messageId, long timestamp, String pathName, AttributesMap attributes) {
+ super(messageId, timestamp, Type.UPDATE_ATTRIBUTES);
+ this.pathName = pathName;
+ this.attributes = attributes;
+ }
+
+ public String getPathName() {
+ return pathName;
+ }
+
+ public AttributesMap getAttributes() {
+ return attributes;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
new file mode 100644
index 0000000..58ad55a
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
@@ -0,0 +1,21 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.ValueQuery;
+import pl.edu.mimuw.cloudatlas.model.ValueTime;
+
+public class UpdateQueriesMessage extends StanikMessage {
+ private Map<Attribute, Entry<ValueQuery, ValueTime>> queries;
+
+ public UpdateQueriesMessage(String messageId, long timestamp, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) {
+ super(messageId, timestamp, Type.UPDATE_QUERIES);
+ this.queries = queries;
+ }
+
+ public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
+ return queries;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index a50a95f..d0bf083 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -3,7 +3,9 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.Executor;
import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
/*
* A Module is a (potentially stateful) event handler.
@@ -34,6 +36,14 @@ public abstract class Module {
throw new InvalidMessageType("Got an RMIMessage in module " + moduleType.toString());
}
+ public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a StanikMessage in module " + moduleType.toString());
+ }
+
+ public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+ }
+
public void setExecutor(Executor executor) {
this.executor = executor;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java
index ff4a92e..d221f06 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java
@@ -8,5 +8,7 @@ public enum ModuleType {
GOSSIP_IN,
GOSSIP_OUT,
STATE,
- QUERY
+ QUERY,
+ // for testing
+ TEST
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
new file mode 100644
index 0000000..e8721b3
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -0,0 +1,167 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage;
+import pl.edu.mimuw.cloudatlas.model.Attribute;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.Type;
+import pl.edu.mimuw.cloudatlas.model.TypePrimitive;
+import pl.edu.mimuw.cloudatlas.model.Value;
+import pl.edu.mimuw.cloudatlas.model.ValueBoolean;
+import pl.edu.mimuw.cloudatlas.model.ValueQuery;
+import pl.edu.mimuw.cloudatlas.model.ValueString;
+import pl.edu.mimuw.cloudatlas.model.ValueTime;
+import pl.edu.mimuw.cloudatlas.model.ZMI;
+
+public class Stanik extends Module {
+ private class InvalidUpdateAttributesMessage extends Exception {
+ public InvalidUpdateAttributesMessage(String message) {
+ super(message);
+ }
+ }
+
+ private ZMI hierarchy;
+ private HashMap<Attribute, Entry<ValueQuery, ValueTime>> queries;
+
+ public Stanik() {
+ super(ModuleType.STATE);
+ hierarchy = new ZMI();
+ queries = new HashMap<Attribute, Entry<ValueQuery, ValueTime>>();
+ hierarchy.getAttributes().add("timestamp", new ValueTime(0l));
+ }
+
+ public void handleTyped(StanikMessage message) throws InterruptedException, InvalidMessageType {
+ switch(message.getType()) {
+ case GET_STATE:
+ handleGetState((GetStateMessage) message);
+ break;
+ case UPDATE_ATTRIBUTES:
+ handleUpdateAttributes((UpdateAttributesMessage) message);
+ break;
+ case UPDATE_QUERIES:
+ handleUpdateQueries((UpdateQueriesMessage) message);
+ break;
+ default:
+ throw new InvalidMessageType("This type of message cannot be handled by Stanik");
+ }
+ }
+
+ public void handleGetState(GetStateMessage message) throws InterruptedException {
+ StateMessage response = new StateMessage("", message.getRequestingModule(), 0, message.getRequestId(), hierarchy.clone(), (HashMap<Attribute, Entry<ValueQuery, ValueTime>>) queries.clone());
+ sendMessage(response);
+ }
+
+ public void handleUpdateAttributes(UpdateAttributesMessage message) {
+ try {
+ validateUpdateAttributesMessage(message);
+ addMissingZones(new PathName(message.getPathName()));
+ ZMI zone = hierarchy.findDescendant(message.getPathName());
+ AttributesMap attributes = zone.getAttributes();
+ if (valueLower(attributes.get("timestamp"), message.getAttributes().get("timestamp"))) {
+ transferAttributes(message.getAttributes(), attributes);
+ } else {
+ System.out.println("DEBUG: not applying update with older attributes");
+ }
+ } catch (InvalidUpdateAttributesMessage e) {
+ System.out.println("ERROR: invalid UpdateAttributesMessage " + e.getMessage());
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: zone should exist after being added");
+ }
+ }
+
+ public void handleUpdateQueries(UpdateQueriesMessage message) {
+ for (Entry<Attribute, Entry<ValueQuery, ValueTime>> entry : message.getQueries().entrySet()) {
+ Attribute attribute = entry.getKey();
+ ValueTime timestamp = entry.getValue().getValue();
+ Entry<ValueQuery, ValueTime> currentTimestampedQuery = queries.get(attribute);
+ if (currentTimestampedQuery == null || valueLower(currentTimestampedQuery.getValue(), timestamp)) {
+ queries.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ private boolean valueLower(Value a, Value b) {
+ return ((ValueBoolean) a.isLowerThan(b)).getValue();
+ }
+
+ private void validateUpdateAttributesMessage(UpdateAttributesMessage message) throws InvalidUpdateAttributesMessage {
+ validateZoneName(message);
+ validateHasTimeStamp(message);
+ }
+
+ private void validateZoneName(UpdateAttributesMessage message) throws InvalidUpdateAttributesMessage {
+ Value name = message.getAttributes().getOrNull("name");
+ if (message.getPathName().equals("/")) {
+ if (name != null && !name.isNull()) {
+ throw new InvalidUpdateAttributesMessage("The root zone should have a null name");
+ }
+ } else {
+ if (valueNonNullOfType(name, TypePrimitive.STRING)) {
+ ValueString nameString = (ValueString) name;
+ String expectedName = (new PathName(message.getPathName())).getSingletonName();
+ if (!nameString.getValue().equals(expectedName)) {
+ throw new InvalidUpdateAttributesMessage("The zone's name attribute should match its path name");
+ }
+ } else {
+ throw new InvalidUpdateAttributesMessage("Zone attributes should have a name attribute of type String");
+ }
+ }
+ }
+
+ private void validateHasTimeStamp(UpdateAttributesMessage message) throws InvalidUpdateAttributesMessage {
+ if (!valueNonNullOfType(message.getAttributes().getOrNull("timestamp"), TypePrimitive.TIME)) {
+ throw new InvalidUpdateAttributesMessage("Zone attriutes should have a timestamp attribute of type Time");
+ }
+ }
+
+ private boolean valueNonNullOfType(Value value, Type type) {
+ return value != null && !value.isNull() && value.getType().isCompatible(type);
+ }
+
+ private void transferAttributes(AttributesMap fromAttributes, AttributesMap toAttributes) {
+ Iterator<Entry<Attribute, Value>> iterator = toAttributes.iterator();
+ while (iterator.hasNext()) {
+ Entry<Attribute, Value> entry = iterator.next();
+ Attribute attribute = entry.getKey();
+ Value newValue = fromAttributes.getOrNull(attribute);
+ if (newValue == null) {
+ iterator.remove();
+ }
+ }
+ for (Entry<Attribute, Value> entry : fromAttributes) {
+ toAttributes.addOrChange(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void addMissingZones(PathName path) {
+ try {
+ if (!hierarchy.descendantExists(path)) {
+ addMissingZones(path.levelUp());
+ ZMI parent = hierarchy.findDescendant(path.levelUp());
+ ZMI newSon = new ZMI(parent);
+ newSon.getAttributes().add("name", new ValueString(path.getSingletonName()));
+ newSon.getAttributes().add("timestamp", new ValueTime(0l));
+ parent.addSon(newSon);
+ }
+ } catch (ZMI.NoSuchZoneException e) {
+ System.out.println("ERROR: zone should exist after being added");
+ }
+ }
+
+ public ZMI getHierarchy() {
+ return hierarchy;
+ }
+
+ public HashMap<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
+ return queries;
+ }
+}