m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 17:10:36 +0100
committerMarcin Chrzanowski <marcin.j.chrzanowski@gmail.com>2020-01-12 17:10:36 +0100
commita07000c6dae6f84dd221454fca24b65e9579afae (patch)
treecd647a463f073840842a7d8283597031322d7c59 /src/main
parent9c6a9bb5a8ce6d3282d1fd669214911c302e9aca (diff)
parentf0076abd246bdf40ba4a03c0c894f6f75173eac8 (diff)
Merge branch 'master' into logs-n-stuff
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java20
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java16
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java3
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java22
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java7
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java4
9 files changed, 76 insertions, 10 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 0cbda2d..06e067b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -42,14 +42,16 @@ public class Agent {
// TODO: make query period confiurable with config file and from tests
- // TODO config setup
String zonePath = System.getProperty("zone_path");
String selectionStrategy = System.getProperty("Gossip.zone_strategy");
Long queryPeriod = Long.getLong("query_period");
Long gossipPeriod = Long.getLong("gossip_period");
+ Long freshnessPeriod = Long.getLong("freshness_period");
HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);
hierarchyConfig.startQueries(queryPeriod);
- hierarchyConfig.startGossip(gossipPeriod);
+ hierarchyConfig.startGossip(gossipPeriod, zonePath);
+ // TODO: should this be different than ZMI freshness period?
+ hierarchyConfig.startCleaningGossips(freshnessPeriod);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
index dc5241d..92b7f66 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java
@@ -37,7 +37,7 @@ public class HierarchyConfig {
}
}
- public void startGossip(long gossipPeriod) {
+ public void startGossip(long gossipPeriod, String zonePath) {
Supplier<TimerScheduledTask> taskSupplier = () ->
new TimerScheduledTask() {
public void run() {
@@ -47,7 +47,7 @@ public class HierarchyConfig {
ValueContact contact = selectContactFromLevel(gossipLevel);
if (contact != null) {
System.out.println("INFO: found a contact " + contact.toString());
- InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact);
+ InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName(zonePath), contact);
sendMessage(message);
} else {
System.out.println("DEBUG: couldn't find contact for gossip");
@@ -153,4 +153,20 @@ public class HierarchyConfig {
AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);
}
+
+ public void startCleaningGossips(long gossipCleanPeriod) {
+ Supplier<TimerScheduledTask> taskSupplier = () ->
+ new TimerScheduledTask() {
+ public void run() {
+ try {
+ System.out.println("INFO: Scheduling old gossip cleanup");
+ sendMessage(new CleanOldGossipsMessage("", 0, ValueUtils.addToTime(ValueUtils.currentTime(), -gossipCleanPeriod)));
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted while triggering queries");
+ }
+ }
+ };
+
+ AgentUtils.startRecursiveTask(taskSupplier, gossipCleanPeriod, eventBus);
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java
new file mode 100644
index 0000000..6c3cb7e
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java
@@ -0,0 +1,16 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.model.ValueTime;
+
+public class CleanOldGossipsMessage extends GossipGirlMessage {
+ private ValueTime ageThreshold;
+
+ public CleanOldGossipsMessage(String messageId, long timestamp, ValueTime ageThreshold) {
+ super(messageId, timestamp, Type.CLEAN);
+ this.ageThreshold = ageThreshold;
+ }
+
+ public ValueTime getAgeThreshold() {
+ return ageThreshold;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
index 03525bb..6255e3b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
@@ -9,7 +9,8 @@ public abstract class GossipGirlMessage extends AgentMessage {
HEJKA,
INITIATE,
NO_CO_TAM,
- QUERY
+ QUERY,
+ CLEAN
}
private Type type;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
index 0ec9d6c..440df33 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java
@@ -2,10 +2,12 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.CleanOldGossipsMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage;
@@ -52,6 +54,9 @@ public class GossipGirl extends Module {
case QUERY:
handleQuery((QueryMessage) message);
break;
+ case CLEAN:
+ cleanOldGossips((CleanOldGossipsMessage) message);
+ break;
default:
throw new InvalidMessageType("This type of message cannot be handled by GossipGirl");
}
@@ -100,6 +105,7 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getRequestId());
if (state != null) {
System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId()));
+ state.setLastAction();
state.setState(message.getZMI(), message.getQueries());
if (state.state == GossipGirlState.State.SEND_HEJKA) {
HejkaMessage hejka = new HejkaMessage(
@@ -140,6 +146,7 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId()));
+ state.setLastAction();
state.handleNoCoTam(message);
System.out.println("DEBUG: handled NoCoTam in GossipGirlState");
sendInfo(state);
@@ -171,8 +178,9 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId()));
+ state.setLastAction();
state.gotAttributes(message);
- if (state.state == GossipGirlState.State.SEND_INFO) {
+ if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {
sendInfo(state);
}
UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes());
@@ -190,6 +198,7 @@ public class GossipGirl extends Module {
GossipGirlState state = gossipStates.get(message.getReceiverGossipId());
if (state != null) {
System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId()));
+ state.setLastAction();
state.gotQuery(message.getName());
Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();
queries.put(
@@ -206,4 +215,15 @@ public class GossipGirl extends Module {
System.out.println("ERROR: GossipGirl got query for a nonexistent gossip");
}
}
+
+ private void cleanOldGossips(CleanOldGossipsMessage message) {
+ Iterator<Entry<Long, GossipGirlState>> iterator = gossipStates.entrySet().iterator();
+ while (iterator.hasNext()) {
+ GossipGirlState state = iterator.next().getValue();
+ if (state.lastAction.isLowerThan(message.getAgeThreshold()).getValue()) {
+ System.out.println("INFO: GossipGirl removing old gossip " + Long.toString(state.gossipId));
+ iterator.remove();
+ }
+ }
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
index 6ee7474..e9bc02a 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java
@@ -36,6 +36,7 @@ public class GossipGirlState {
FINISHED,
ERROR
}
+ public ValueTime lastAction;
public PathName ourPath;
public ValueContact theirContact;
public long gossipId;
@@ -65,6 +66,11 @@ public class GossipGirlState {
} else {
state = State.APPLY_HEJKA;
}
+ this.lastAction = ValueUtils.currentTime();
+ }
+
+ public void setLastAction() {
+ lastAction = ValueUtils.currentTime();
}
public void setState(ZMI hierarchy, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
index 210505d..4019696 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java
@@ -53,7 +53,7 @@ public class ClientController {
return size() > MAX_ENTRIES;
}
};
- this.currentZoneName = "/uw/violet07";
+ this.currentZoneName = System.getProperty("zone_path");
fetchAttributeData(); // fetch attribute data as early as possible
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
index 15f8a59..12d795a 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
@@ -92,7 +92,7 @@ public class Fetcher {
}
// https://jj09.net/interprocess-communication-python-java/
- private static void fetchData() {
+ private static void fetchData(String zonePath) {
BufferedReader bufferRead;
ArrayList deserializedAttribs;
String jsonAttribs;
@@ -111,7 +111,7 @@ public class Fetcher {
deserializedAttribs = deserializeAttribs(jsonAttribs);
for (int i = 0; i < fetcherAttributeNames.size(); i++) {
api.setAttributeValue(
- "/uw/violet07",
+ zonePath,
fetcherAttributeNames.get(i),
packAttributeValue(
deserializedAttribs.get(i),
@@ -143,7 +143,8 @@ public class Fetcher {
}
public static void main(String[] args) {
+ String zonePath = System.getProperty("zone_path");
parseArgs(args);
- fetchData();
+ fetchData(zonePath);
}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
index 3df8231..866349f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java
@@ -12,4 +12,8 @@ public class ValueUtils {
public static ValueTime currentTime() {
return new ValueTime(System.currentTimeMillis());
}
+
+ public static ValueTime addToTime(ValueTime time, long millis) {
+ return time.addValue(new ValueDuration(millis));
+ }
}