From 1f87c2d319b2671bd2d61feb3a76d102059c4c52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 19:06:26 +0100 Subject: Make UDUP handle only gossip girl messages with its timestamps --- .../agent/messages/GossipGirlMessage.java | 2 ++ .../agent/messages/RemoteGossipGirlMessage.java | 2 ++ .../cloudatlas/agent/messages/UDUPMessage.java | 10 +++++----- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 5 ----- .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 7 ++++++- .../cloudatlas/agent/modules/UDUPSerializer.java | 4 ++++ .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 22 ++++++++++++++++------ 7 files changed, 35 insertions(+), 17 deletions(-) (limited to 'src/main') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 508fe88..03525bb 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -19,6 +19,8 @@ public abstract class GossipGirlMessage extends AgentMessage { this.type = type; } + public GossipGirlMessage() {}; + public Type getType() { return type; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java index 0a3a868..4c223f5 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java @@ -10,6 +10,8 @@ public class RemoteGossipGirlMessage extends GossipGirlMessage { super(messageId, timestamp, type); } + public RemoteGossipGirlMessage() {}; + public void setSentTimestamp(ValueTime sentTimestamp) { this.sentTimestamp = sentTimestamp; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index 3751b3c..b955340 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -6,15 +6,15 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact; public class UDUPMessage extends AgentMessage { private ValueContact contact; - private AgentMessage content; + private RemoteGossipGirlMessage content; - public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) { + public UDUPMessage(String messageId, long timestamp, ValueContact contact, RemoteGossipGirlMessage content) { super(messageId, ModuleType.UDP, timestamp); this.contact = contact; this.content = content; } - public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) { + public UDUPMessage(String messageId, ValueContact contact, RemoteGossipGirlMessage content) { super(messageId, ModuleType.UDP); this.contact = contact; this.content = content; @@ -27,11 +27,11 @@ public class UDUPMessage extends AgentMessage { module.handleTyped(this); } - public AgentMessage getContent() { + public RemoteGossipGirlMessage getContent() { return content; } - public void setContent(AgentMessage content) { + public void setContent(RemoteGossipGirlMessage content) { this.content = content; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e2243e1..501c76e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -23,11 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -// TODO add timestamps as close to sending as possible - -// TODO wysylac tylko remotegossipgirl message -// TODO update timestampow odpowiedni w tym remotegossipgirlmessage - public class UDUP extends Module { private UDUPClient client; private UDUPServer server; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 2e4f0b4..089cad2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,6 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import javax.xml.crypto.Data; import java.io.IOException; @@ -67,9 +69,12 @@ public class UDUPClient { public void sendMessage(UDUPMessage msg) throws IOException { int packetNo = 1; byte[] sendBuf; - byte[] dataBuf = this.serializer.serialize(msg); + byte[] dataBuf; this.lastTransmission++; + msg.getContent().setSentTimestamp(ValueUtils.currentTime()); + dataBuf = this.serializer.serialize(msg); + do { sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 40c4d7c..0f7b99d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -12,6 +12,7 @@ import java.io.ByteArrayOutputStream; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.rmi.Remote; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashMap; @@ -138,6 +139,9 @@ public class UDUPSerializer { kryo.register(UDUPMessage.class); kryo.register(UpdateAttributesMessage.class); kryo.register(UpdateQueriesMessage.class); + kryo.register(GossipGirlMessage.class); + kryo.register(GossipGirlMessage.Type.class); + kryo.register(RemoteGossipGirlMessage.class); // modules kryo.register(TimerScheduledTask.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index d6180be..94882e4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -54,14 +55,19 @@ public class UDUPServer implements Runnable { String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); + UDUPMessage msg; if (packetNo == 1 && packet.getLength() < this.bufSize) { - UDUPMessage msg = this.serializer.deserialize(packetData); + msg = this.serializer.deserialize(packetData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); } else { System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + } + + if (msg != null) { + sendMessageFurther(msg); } } @@ -124,14 +130,16 @@ public class UDUPServer implements Runnable { return fullData; } - public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + UDUPMessage msg = null; + if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); - UDUPMessage msg = this.serializer.deserialize(allPacketData); + msg = this.serializer.deserialize(allPacketData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); this.partialPackets.remove(transmissionID); System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); - this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } @@ -140,6 +148,8 @@ public class UDUPServer implements Runnable { newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } + + return msg; } public void close() { -- cgit v1.2.3