From c3121bfa6c00682173a7f1fba9d4c524b8ef517e Mon Sep 17 00:00:00 2001 From: Marcin Chrzanowski Date: Sun, 29 Dec 2019 16:49:43 +0100 Subject: Handle queries in state --- .../cloudatlas/agent/messages/StanikMessage.java | 3 +- .../agent/messages/UpdateQueriesMessage.java | 21 ++++++++ .../edu/mimuw/cloudatlas/agent/modules/Stanik.java | 15 ++++++ .../mimuw/cloudatlas/agent/modules/StanikTest.java | 58 ++++++++++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java index da35f2d..b23f6e0 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java @@ -6,7 +6,8 @@ import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; public abstract class StanikMessage extends AgentMessage { public enum Type { GET_STATE, - UPDATE_ATTRIBUTES + UPDATE_ATTRIBUTES, + UPDATE_QUERIES } private Type type; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java new file mode 100644 index 0000000..58ad55a --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java @@ -0,0 +1,21 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import java.util.Map; +import java.util.Map.Entry; + +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class UpdateQueriesMessage extends StanikMessage { + private Map> queries; + + public UpdateQueriesMessage(String messageId, long timestamp, Map> queries) { + super(messageId, timestamp, Type.UPDATE_QUERIES); + this.queries = queries; + } + + public Map> getQueries() { + return queries; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index a31f265..e8721b3 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -9,6 +9,7 @@ import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -46,6 +47,9 @@ public class Stanik extends Module { case UPDATE_ATTRIBUTES: handleUpdateAttributes((UpdateAttributesMessage) message); break; + case UPDATE_QUERIES: + handleUpdateQueries((UpdateQueriesMessage) message); + break; default: throw new InvalidMessageType("This type of message cannot be handled by Stanik"); } @@ -74,6 +78,17 @@ public class Stanik extends Module { } } + public void handleUpdateQueries(UpdateQueriesMessage message) { + for (Entry> entry : message.getQueries().entrySet()) { + Attribute attribute = entry.getKey(); + ValueTime timestamp = entry.getValue().getValue(); + Entry currentTimestampedQuery = queries.get(attribute); + if (currentTimestampedQuery == null || valueLower(currentTimestampedQuery.getValue(), timestamp)) { + queries.put(entry.getKey(), entry.getValue()); + } + } + } + private boolean valueLower(Value a, Value b) { return ((ValueBoolean) a.isLowerThan(b)).getValue(); } diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java index dd44e8f..aeaf19f 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/modules/StanikTest.java @@ -2,6 +2,8 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.Iterator; import java.util.List; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -10,6 +12,7 @@ import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UpdateQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.MockExecutor; import pl.edu.mimuw.cloudatlas.model.Attribute; import pl.edu.mimuw.cloudatlas.model.AttributesMap; @@ -142,6 +145,61 @@ public class StanikTest { assertEquals(new ValueTime("2012/12/21 04:20:00.000"), actualAttributes.getOrNull("timestamp")); } + @Test + public void addQuery() throws Exception { + HashMap> queries = new HashMap>(); + queries.put(new Attribute("&query"), new SimpleImmutableEntry(new ValueQuery("SELECT 1 AS one"), new ValueTime(42l))); + UpdateQueriesMessage message = new UpdateQueriesMessage("test_msg", 0, queries); + stanik.handleTyped(message); + + HashMap> actualQueries = stanik.getQueries(); + assertEquals(1, iterableSize(actualQueries.keySet())); + assertTrue(actualQueries.containsKey(new Attribute("&query"))); + Entry timestampedQuery = actualQueries.get(new Attribute("&query")); + assertEquals(new ValueTime(42l), timestampedQuery.getValue()); + assertEquals(new ValueQuery("SELECT 1 AS one"), timestampedQuery.getKey()); + } + + @Test + public void updateQueries() throws Exception { + HashMap> queries = new HashMap>(); + queries.put(new Attribute("&query1"), new SimpleImmutableEntry(new ValueQuery("SELECT 1 AS one"), new ValueTime(42l))); + queries.put(new Attribute("&query3"), new SimpleImmutableEntry(new ValueQuery("SELECT 23 AS x"), new ValueTime(43l))); + queries.put(new Attribute("&query4"), new SimpleImmutableEntry(new ValueQuery("SELECT 1000 AS foo"), new ValueTime(43l))); + UpdateQueriesMessage message = new UpdateQueriesMessage("test_msg", 0, queries); + stanik.handleTyped(message); + + HashMap> otherQueries = new HashMap>(); + otherQueries.put(new Attribute("&query1"), new SimpleImmutableEntry(new ValueQuery("SELECT 2 AS one"), new ValueTime(41l))); + otherQueries.put(new Attribute("&query2"), new SimpleImmutableEntry(new ValueQuery("SELECT 42 AS answer"), new ValueTime(39l))); + otherQueries.put(new Attribute("&query3"), new SimpleImmutableEntry(new ValueQuery("SELECT 17 AS y"), new ValueTime(44l))); + UpdateQueriesMessage otherMessage = new UpdateQueriesMessage("test_msg", 0, otherQueries); + stanik.handleTyped(otherMessage); + + HashMap> actualQueries = stanik.getQueries(); + assertEquals(4, iterableSize(actualQueries.keySet())); + assertTrue(actualQueries.containsKey(new Attribute("&query1"))); + assertTrue(actualQueries.containsKey(new Attribute("&query2"))); + assertTrue(actualQueries.containsKey(new Attribute("&query3"))); + assertTrue(actualQueries.containsKey(new Attribute("&query4"))); + + Entry timestampedQuery1 = actualQueries.get(new Attribute("&query1")); + assertEquals(new ValueTime(42l), timestampedQuery1.getValue()); + assertEquals(new ValueQuery("SELECT 1 AS one"), timestampedQuery1.getKey()); + + Entry timestampedQuery2 = actualQueries.get(new Attribute("&query2")); + assertEquals(new ValueTime(39l), timestampedQuery2.getValue()); + assertEquals(new ValueQuery("SELECT 42 AS answer"), timestampedQuery2.getKey()); + + Entry timestampedQuery3 = actualQueries.get(new Attribute("&query3")); + assertEquals(new ValueTime(44l), timestampedQuery3.getValue()); + assertEquals(new ValueQuery("SELECT 17 AS y"), timestampedQuery3.getKey()); + + Entry timestampedQuery4 = actualQueries.get(new Attribute("&query4")); + assertEquals(new ValueTime(43l), timestampedQuery4.getValue()); + assertEquals(new ValueQuery("SELECT 1000 AS foo"), timestampedQuery4.getKey()); + } + public int iterableSize(Iterable iterable) { int count = 0; for (T attribute : iterable) { -- cgit v1.2.3