diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw')
7 files changed, 67 insertions, 1 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 24ed0c1..06e067b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -46,9 +46,12 @@ public class Agent {          String selectionStrategy = System.getProperty("Gossip.zone_strategy");          Long queryPeriod = Long.getLong("query_period");          Long gossipPeriod = Long.getLong("gossip_period"); +        Long freshnessPeriod = Long.getLong("freshness_period");          HierarchyConfig hierarchyConfig = new HierarchyConfig(eventBus, zonePath, selectionStrategy);          hierarchyConfig.startQueries(queryPeriod);          hierarchyConfig.startGossip(gossipPeriod, zonePath); +        // TODO: should this be different than ZMI freshness period? +        hierarchyConfig.startCleaningGossips(freshnessPeriod);      }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java index f9e323a..92b7f66 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/HierarchyConfig.java @@ -153,4 +153,20 @@ public class HierarchyConfig {          AgentUtils.startRecursiveTask(taskSupplier, queriesPeriod, eventBus);      } + +    public void startCleaningGossips(long gossipCleanPeriod) { +        Supplier<TimerScheduledTask> taskSupplier = () -> +                new TimerScheduledTask() { +                    public void run() { +                        try { +                            System.out.println("INFO: Scheduling old gossip cleanup"); +                            sendMessage(new CleanOldGossipsMessage("", 0, ValueUtils.addToTime(ValueUtils.currentTime(), -gossipCleanPeriod))); +                        } catch (InterruptedException e) { +                            System.out.println("Interrupted while triggering queries"); +                        } +                    } +                }; + +        AgentUtils.startRecursiveTask(taskSupplier, gossipCleanPeriod, eventBus); +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java new file mode 100644 index 0000000..6c3cb7e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/CleanOldGossipsMessage.java @@ -0,0 +1,16 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class CleanOldGossipsMessage extends GossipGirlMessage { +    private ValueTime ageThreshold; + +    public CleanOldGossipsMessage(String messageId, long timestamp, ValueTime ageThreshold) { +        super(messageId, timestamp, Type.CLEAN); +        this.ageThreshold = ageThreshold; +    } + +    public ValueTime getAgeThreshold() { +        return ageThreshold; +    } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 03525bb..6255e3b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -9,7 +9,8 @@ public abstract class GossipGirlMessage extends AgentMessage {          HEJKA,          INITIATE,          NO_CO_TAM, -        QUERY +        QUERY, +        CLEAN      }      private Type type; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java index 0cfa527..e795b83 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -2,10 +2,12 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import java.util.AbstractMap.SimpleImmutableEntry;  import java.util.HashMap; +import java.util.Iterator;  import java.util.Map;  import java.util.Map.Entry;  import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.CleanOldGossipsMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; @@ -51,6 +53,9 @@ public class GossipGirl extends Module {              case QUERY:                  handleQuery((QueryMessage) message);                  break; +            case CLEAN: +                cleanOldGossips((CleanOldGossipsMessage) message); +                break;              default:                  throw new InvalidMessageType("This type of message cannot be handled by GossipGirl");          } @@ -95,6 +100,7 @@ public class GossipGirl extends Module {      private void setState(StateMessage message) throws InterruptedException {          GossipGirlState state = gossipStates.get(message.getRequestId());          if (state != null) { +            state.setLastAction();              state.setState(message.getZMI(), message.getQueries());              if (state.state == GossipGirlState.State.SEND_HEJKA) {                  HejkaMessage hejka = new HejkaMessage( @@ -132,6 +138,7 @@ public class GossipGirl extends Module {      private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) { +            state.setLastAction();              state.handleNoCoTam(message);              sendInfo(state);          } else { @@ -157,6 +164,7 @@ public class GossipGirl extends Module {      private void handleAttributes(AttributesMessage message) throws InterruptedException {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) { +            state.setLastAction();              state.gotAttributes(message);              if (state.state == GossipGirlState.State.SEND_INFO || state.state == GossipGirlState.State.SEND_INFO_AND_FINISH) {                  sendInfo(state); @@ -174,6 +182,7 @@ public class GossipGirl extends Module {      private void handleQuery(QueryMessage message) throws InterruptedException {          GossipGirlState state = gossipStates.get(message.getReceiverGossipId());          if (state != null) { +            state.setLastAction();              state.gotQuery(message.getName());              Map<Attribute, Entry<ValueQuery, ValueTime>> queries = new HashMap();              queries.put( @@ -189,4 +198,15 @@ public class GossipGirl extends Module {              System.out.println("ERROR: GossipGirl got query for a nonexistent gossip");          }      } + +    private void cleanOldGossips(CleanOldGossipsMessage message) { +        Iterator<Entry<Long, GossipGirlState>> iterator = gossipStates.entrySet().iterator(); +        while (iterator.hasNext()) { +            GossipGirlState state = iterator.next().getValue(); +            if (state.lastAction.isLowerThan(message.getAgeThreshold()).getValue()) { +                System.out.println("INFO: GossipGirl removing old gossip " + Long.toString(state.gossipId)); +                iterator.remove(); +            } +        } +    }  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java index eafbcca..70d57d9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -36,6 +36,7 @@ public class GossipGirlState {          FINISHED,          ERROR      } +    public ValueTime lastAction;      public PathName ourPath;      public ValueContact theirContact;      public long gossipId; @@ -64,6 +65,11 @@ public class GossipGirlState {          } else {              state = State.APPLY_HEJKA;          } +        this.lastAction = ValueUtils.currentTime(); +    } + +    public void setLastAction() { +        lastAction = ValueUtils.currentTime();      }      public void setState(ZMI hierarchy, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java index 3df8231..866349f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueUtils.java @@ -12,4 +12,8 @@ public class ValueUtils {      public static ValueTime currentTime() {          return new ValueTime(System.currentTimeMillis());      } + +    public static ValueTime addToTime(ValueTime time, long millis) { +        return time.addValue(new ValueDuration(millis)); +    }  } |