From 0e9ed16b0a048266b56d1238bd7a8a31e59fdac5 Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 29 Dec 2019 19:12:48 +0100 Subject: Start running queries with Qurnik --- .../cloudatlas/agent/messages/QurnikMessage.java | 25 +++++ .../agent/messages/RunQueriesMessage.java | 9 ++ .../edu/mimuw/cloudatlas/agent/modules/Module.java | 5 + .../edu/mimuw/cloudatlas/agent/modules/Qurnik.java | 102 +++++++++++++++++++++ .../pl/edu/mimuw/cloudatlas/model/ValueUtils.java | 7 ++ .../mimuw/cloudatlas/agent/modules/QurnikTest.java | 86 +++++++++++++++++ .../mimuw/cloudatlas/agent/modules/StanikTest.java | 30 ++---- .../pl/edu/mimuw/cloudatlas/model/TestUtil.java | 16 ++++ 8 files changed, 260 insertions(+), 20 deletions(-) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Qurnik.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java create mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java create mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java new file mode 100644 index 0000000..0161a37 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java @@ -0,0 +1,25 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; + +public abstract class QurnikMessage extends AgentMessage { + public enum Type { + RUN_QUERIES + } + + private Type type; + + public QurnikMessage(String messageId, long timestamp, Type type) { + super(messageId, ModuleType.QUERY, timestamp); + this.type = type; + } + + public Type getType() { + return type; + } + + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java new file mode 100644 index 0000000..35f7819 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java @@ -0,0 +1,9 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; + +public class RunQueriesMessage extends QurnikMessage { + public RunQueriesMessage(String messageId, long timestamp) { + super(messageId, timestamp, Type.RUN_QUERIES); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index d0bf083..ba5e1d1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -3,6 +3,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.Executor; import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage; import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; import pl.edu.mimuw.cloudatlas.agent.messages.RMIMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; @@ -32,6 +33,10 @@ public abstract class Module { throw new InvalidMessageType("Got a TimerSchedulerMessage in module " + moduleType.toString()); } + public void handleTyped(QurnikMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a QurnikMessage in module " + moduleType.toString()); + } + public void handleTyped(RMIMessage message) throws InterruptedException, InvalidMessageType { throw new InvalidMessageType("Got an RMIMessage in module " + moduleType.toString()); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Qurnik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Qurnik.java new file mode 100644 index 0000000..3864aba --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Qurnik.java @@ -0,0 +1,102 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import java.util.List; +import java.util.LinkedList; +import java.util.Map.Entry; + +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.interpreter.Interpreter; +import pl.edu.mimuw.cloudatlas.interpreter.InterpreterException; +import pl.edu.mimuw.cloudatlas.interpreter.QueryResult; +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.TypePrimitive; +import pl.edu.mimuw.cloudatlas.model.Value; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; +import pl.edu.mimuw.cloudatlas.model.ValueString; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; +import pl.edu.mimuw.cloudatlas.model.ZMI; + +public class Qurnik extends Module { + public Qurnik() { + super(ModuleType.QUERY); + } + + @Override + public void handleTyped(QurnikMessage message) throws InterruptedException, InvalidMessageType { + switch (message.getType()) { + case RUN_QUERIES: + handleRunQueries((RunQueriesMessage) message); + break; + default: + throw new InvalidMessageType("This type of message cannot be handled by Qurnik"); + } + } + + @Override + public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType { + switch (message.getType()) { + case STATE: + runQueriesOnState((StateMessage) message); + break; + default: + throw new InvalidMessageType("This type of message cannot be handled by Qurnik"); + } + } + + private void handleRunQueries(RunQueriesMessage message) throws InterruptedException { + GetStateMessage getStateMessage = new GetStateMessage("", 0, ModuleType.QUERY, 0); + sendMessage(getStateMessage); + } + + private void runQueriesOnState(StateMessage message) throws InterruptedException { + List queries = new LinkedList(); + for (Entry timestampedQuery : message.getQueries().values()) { + queries.add(timestampedQuery.getKey()); + } + executeAllQueries(message.getZMI(), queries, PathName.ROOT); + } + + private void executeAllQueries(ZMI zmi, List queries, PathName currentPath) throws InterruptedException { + if(!zmi.getSons().isEmpty()) { + for(ZMI son : zmi.getSons()) { + Value sonName = son.getAttributes().getOrNull("name"); + if (ValueUtils.valueNonNullOfType(sonName, TypePrimitive.STRING)) { + String sonNameString = ((ValueString) sonName).getValue(); + executeAllQueries(son, queries, currentPath.levelDown(sonNameString)); + } else { + System.out.println("ERROR: zone without a name attribute found while executing queries"); + } + } + + Interpreter interpreter = new Interpreter(zmi); + AttributesMap newAttributes = new AttributesMap(); + for (ValueQuery query : queries) { + try { + List result = interpreter.interpretProgram(query.getQuery()); + for(QueryResult r : result) { + newAttributes.addOrChange(r.getName(), r.getValue()); + } + } catch(InterpreterException exception) { + System.out.println("ERROR: thrown while running interpreter: " + exception.getMessage()); + } + } + + if (!currentPath.toString().equals("/")) { + newAttributes.add("name", new ValueString(currentPath.getSingletonName())); + } + long currentTime = System.currentTimeMillis() / 1000; + newAttributes.add("timestamp", new ValueTime(currentTime)); + + UpdateAttributesMessage message = new UpdateAttributesMessage("", currentTime, currentPath.toString(), newAttributes); + sendMessage(message); + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java new file mode 100644 index 0000000..01a45b5 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -0,0 +1,7 @@ +package pl.edu.mimuw.cloudatlas.model; + +public class ValueUtils { + public static boolean valueNonNullOfType(Value value, Type type) { + return value != null && !value.isNull() && value.getType().isCompatible(type); + } +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java new file mode 100644 index 0000000..28f316c --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/QurnikTest.java @@ -0,0 +1,86 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.MockExecutor; +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.TestUtil; +import pl.edu.mimuw.cloudatlas.model.ValueInt; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; +import pl.edu.mimuw.cloudatlas.model.ValueString; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ZMI; + +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class QurnikTest { + private Qurnik qurnik; + private MockExecutor executor; + + @Before + public void setupLocals() { + qurnik = new Qurnik(); + executor = new MockExecutor(qurnik); + } + + @Test + public void runQueriesRequestsState() throws Exception { + RunQueriesMessage message = new RunQueriesMessage("", 0); + qurnik.handleTyped(message); + AgentMessage receivedMessage = (AgentMessage) executor.messagesToPass.take(); + assertEquals(ModuleType.STATE, receivedMessage.getDestinationModule()); + StanikMessage stanikMessage = (StanikMessage) receivedMessage; + assertEquals(StanikMessage.Type.GET_STATE, stanikMessage.getType()); + GetStateMessage getStateMessage = (GetStateMessage) stanikMessage; + assertEquals(ModuleType.QUERY, getStateMessage.getRequestingModule()); + } + + @Test + public void simpleQuery() throws Exception { + ZMI root = new ZMI(); + ZMI son = new ZMI(root); + root.addSon(son); + AttributesMap sonAttributes = new AttributesMap(); + sonAttributes.add("name", new ValueString("son")); + Map> queries = new HashMap(); + queries.put( + new Attribute("&query"), + new SimpleImmutableEntry( + new ValueQuery("SELECT 1 AS one"), + new ValueTime(0l) + ) + ); + StateMessage message = new StateMessage("", ModuleType.QUERY, 0, 0, root, queries); + long timeBefore = System.currentTimeMillis() / 1000; + qurnik.handleTyped(message); + long timeAfter = System.currentTimeMillis() / 1000; + + assertEquals(1, executor.messagesToPass.size()); + AgentMessage receivedMessage = (AgentMessage) executor.messagesToPass.take(); + assertEquals(ModuleType.STATE, receivedMessage.getDestinationModule()); + StanikMessage stanikMessage = (StanikMessage) receivedMessage; + assertEquals(StanikMessage.Type.UPDATE_ATTRIBUTES, stanikMessage.getType()); + UpdateAttributesMessage updateAttributesMessage = (UpdateAttributesMessage) stanikMessage; + assertEquals("/", updateAttributesMessage.getPathName()); + AttributesMap updatedAttributes = updateAttributesMessage.getAttributes(); + assertEquals(2, TestUtil.iterableSize(updatedAttributes)); + assertEquals(new ValueInt(1l), updatedAttributes.getOrNull("one")); + System.out.println(timeBefore); + System.out.println(updatedAttributes.getOrNull("timestamp")); + long timestamp = ((ValueTime) updatedAttributes.getOrNull("timestamp")).getValue(); + assertTrue(timeBefore <= timestamp); + assertTrue(timestamp <= timeAfter); + } +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index aeaf19f..8958472 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -1,7 +1,5 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import java.util.Iterator; -import java.util.List; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.Map; @@ -16,6 +14,7 @@ import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.MockExecutor; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.TestUtil; import pl.edu.mimuw.cloudatlas.model.Value; import pl.edu.mimuw.cloudatlas.model.ValueInt; import pl.edu.mimuw.cloudatlas.model.ValueQuery; @@ -32,7 +31,7 @@ public class StanikTest { private MockExecutor executor; @Before - public void setupEventBus() { + public void setupLocals() { stanik = new Stanik(); executor = new MockExecutor(stanik); } @@ -50,9 +49,9 @@ public class StanikTest { ZMI zmi = stateMessage.getZMI(); assertNull(zmi.getFather()); assertTrue(zmi.getSons().isEmpty()); - assertEquals(1, iterableSize(zmi.getAttributes())); + assertEquals(1, TestUtil.iterableSize(zmi.getAttributes())); Map> queries = stateMessage.getQueries(); - assertEquals(0, iterableSize(queries.keySet())); + assertEquals(0, TestUtil.iterableSize(queries.keySet())); } @Test @@ -81,7 +80,7 @@ public class StanikTest { UpdateAttributesMessage message = new UpdateAttributesMessage("test_msg", 0, "/", attributes); stanik.handleTyped(message); AttributesMap actualAttributes = stanik.getHierarchy().getAttributes(); - assertEquals(3, iterableSize(actualAttributes)); + assertEquals(3, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1337l), actualAttributes.get("foo")); assertEquals(new ValueString("baz"), actualAttributes.get("bar")); assertEquals(new ValueTime("2012/12/21 04:20:00.000"), actualAttributes.getOrNull("timestamp")); @@ -97,7 +96,7 @@ public class StanikTest { UpdateAttributesMessage message = new UpdateAttributesMessage("test_msg", 0, "/new", attributes); stanik.handleTyped(message); AttributesMap actualAttributes = stanik.getHierarchy().findDescendant("/new").getAttributes(); - assertEquals(4, iterableSize(actualAttributes)); + assertEquals(4, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); assertEquals(new ValueString("baz"), actualAttributes.getOrNull("bar")); assertEquals(new ValueString("new"), actualAttributes.getOrNull("name")); @@ -120,7 +119,7 @@ public class StanikTest { stanik.handleTyped(newMessage); AttributesMap actualAttributes = stanik.getHierarchy().getAttributes(); - assertEquals(2, iterableSize(actualAttributes)); + assertEquals(2, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1338l), actualAttributes.getOrNull("foo")); assertEquals(new ValueTime("2012/12/21 04:20:42.000"), actualAttributes.getOrNull("timestamp")); } @@ -140,7 +139,7 @@ public class StanikTest { stanik.handleTyped(newMessage); AttributesMap actualAttributes = stanik.getHierarchy().getAttributes(); - assertEquals(2, iterableSize(actualAttributes)); + assertEquals(2, TestUtil.iterableSize(actualAttributes)); assertEquals(new ValueInt(1337l), actualAttributes.getOrNull("foo")); assertEquals(new ValueTime("2012/12/21 04:20:00.000"), actualAttributes.getOrNull("timestamp")); } @@ -153,7 +152,7 @@ public class StanikTest { stanik.handleTyped(message); HashMap> actualQueries = stanik.getQueries(); - assertEquals(1, iterableSize(actualQueries.keySet())); + assertEquals(1, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query"))); Entry timestampedQuery = actualQueries.get(new Attribute("&query")); assertEquals(new ValueTime(42l), timestampedQuery.getValue()); @@ -177,7 +176,7 @@ public class StanikTest { stanik.handleTyped(otherMessage); HashMap> actualQueries = stanik.getQueries(); - assertEquals(4, iterableSize(actualQueries.keySet())); + assertEquals(4, TestUtil.iterableSize(actualQueries.keySet())); assertTrue(actualQueries.containsKey(new Attribute("&query1"))); assertTrue(actualQueries.containsKey(new Attribute("&query2"))); assertTrue(actualQueries.containsKey(new Attribute("&query3"))); @@ -199,13 +198,4 @@ public class StanikTest { assertEquals(new ValueTime(43l), timestampedQuery4.getValue()); assertEquals(new ValueQuery("SELECT 1000 AS foo"), timestampedQuery4.getKey()); } - - public int iterableSize(Iterable iterable) { - int count = 0; - for (T attribute : iterable) { - count++; - } - - return count; - } } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java b/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java new file mode 100644 index 0000000..ed633d7 --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/model/TestUtil.java @@ -0,0 +1,16 @@ +package pl.edu.mimuw.cloudatlas.model; + +public class TestUtil { + public static boolean valueLower(Value a, Value b) { + return ((ValueBoolean) a.isLowerThan(b)).getValue(); + } + + public static int iterableSize(Iterable iterable) { + int count = 0; + for (T attribute : iterable) { + count++; + } + + return count; + } +} -- cgit v1.2.3