m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java10
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java5
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java7
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java22
7 files changed, 35 insertions, 17 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
index 508fe88..03525bb 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java
@@ -19,6 +19,8 @@ public abstract class GossipGirlMessage extends AgentMessage {
this.type = type;
}
+ public GossipGirlMessage() {};
+
public Type getType() {
return type;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java
index 0a3a868..4c223f5 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java
@@ -10,6 +10,8 @@ public class RemoteGossipGirlMessage extends GossipGirlMessage {
super(messageId, timestamp, type);
}
+ public RemoteGossipGirlMessage() {};
+
public void setSentTimestamp(ValueTime sentTimestamp) {
this.sentTimestamp = sentTimestamp;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
index 3751b3c..b955340 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -6,15 +6,15 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact;
public class UDUPMessage extends AgentMessage {
private ValueContact contact;
- private AgentMessage content;
+ private RemoteGossipGirlMessage content;
- public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) {
+ public UDUPMessage(String messageId, long timestamp, ValueContact contact, RemoteGossipGirlMessage content) {
super(messageId, ModuleType.UDP, timestamp);
this.contact = contact;
this.content = content;
}
- public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) {
+ public UDUPMessage(String messageId, ValueContact contact, RemoteGossipGirlMessage content) {
super(messageId, ModuleType.UDP);
this.contact = contact;
this.content = content;
@@ -27,11 +27,11 @@ public class UDUPMessage extends AgentMessage {
module.handleTyped(this);
}
- public AgentMessage getContent() {
+ public RemoteGossipGirlMessage getContent() {
return content;
}
- public void setContent(AgentMessage content) {
+ public void setContent(RemoteGossipGirlMessage content) {
this.content = content;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e2243e1..501c76e 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -23,11 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* due to ValueContact design
*/
-// TODO add timestamps as close to sending as possible
-
-// TODO wysylac tylko remotegossipgirl message
-// TODO update timestampow odpowiedni w tym remotegossipgirlmessage
-
public class UDUP extends Module {
private UDUPClient client;
private UDUPServer server;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 2e4f0b4..089cad2 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,6 +1,8 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.ValueTime;
+import pl.edu.mimuw.cloudatlas.model.ValueUtils;
import javax.xml.crypto.Data;
import java.io.IOException;
@@ -67,9 +69,12 @@ public class UDUPClient {
public void sendMessage(UDUPMessage msg) throws IOException {
int packetNo = 1;
byte[] sendBuf;
- byte[] dataBuf = this.serializer.serialize(msg);
+ byte[] dataBuf;
this.lastTransmission++;
+ msg.getContent().setSentTimestamp(ValueUtils.currentTime());
+ dataBuf = this.serializer.serialize(msg);
+
do {
sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);
DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort);
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index 40c4d7c..0f7b99d 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -12,6 +12,7 @@ import java.io.ByteArrayOutputStream;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.rmi.Remote;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -138,6 +139,9 @@ public class UDUPSerializer {
kryo.register(UDUPMessage.class);
kryo.register(UpdateAttributesMessage.class);
kryo.register(UpdateQueriesMessage.class);
+ kryo.register(GossipGirlMessage.class);
+ kryo.register(GossipGirlMessage.Type.class);
+ kryo.register(RemoteGossipGirlMessage.class);
// modules
kryo.register(TimerScheduledTask.class);
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index d6180be..94882e4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.ValueUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -54,14 +55,19 @@ public class UDUPServer implements Runnable {
String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
int packetNo = readPacketNo(packet.getData());
byte[] packetData = trimPacketBuffer(packet.getData());
+ UDUPMessage msg;
if (packetNo == 1 && packet.getLength() < this.bufSize) {
- UDUPMessage msg = this.serializer.deserialize(packetData);
+ msg = this.serializer.deserialize(packetData);
+ msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());
System.out.println("UDP received message " + msg.getContent().getMessageId());
- sendMessageFurther(msg);
} else {
System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
- this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
+ msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
+ }
+
+ if (msg != null) {
+ sendMessageFurther(msg);
}
}
@@ -124,14 +130,16 @@ public class UDUPServer implements Runnable {
return fullData;
}
- public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
+ public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
+ UDUPMessage msg = null;
+
if (this.partialPackets.containsKey(transmissionID)) {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
- UDUPMessage msg = this.serializer.deserialize(allPacketData);
+ msg = this.serializer.deserialize(allPacketData);
+ msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());
this.partialPackets.remove(transmissionID);
System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
- this.udp.sendMessage(msg.getContent());
} catch (Error | Exception e) {
System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
}
@@ -140,6 +148,8 @@ public class UDUPServer implements Runnable {
newTransmission.add(newPacketNo-1, packetData);
this.partialPackets.put(transmissionID, newTransmission);
}
+
+ return msg;
}
public void close() {