diff options
author | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-12 17:10:36 +0100 |
---|---|---|
committer | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-12 17:10:36 +0100 |
commit | a07000c6dae6f84dd221454fca24b65e9579afae (patch) | |
tree | cd647a463f073840842a7d8283597031322d7c59 /src/main | |
parent | 9c6a9bb5a8ce6d3282d1fd669214911c302e9aca (diff) | |
parent | f0076abd246bdf40ba4a03c0c894f6f75173eac8 (diff) |
Merge branch 'master' into logs-n-stuff
Diffstat (limited to 'src/main')
9 files changed, 76 insertions, 10 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 0cbda2d..06e067b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -42,14 +42,16 @@ public class Agent { // TODO: make query period confiurable with config file and from tests - // TODO config setup String zonePath = System.getProperty("zone_path"); String selectionStrategy = System.getProperty("Gossip.zone_strategy"); Long queryPeriod = Long.getLong("query_period"); Long gossipPeriod = Long.getLong("gossip_period"); + Long freshnessPeriod = Long.getLong("freshness_period"); HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy); hierarchyConfig.startQueries(queryPeriod); - hierarchyConfig.startGossip(gossipPeriod); + hierarchyConfig.startGossip(gossipPeriod, zonePath); + // TODO: should this be different than ZMI freshness period? + hierarchyConfig.startCleaningGossips(freshnessPeriod); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java index dc5241d..92b7f66 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -37,7 +37,7 @@ public class HierarchyConfig { } } - public void startGossip(long gossipPeriod) { + public void startGossip(long gossipPeriod, String zonePath) { Supplier<TimerScheduledTask> taskSupplier = () -> new TimerScheduledTask() { public void run() { @@ -47,7 +47,7 @@ public class HierarchyConfig { ValueContact contact = selectContactFromLevel(gossipLevel); if (contact != null) { System.out.println("INFO: found a contact " + contact.toString()); - InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); + InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName(zonePath), contact); sendMessage(message); } else { System.out.println("DEBUG: couldn't find contact for gossip"); @@ -153,4 +153,20 @@ public class HierarchyConfig { AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus); } + + public void startCleaningGossips(long gossipCleanPeriod) { + Supplier<TimerScheduledTask> taskSupplier = () -> + new TimerScheduledTask() { + public void run() { + try { + System.out.println("INFO: Scheduling old gossip cleanup"); + sendMessage(new CleanOldGossipsMessage("", 0, ValueUtils.addToTime(ValueUtils.currentTime(), -gossipCleanPeriod))); + } catch (InterruptedException e) { + System.out.println("Interrupted while triggering queries"); + } + } + }; + + AgentUtils.startRecursiveTask(taskSupplier, gossipCleanPeriod, eventBus); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java new file mode 100644 index 0000000..6c3cb7e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java @@ -0,0 +1,16 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class CleanOldGossipsMessage extends GossipGirlMessage { + private ValueTime ageThreshold; + + public CleanOldGossipsMessage(String messageId, long timestamp, ValueTime ageThreshold) { + super(messageId, timestamp, Type.CLEAN); + this.ageThreshold = ageThreshold; + } + + public ValueTime getAgeThreshold() { + return ageThreshold; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 03525bb..6255e3b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -9,7 +9,8 @@ public abstract class GossipGirlMessage extends AgentMessage { HEJKA, INITIATE, NO_CO_TAM, - QUERY + QUERY, + CLEAN } private Type type; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 0ec9d6c..440df33 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -2,10 +2,12 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.CleanOldGossipsMessage; import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; @@ -52,6 +54,9 @@ public class GossipGirl extends Module { case QUERY: handleQuery((QueryMessage) message); break; + case CLEAN: + cleanOldGossips((CleanOldGossipsMessage) message); + break; default: throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); } @@ -100,6 +105,7 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getRequestId()); if (state != null) { System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId())); + state.setLastAction(); state.setState(message.getZMI(), message.getQueries()); if (state.state == GossipGirlState.State.SEND_HEJKA) { HejkaMessage hejka = new HejkaMessage( @@ -140,6 +146,7 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId())); + state.setLastAction(); state.handleNoCoTam(message); System.out.println("DEBUG: handled NoCoTam in GossipGirlState"); sendInfo(state); @@ -171,8 +178,9 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId())); + state.setLastAction(); state.gotAttributes(message); - if (state.state == GossipGirlState.State.SEND_INFO) { + if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) { sendInfo(state); } UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); @@ -190,6 +198,7 @@ public class GossipGirl extends Module { GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); if (state != null) { System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); + state.setLastAction(); state.gotQuery(message.getName()); Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap(); queries.put( @@ -206,4 +215,15 @@ public class GossipGirl extends Module { System.out.println("ERROR: GossipGirl got query for a nonexistent gossip"); } } + + private void cleanOldGossips(CleanOldGossipsMessage message) { + Iterator<Entry<Long, GossipGirlState>> iterator = gossipStates.entrySet().iterator(); + while (iterator.hasNext()) { + GossipGirlState state = iterator.next().getValue(); + if (state.lastAction.isLowerThan(message.getAgeThreshold()).getValue()) { + System.out.println("INFO: GossipGirl removing old gossip " + Long.toString(state.gossipId)); + iterator.remove(); + } + } + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 6ee7474..e9bc02a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -36,6 +36,7 @@ public class GossipGirlState { FINISHED, ERROR } + public ValueTime lastAction; public PathName ourPath; public ValueContact theirContact; public long gossipId; @@ -65,6 +66,11 @@ public class GossipGirlState { } else { state = State.APPLY_HEJKA; } + this.lastAction = ValueUtils.currentTime(); + } + + public void setLastAction() { + lastAction = ValueUtils.currentTime(); } public void setState(ZMI hierarchy, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java index 210505d..4019696 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java @@ -53,7 +53,7 @@ public class ClientController { return size() > MAX_ENTRIES; } }; - this.currentZoneName = "/uw/violet07"; + this.currentZoneName = System.getProperty("zone_path"); fetchAttributeData(); // fetch attribute data as early as possible } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index 15f8a59..12d795a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -92,7 +92,7 @@ public class Fetcher { } // https://jj09.net/interprocess-communication-python-java/ - private static void fetchData() { + private static void fetchData(String zonePath) { BufferedReader bufferRead; ArrayList deserializedAttribs; String jsonAttribs; @@ -111,7 +111,7 @@ public class Fetcher { deserializedAttribs = deserializeAttribs(jsonAttribs); for (int i = 0; i < fetcherAttributeNames.size(); i++) { api.setAttributeValue( - "/uw/violet07", + zonePath, fetcherAttributeNames.get(i), packAttributeValue( deserializedAttribs.get(i), @@ -143,7 +143,8 @@ public class Fetcher { } public static void main(String[] args) { + String zonePath = System.getProperty("zone_path"); parseArgs(args); - fetchData(); + fetchData(zonePath); } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java index 3df8231..866349f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -12,4 +12,8 @@ public class ValueUtils { public static ValueTime currentTime() { return new ValueTime(System.currentTimeMillis()); } + + public static ValueTime addToTime(ValueTime time, long millis) { + return time.addValue(new ValueDuration(millis)); + } } |