diff options
Diffstat (limited to 'src/main')
9 files changed, 76 insertions, 10 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 0cbda2d..06e067b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -42,14 +42,16 @@ public class Agent {          // TODO: make query period confiurable with config file and from tests -        // TODO config setup          String zonePath = System.getProperty("zone_path");          String selectionStrategy = System.getProperty("Gossip.zone_strategy");          Long queryPeriod = Long.getLong("query_period");          Long gossipPeriod = Long.getLong("gossip_period"); +        Long freshnessPeriod = Long.getLong("freshness_period");          HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);          hierarchyConfig.startQueries(queryPeriod); -        hierarchyConfig.startGossip(gossipPeriod); +        hierarchyConfig.startGossip(gossipPeriod, zonePath); +        // TODO: should this be different than ZMI freshness period? +        hierarchyConfig.startCleaningGossips(freshnessPeriod);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java index dc5241d..92b7f66 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -37,7 +37,7 @@ public class HierarchyConfig {          }      } -    public void startGossip(long gossipPeriod) { +    public void startGossip(long gossipPeriod, String zonePath) {          Supplier<TimerScheduledTask> taskSupplier = () ->                  new TimerScheduledTask() {                      public void run() { @@ -47,7 +47,7 @@ public class HierarchyConfig {                              ValueContact contact = selectContactFromLevel(gossipLevel);                              if (contact != null) {                                  System.out.println("INFO: found a contact " + contact.toString()); -                                InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName("/uw/violet07"), contact); +                                InitiateGossipMessage message = new InitiateGossipMessage("", 0, new PathName(zonePath), contact);                                  sendMessage(message);                              } else {                                  System.out.println("DEBUG: couldn't find contact for gossip"); @@ -153,4 +153,20 @@ public class HierarchyConfig {          AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);      } + +    public void startCleaningGossips(long gossipCleanPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            System.out.println("INFO: Scheduling old gossip cleanup"); +                            sendMessage(new CleanOldGossipsMessage("", 0, ValueUtils.addToTime(ValueUtils.currentTime(), -gossipCleanPeriod))); +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while triggering queries"); +                        } +                    } +                }; + +        AgentUtils.startRecursiveTask(taskSupplier, gossipCleanPeriod, eventBus); +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java new file mode 100644 index 0000000..6c3cb7e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java @@ -0,0 +1,16 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class CleanOldGossipsMessage extends GossipGirlMessage { +    private ValueTime ageThreshold; + +    public CleanOldGossipsMessage(String messageId, long timestamp, ValueTime ageThreshold) { +        super(messageId, timestamp, Type.CLEAN); +        this.ageThreshold = ageThreshold; +    } + +    public ValueTime getAgeThreshold() { +        return ageThreshold; +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 03525bb..6255e3b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -9,7 +9,8 @@ public abstract class GossipGirlMessage extends AgentMessage {          HEJKA,          INITIATE,          NO_CO_TAM, -        QUERY +        QUERY, +        CLEAN      }      private Type type; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 0ec9d6c..440df33 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -2,10 +2,12 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import java.util.AbstractMap.SimpleImmutableEntry;  import java.util.HashMap; +import java.util.Iterator;  import java.util.Map;  import java.util.Map.Entry;  import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.CleanOldGossipsMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; @@ -52,6 +54,9 @@ public class GossipGirl extends Module {              case QUERY:                  handleQuery((QueryMessage) message);                  break; +            case CLEAN: +                cleanOldGossips((CleanOldGossipsMessage) message); +                break;              default:                  throw new InvalidMessageType("This type of message cannot be handled by GossipGirl");          } @@ -100,6 +105,7 @@ public class GossipGirl extends Module {          GossipGirlState state = gossipStates.get(message.getRequestId());          if (state != null) {              System.out.println("INFO: setting state in gossip " + Long.toString(message.getRequestId())); +            state.setLastAction();              state.setState(message.getZMI(), message.getQueries());              if (state.state == GossipGirlState.State.SEND_HEJKA) {                  HejkaMessage hejka = new HejkaMessage( @@ -140,6 +146,7 @@ public class GossipGirl extends Module {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) {              System.out.println("INFO: handling NoCoTamMessage in" + Long.toString(message.getReceiverGossipId())); +            state.setLastAction();              state.handleNoCoTam(message);              System.out.println("DEBUG: handled NoCoTam in GossipGirlState");              sendInfo(state); @@ -171,8 +178,9 @@ public class GossipGirl extends Module {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) {              System.out.println("INFO: handling Attributes in " + Long.toString(message.getReceiverGossipId())); +            state.setLastAction();              state.gotAttributes(message); -            if (state.state == GossipGirlState.State.SEND_INFO) { +            if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {                  sendInfo(state);              }              UpdateAttributesMessage updateMessage = new UpdateAttributesMessage("", 0, message.getPath().toString(), message.getAttributes()); @@ -190,6 +198,7 @@ public class GossipGirl extends Module {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) {              System.out.println("INFO: handling Query in " + Long.toString(message.getReceiverGossipId())); +            state.setLastAction();              state.gotQuery(message.getName());              Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();              queries.put( @@ -206,4 +215,15 @@ public class GossipGirl extends Module {              System.out.println("ERROR: GossipGirl got query for a nonexistent gossip");          }      } + +    private void cleanOldGossips(CleanOldGossipsMessage message) { +        Iterator<Entry<Long, GossipGirlState>> iterator = gossipStates.entrySet().iterator(); +        while (iterator.hasNext()) { +            GossipGirlState state = iterator.next().getValue(); +            if (state.lastAction.isLowerThan(message.getAgeThreshold()).getValue()) { +                System.out.println("INFO: GossipGirl removing old gossip " + Long.toString(state.gossipId)); +                iterator.remove(); +            } +        } +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index 6ee7474..e9bc02a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -36,6 +36,7 @@ public class GossipGirlState {          FINISHED,          ERROR      } +    public ValueTime lastAction;      public PathName ourPath;      public ValueContact theirContact;      public long gossipId; @@ -65,6 +66,11 @@ public class GossipGirlState {          } else {              state = State.APPLY_HEJKA;          } +        this.lastAction = ValueUtils.currentTime(); +    } + +    public void setLastAction() { +        lastAction = ValueUtils.currentTime();      }      public void setState(ZMI hierarchy, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java index 210505d..4019696 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/client/ClientController.java @@ -53,7 +53,7 @@ public class ClientController {                  return size() > MAX_ENTRIES;              }          }; -        this.currentZoneName = "/uw/violet07"; +        this.currentZoneName = System.getProperty("zone_path");          fetchAttributeData(); // fetch attribute data as early as possible      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index 15f8a59..12d795a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -92,7 +92,7 @@ public class Fetcher {      }      // https://jj09.net/interprocess-communication-python-java/ -    private static void fetchData() { +    private static void fetchData(String zonePath) {          BufferedReader bufferRead;          ArrayList deserializedAttribs;          String jsonAttribs; @@ -111,7 +111,7 @@ public class Fetcher {                  deserializedAttribs = deserializeAttribs(jsonAttribs);                  for (int i = 0; i < fetcherAttributeNames.size(); i++) {                      api.setAttributeValue( -                            "/uw/violet07", +                            zonePath,                              fetcherAttributeNames.get(i),                              packAttributeValue(                                      deserializedAttribs.get(i), @@ -143,7 +143,8 @@ public class Fetcher {      }      public static void main(String[] args) { +        String zonePath = System.getProperty("zone_path");          parseArgs(args); -        fetchData(); +        fetchData(zonePath);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java index 3df8231..866349f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -12,4 +12,8 @@ public class ValueUtils {      public static ValueTime currentTime() {          return new ValueTime(System.currentTimeMillis());      } + +    public static ValueTime addToTime(ValueTime time, long millis) { +        return time.addValue(new ValueDuration(millis)); +    }  } |