diff options
author | Martin <marcin.j.chrzanowski@gmail.com> | 2020-01-10 16:26:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-10 16:26:00 +0100 |
commit | 3e3677a34ab63d05cbc7a3c45dca98a47fbac77f (patch) | |
tree | abe2cc377960b47626bd38350c470104d1a1c1d2 /src/main/java/pl | |
parent | 60025accad672dac8228b7bac1006126223e2e58 (diff) | |
parent | 98ec32765bd769b457561dd9fdf34fee544dd54b (diff) |
Merge pull request #87 from m-chrzan/gossip-girl
Begin gossip girl
Diffstat (limited to 'src/main/java/pl')
11 files changed, 531 insertions, 2 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java new file mode 100644 index 0000000..49bda14 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AttributesMessage.java @@ -0,0 +1,31 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import java.util.Map; + +import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.PathName; + +public class AttributesMessage extends RemoteGossipGirlMessage { + private PathName path; + private AttributesMap attributes; + private long receiverGossipId; + + public AttributesMessage(String messageId, long timestamp, PathName path, AttributesMap attributes, long receiverGossipId) { + super(messageId, timestamp, Type.ATTRIBUTES); + this.path = path; + this.attributes = attributes; + this.receiverGossipId = receiverGossipId; + } + + public PathName getPath() { + return path; + } + + public AttributesMap getAttributes() { + return attributes; + } + + public long getReceiverGossipId() { + return receiverGossipId; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java new file mode 100644 index 0000000..508fe88 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -0,0 +1,29 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; + +public abstract class GossipGirlMessage extends AgentMessage { + public enum Type { + ATTRIBUTES, + HEJKA, + INITIATE, + NO_CO_TAM, + QUERY + } + + private Type type; + + public GossipGirlMessage(String messageId, long timestamp, Type type) { + super(messageId, ModuleType.GOSSIP, timestamp); + this.type = type; + } + + public Type getType() { + return type; + } + + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java new file mode 100644 index 0000000..340d939 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/HejkaMessage.java @@ -0,0 +1,32 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import java.util.Map; + +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class HejkaMessage extends RemoteGossipGirlMessage { + private long senderGossipId; + private Map<PathName, ValueTime> zoneTimestamps; + private Map<Attribute, ValueTime> queryTimestamps; + + public HejkaMessage(String messageId, long timestamp, long senderGossipId, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps) { + super(messageId, timestamp, Type.HEJKA); + this.senderGossipId = senderGossipId; + this.zoneTimestamps = zoneTimestamps; + this.queryTimestamps = queryTimestamps; + } + + public long getSenderGossipId() { + return senderGossipId; + } + + public Map<PathName, ValueTime> getZoneTimestamps() { + return zoneTimestamps; + } + + public Map<Attribute, ValueTime> getQueryTimestamps() { + return queryTimestamps; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/InitiateGossipMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/InitiateGossipMessage.java new file mode 100644 index 0000000..955570e --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/InitiateGossipMessage.java @@ -0,0 +1,24 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +public class InitiateGossipMessage extends GossipGirlMessage { + private PathName ourPath; + private ValueContact theirContact; + + public InitiateGossipMessage(String messageId, long timestamp, PathName ourPath, ValueContact theirContact) { + super(messageId, timestamp, Type.INITIATE); + this.ourPath = ourPath; + this.theirContact = theirContact; + } + + public PathName getOurPath() { + return ourPath; + } + + public ValueContact getTheirContact() { + return theirContact; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java new file mode 100644 index 0000000..3dd0c4d --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/NoCoTamMessage.java @@ -0,0 +1,50 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import java.util.Map; + +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class NoCoTamMessage extends RemoteGossipGirlMessage { + private long receiverGossipId; + private long senderGossipId; + private Map<PathName, ValueTime> zoneTimestamps; + private Map<Attribute, ValueTime> queryTimestamps; + private ValueTime hejkaSendTimestamp; + private ValueTime hejkaReceiveTimestamp; + + public NoCoTamMessage(String messageId, long timestamp, long receiverGossipId, long senderGossipId, Map<PathName, ValueTime> zoneTimestamps, Map<Attribute, ValueTime> queryTimestamps, ValueTime hejkaSendTimestamp, ValueTime hejkaReceiveTimestamp) { + super(messageId, timestamp, Type.NO_CO_TAM); + this.receiverGossipId = receiverGossipId; + this.senderGossipId = senderGossipId; + this.zoneTimestamps = zoneTimestamps; + this.queryTimestamps = queryTimestamps; + this.hejkaSendTimestamp = hejkaSendTimestamp; + this.hejkaReceiveTimestamp = hejkaReceiveTimestamp; + } + + public long getReceiverGossipId() { + return receiverGossipId; + } + + public long getSenderGossipId() { + return senderGossipId; + } + + public Map<PathName, ValueTime> getZoneTimestamps() { + return zoneTimestamps; + } + + public Map<Attribute, ValueTime> getQueryTimestamps() { + return queryTimestamps; + } + + public ValueTime getHejkaSendTimestamp() { + return hejkaSendTimestamp; + } + + public ValueTime getHejkaReceiveTimestamp() { + return hejkaReceiveTimestamp; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java new file mode 100644 index 0000000..2b3b064 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QueryMessage.java @@ -0,0 +1,29 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; + +public class QueryMessage extends RemoteGossipGirlMessage { + private Attribute name; + private ValueQuery query; + private long receiverGossipId; + + public QueryMessage(String messageId, long timestamp, Attribute name, ValueQuery query, long receiverGossipId) { + super(messageId, timestamp, Type.QUERY); + this.name = name; + this.query = query; + this.receiverGossipId = receiverGossipId; + } + + public Attribute getName() { + return name; + } + + public ValueQuery getQuery() { + return query; + } + + public long getReceiverGossipId() { + return receiverGossipId; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java new file mode 100644 index 0000000..0a3a868 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java @@ -0,0 +1,28 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.model.ValueTime; + +public class RemoteGossipGirlMessage extends GossipGirlMessage { + private ValueTime sentTimestamp; + private ValueTime receivedTimestamp; + + public RemoteGossipGirlMessage(String messageId, long timestamp, Type type) { + super(messageId, timestamp, type); + } + + public void setSentTimestamp(ValueTime sentTimestamp) { + this.sentTimestamp = sentTimestamp; + } + + public void setReceivedTimestamp(ValueTime receivedTimestamp) { + this.receivedTimestamp = receivedTimestamp; + } + + public ValueTime getSentTimestamp() { + return sentTimestamp; + } + + public ValueTime getReceivedTimestamp() { + return receivedTimestamp; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java new file mode 100644 index 0000000..73aef8b --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirl.java @@ -0,0 +1,106 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import pl.edu.mimuw.cloudatlas.agent.messages.AttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.HejkaMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.InitiateGossipMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.QueryMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.AttributesMap; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ZMI; + +public class GossipGirl extends Module { + private long nextGossipId = 0; + + private Map<Long, GossipGirlState> gossipStates; + public GossipGirl() { + super(ModuleType.GOSSIP); + gossipStates = new HashMap(); + } + + public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType { + switch(message.getType()) { + case INITIATE: + initiateGossip((InitiateGossipMessage) message); + break; + case NO_CO_TAM: + handleNoCoTam((NoCoTamMessage) message); + break; + default: + throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); + } + } + + public void handleTyped(ResponseMessage message) throws InterruptedException, InvalidMessageType { + switch(message.getType()) { + case STATE: + setState((StateMessage) message); + break; + default: + throw new InvalidMessageType("This type of message cannot be handled by GossipGirl"); + } + } + + private void initiateGossip(InitiateGossipMessage message) throws InterruptedException { + Long gossipId = nextGossipId; + nextGossipId++; + gossipStates.put(gossipId, new GossipGirlState(gossipId, message.getOurPath(), message.getTheirContact(), true)); + + GetStateMessage getState = new GetStateMessage("", 0, ModuleType.GOSSIP, gossipId); + sendMessage(getState); + } + + private void setState(StateMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getRequestId()); + if (state != null) { + state.setState(message.getZMI(), message.getQueries()); + if (state.state == GossipGirlState.State.SEND_HEJKA) { + HejkaMessage hejka = new HejkaMessage( + "", + 0, + state.gossipId, + state.getZoneTimestampsToSend(), + state.getQueryTimestampsToSend() + ); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, hejka); + sendMessage(udupMessage); + state.sentHejka(); + } + } else { + System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); + } + } + + private void handleNoCoTam(NoCoTamMessage message) throws InterruptedException { + GossipGirlState state = gossipStates.get(message.getReceiverGossipId()); + if (state != null) { + state.handleNoCoTam(message); + for (ZMI zmi : state.getZMIsToSend()) { + AttributesMessage attributesMessage = new AttributesMessage("", 0, zmi.getPathName(), zmi.getAttributes(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, attributesMessage); + sendMessage(udupMessage); + } + + for (Entry<Attribute, ValueQuery> query : state.getQueriesToSend()) { + QueryMessage queryMessage = new QueryMessage("", 0, query.getKey(), query.getValue(), state.theirGossipId); + UDUPMessage udupMessage = new UDUPMessage("", 0, state.theirContact, queryMessage); + sendMessage(udupMessage); + } + state.sentInfo(); + } else { + System.out.println("ERROR: GossipGirl got state for a nonexistent gossip"); + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java new file mode 100644 index 0000000..df17957 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/GossipGirlState.java @@ -0,0 +1,197 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import pl.edu.mimuw.cloudatlas.agent.messages.NoCoTamMessage; +import pl.edu.mimuw.cloudatlas.model.Attribute; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.model.ValueQuery; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; +import pl.edu.mimuw.cloudatlas.model.ZMI; + +public class GossipGirlState { + public enum State { + WAIT_FOR_STATE_INITIALIZER, + WAIT_FOR_STATE_RESPONDER, + SEND_HEJKA, + SEND_INFO, + WAIT_FOR_NO_CO_TAM, + WAIT_FOR_FIRST_INFO, + WAIT_FOR_INFO, + ERROR + } + public PathName ourPath; + public ValueContact theirContact; + public long gossipId; + public long theirGossipId; + public long timeOffest; + public State state; + public ZMI hierarchy; + public Map<Attribute, Entry<ValueQuery, ValueTime>> queries; + public ValueTime hejkaSendTimestamp; + public ValueTime hejkaReceiveTimestamp; + public ValueTime noCoTamSendTimestamp; + public ValueTime noCoTamSendReceiveTimestamp; + private Map<PathName, ValueTime> theirZoneTimestamps; + private Map<Attribute, ValueTime> theirQueryTimestamps; + + public GossipGirlState(long gossipId, PathName ourPath, ValueContact theirContact, boolean initiating) { + this.gossipId = gossipId; + this.ourPath = ourPath; + this.theirContact = theirContact; + if (initiating) { + state = State.WAIT_FOR_STATE_INITIALIZER; + } else { + state = State.WAIT_FOR_STATE_RESPONDER; + } + } + + public void setState(ZMI hierarchy, Map<Attribute, Entry<ValueQuery, ValueTime>> queries) { + switch (state) { + case WAIT_FOR_STATE_INITIALIZER: + this.hierarchy = hierarchy; + this.queries = queries; + state = State.SEND_HEJKA; + break; + case WAIT_FOR_STATE_RESPONDER: + this.hierarchy = hierarchy; + this.queries = queries; + state = State.WAIT_FOR_FIRST_INFO; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } + + public void sentHejka() { + switch (state) { + case SEND_HEJKA: + state = state.WAIT_FOR_NO_CO_TAM; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } + + public void handleNoCoTam(NoCoTamMessage message) { + switch (state) { + case WAIT_FOR_NO_CO_TAM: + theirGossipId = message.getSenderGossipId(); + theirZoneTimestamps = message.getZoneTimestamps(); + theirQueryTimestamps = message.getQueryTimestamps(); + hejkaSendTimestamp = message.getHejkaSendTimestamp(); + hejkaReceiveTimestamp = message.getHejkaReceiveTimestamp(); + state = State.SEND_INFO; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } + + public Map<PathName, ValueTime> getZoneTimestampsToSend() { + Map<PathName, ValueTime> timestamps = new HashMap(); + collectZoneTimestamps(timestamps, hierarchy, theirContact.getName()); + return timestamps; + } + + public Map<Attribute, ValueTime> getQueryTimestampsToSend() { + Map<Attribute, ValueTime> queryTimestamps= new HashMap(); + for (Entry<Attribute, Entry<ValueQuery, ValueTime>> query : queries.entrySet()) { + queryTimestamps.put(query.getKey(), query.getValue().getValue()); + } + + return queryTimestamps; + } + + public List<ZMI> getZMIsToSend() { + List<ZMI> zmis = new LinkedList(); + for (Entry<PathName, ValueTime> timestampedPath : getZoneTimestampsToSend().entrySet()) { + ValueTime theirTimestamp = theirZoneTimestamps.get(timestampedPath.getKey()); + if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedPath.getValue())) { + System.out.println("going to send " + timestampedPath.getKey().toString()); + try { + zmis.add(hierarchy.findDescendant(timestampedPath.getKey())); + } catch (ZMI.NoSuchZoneException e) { + System.out.println("ERROR: didn't find a zone we wanted to send in getZMIsToSend"); + } + } + } + return zmis; + } + + public List<Entry<Attribute, ValueQuery>> getQueriesToSend() { + List<Entry<Attribute, ValueQuery>> queryList = new LinkedList(); + for (Entry<Attribute, ValueTime> timestampedQuery : getQueryTimestampsToSend().entrySet()) { + ValueTime theirTimestamp = theirQueryTimestamps.get(timestampedQuery.getKey()); + if (theirTimestamp == null || ValueUtils.valueLower(theirTimestamp, timestampedQuery.getValue())) { + queryList.add( + new SimpleImmutableEntry( + timestampedQuery.getKey(), + queries.get(timestampedQuery.getKey()).getKey() + ) + ); + } + } + return queryList; + } + + public void collectZoneTimestamps(Map<PathName, ValueTime> timestamps, ZMI currentZMI, PathName recipientPath) { + for (ZMI zmi : currentZMI.getSons()) { + if (interestedIn(recipientPath, zmi.getPathName())) { + ValueTime timestamp = (ValueTime) zmi.getAttributes().getOrNull("timestamp"); + if (timestamp != null) { + timestamps.put(zmi.getPathName(), timestamp); + } else { + System.out.println("ERROR: collectZoneTimestamps encountered a zone with no timestamp"); + } + } else { + collectZoneTimestamps(timestamps, zmi, recipientPath); + } + } + } + + public boolean interestedIn(PathName recipientPath, PathName zmiPath) { + return isPrefix(zmiPath.levelUp(), recipientPath) && !isPrefix(zmiPath, recipientPath); + } + + public boolean isPrefix(PathName prefix, PathName path) { + List<String> prefixComponents = prefix.getComponents(); + List<String> pathComponents = path.getComponents(); + + if (prefixComponents.size() > pathComponents.size()) { + return false; + } + + Iterator<String> prefixIterator = prefixComponents.iterator(); + Iterator<String> pathIterator = pathComponents.iterator(); + + while (prefixIterator.hasNext()) { + if (!prefixIterator.next().equals(pathIterator.next())) { + return false; + } + } + return true; + } + + public void sentInfo() { + switch (state) { + case SEND_INFO: + state = State.WAIT_FOR_INFO; + break; + default: + System.out.println("ERROR: tried to set gossip state when not expected"); + state = State.ERROR; + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 67fdab9..ec87649 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -28,6 +28,10 @@ public abstract class Module { throw new InvalidMessageType("Got a TimerSchedulerMessage in module " + moduleType.toString()); } + public void handleTyped(GossipGirlMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a GossipGirlMessage in module " + moduleType.toString()); + } + public void handleTyped(QurnikMessage message) throws InterruptedException, InvalidMessageType { throw new InvalidMessageType("Got a QurnikMessage in module " + moduleType.toString()); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java index d221f06..40e629a 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/ModuleType.java @@ -5,8 +5,7 @@ public enum ModuleType { TIMER_GTP, RMI, UDP, - GOSSIP_IN, - GOSSIP_OUT, + GOSSIP, STATE, QUERY, // for testing |