From bb0d92a3a0339695776797d25252815bf8921fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 15:53:02 +0100 Subject: Separate server from udup --- .../java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 23 +++++--- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 27 +++------ .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 26 +++++++- .../pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 69 +++++++++++++--------- 4 files changed, 88 insertions(+), 57 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 256fa6b..ad2a650 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -39,18 +39,15 @@ public class Agent { } } - public static HashMap initializeModules() { + public static HashMap initializeModules() throws UnknownHostException { HashMap modules = new HashMap(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); Long freshnessPeriod = new Long(System.getProperty("freshness_period")); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); - try { - modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); - } catch (UnknownHostException e) { - e.printStackTrace(); - } + UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), 5988, 2000); + modules.put(ModuleType.UDP, new UDUP(5988, 5000, 20000, null)); // TODO add modules as we implement them return modules; } @@ -89,14 +86,24 @@ public class Agent { } public static void runModulesAsThreads() { - HashMap modules = initializeModules(); + HashMap modules = null; + + try { + modules = initializeModules(); + } catch (UnknownHostException e) { + System.out.println("Module initialization failed"); + e.printStackTrace(); + return; + } + HashMap executors = initializeExecutors(modules); ArrayList executorThreads = initializeExecutorThreads(executors); - eventBus = new EventBus(executors); + Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer()); Thread eventBusThread = new Thread(eventBus); System.out.println("Initializing event bus"); eventBusThread.start(); + UDUPServerThread.start(); } private static void initZones() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index 8c6db8f..341d947 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -32,21 +32,19 @@ import java.util.concurrent.atomic.AtomicBoolean; // TODO wysylac tylko remotegossipgirl message // TODO update timestampow odpowiedni w tym remotegossipgirlmessage -public class UDUP extends Module implements Runnable { +public class UDUP extends Module { private UDUPClient client; private UDUPServer server; - private final AtomicBoolean running; - public UDUP(InetAddress serverAddr, - int serverPort, + public UDUP(int serverPort, int timeout, - int bufferSize) { + int bufferSize, + UDUPServer server) { super(ModuleType.UDP); - this.running = new AtomicBoolean(false); try { this.client = new UDUPClient(this, serverPort, bufferSize); - this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); - this.running.getAndSet(true); + this.server = server; + this.server.setUDUP(this); } catch (SocketException e) { e.printStackTrace(); this.client.close(); @@ -54,17 +52,8 @@ public class UDUP extends Module implements Runnable { } } - public void run() { - System.out.println("UDP server running"); - while(this.running.get()) { - try { - this.server.acceptMessage(); - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - this.running.getAndSet(false); - this.server.close(); - } - } + public UDUPServer getServer() { + return this.server; } public void handleTyped(UDUPMessage event) throws InterruptedException { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index b71a475..d6180be 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -10,22 +10,42 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; -public class UDUPServer { +public class UDUPServer implements Runnable { private UDUP udp; private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; private HashMap> partialPackets; private int bufSize; + private final AtomicBoolean running; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { - this.udp = udp; + public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException { this.socket = new DatagramSocket(port, addr); this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); this.serializer = new UDUPSerializer(); + this.running = new AtomicBoolean(false); + } + + public void setUDUP(UDUP udup) { + this.udp = udup; + } + + public void run() { + System.out.println("UDP server running"); + this.running.getAndSet(true); + while(this.running.get()) { + try { + this.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.close(); + } + } } public void acceptMessage() throws IOException, InterruptedException { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 13a322b..93ed8be 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -6,12 +6,14 @@ import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; import pl.edu.mimuw.cloudatlas.agent.modules.UDUP; +import pl.edu.mimuw.cloudatlas.agent.modules.UDUPServer; import pl.edu.mimuw.cloudatlas.model.AttributesMap; import pl.edu.mimuw.cloudatlas.model.PathName; import pl.edu.mimuw.cloudatlas.model.ValueContact; import pl.edu.mimuw.cloudatlas.model.ValueInt; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; // TODO add serialization tests that target custom serializers (type collections!) @@ -22,25 +24,30 @@ public class UDUPTest { public void messageBetweenUDUPs() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; boolean testSuccess = true; + int timeout = 5000; try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), - 5999, - 5000, - 1000); + 5997, + timeout, + 1000, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), - 5999, - 5000, - 1000); + 5997, + timeout, + 1000, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -52,13 +59,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { @@ -89,6 +96,8 @@ public class UDUPTest { public void bigMessageBetweenUDUPs() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; boolean testSuccess = true; int timeout = 5000; @@ -96,19 +105,21 @@ public class UDUPTest { try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), - 5998, + 5997, timeout, - 30); + 30, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), - 5998, + 5997, timeout, - 30); + 30, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -120,13 +131,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { @@ -152,6 +163,8 @@ public class UDUPTest { public void sendMultipleMessages() { UDUP udp1 = null; UDUP udp2 = null; + UDUPServer server1 = null; + UDUPServer server2 = null; UDUPMessage msg1 = null; UDUPMessage msg2 = null; UDUPMessage msg3 = null; @@ -161,19 +174,21 @@ public class UDUPTest { try { System.out.println("Starting udp1"); + server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000); udp1 = new UDUP( - InetAddress.getByName("127.0.0.2"), 5997, timeout, - 1000); + 1000, + server1); System.out.println("Starting udp2"); + server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000); udp2 = new UDUP( - InetAddress.getByName("127.0.0.3"), 5997, timeout, - 1000); + 1000, + server2); UDUPMessage testContent = new UDUPMessage(); testContent.setDestinationModule(ModuleType.TEST); @@ -197,13 +212,13 @@ public class UDUPTest { testContent ); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { e.printStackTrace(); } - Thread udpThread1 = new Thread(udp1); + Thread udpThread1 = new Thread(server1); udpThread1.start(); - Thread udpThread2 = new Thread(udp2); + Thread udpThread2 = new Thread(server2); udpThread2.start(); try { -- cgit v1.2.3 From f042953bdbe2a5e0d9e9e19d275fd45a958fe626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 16:09:19 +0100 Subject: Set udup config from system properties --- src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 15 ++++++++++----- .../java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java | 4 ---- 2 files changed, 10 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index ad2a650..a178193 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent; import java.net.Inet4Address; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; @@ -39,15 +40,19 @@ public class Agent { } } - public static HashMap initializeModules() throws UnknownHostException { + public static HashMap initializeModules() throws UnknownHostException, SocketException { HashMap modules = new HashMap(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - Long freshnessPeriod = new Long(System.getProperty("freshness_period")); + Long freshnessPeriod = Long.getLong(System.getProperty("freshness_period")); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); - UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), 5988, 2000); - modules.put(ModuleType.UDP, new UDUP(5988, 5000, 20000, null)); + + Integer port = Integer.getInteger(System.getProperty("port")); + Integer timeout = Integer.getInteger(System.getProperty("timeout")); + Integer bufsize = Integer.getInteger(System.getProperty("bufsize")); + UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); + modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); // TODO add modules as we implement them return modules; } @@ -90,7 +95,7 @@ public class Agent { try { modules = initializeModules(); - } catch (UnknownHostException e) { + } catch (UnknownHostException | SocketException e) { System.out.println("Module initialization failed"); e.printStackTrace(); return; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index 341d947..e2243e1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -23,10 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -// TODO set server port in global config - must be the same everywhere -// TODO same with buffer size - -// TODO separate server like newapiimpl // TODO add timestamps as close to sending as possible // TODO wysylac tylko remotegossipgirl message -- cgit v1.2.3 From ad872a25f94f6297a659cf945c4e1547ed8f28d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 16:29:49 +0100 Subject: Fix getting system properties --- src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index a178193..62cd544 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -40,17 +40,17 @@ public class Agent { } } - public static HashMap initializeModules() throws UnknownHostException, SocketException { + public static HashMap initializeModules() throws UnknownHostException, SocketException, NullPointerException { HashMap modules = new HashMap(); modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER)); modules.put(ModuleType.RMI, new Remik()); - Long freshnessPeriod = Long.getLong(System.getProperty("freshness_period")); + Long freshnessPeriod = Long.getLong("freshness_period"); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); - Integer port = Integer.getInteger(System.getProperty("port")); - Integer timeout = Integer.getInteger(System.getProperty("timeout")); - Integer bufsize = Integer.getInteger(System.getProperty("bufsize")); + Integer port = Integer.getInteger("UDUPServer.port"); + Integer timeout = Integer.getInteger("UDUPServer.timeout"); + Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server)); // TODO add modules as we implement them -- cgit v1.2.3 From 1f87c2d319b2671bd2d61feb3a76d102059c4c52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 19:06:26 +0100 Subject: Make UDUP handle only gossip girl messages with its timestamps --- .../agent/messages/GossipGirlMessage.java | 2 + .../agent/messages/RemoteGossipGirlMessage.java | 2 + .../cloudatlas/agent/messages/UDUPMessage.java | 10 ++-- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 5 -- .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 7 ++- .../cloudatlas/agent/modules/UDUPSerializer.java | 4 ++ .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 22 +++++--- .../pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 60 +++++++++++++++------- 8 files changed, 77 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 508fe88..03525bb 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -19,6 +19,8 @@ public abstract class GossipGirlMessage extends AgentMessage { this.type = type; } + public GossipGirlMessage() {}; + public Type getType() { return type; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java index 0a3a868..4c223f5 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java @@ -10,6 +10,8 @@ public class RemoteGossipGirlMessage extends GossipGirlMessage { super(messageId, timestamp, type); } + public RemoteGossipGirlMessage() {}; + public void setSentTimestamp(ValueTime sentTimestamp) { this.sentTimestamp = sentTimestamp; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index 3751b3c..b955340 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -6,15 +6,15 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact; public class UDUPMessage extends AgentMessage { private ValueContact contact; - private AgentMessage content; + private RemoteGossipGirlMessage content; - public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) { + public UDUPMessage(String messageId, long timestamp, ValueContact contact, RemoteGossipGirlMessage content) { super(messageId, ModuleType.UDP, timestamp); this.contact = contact; this.content = content; } - public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) { + public UDUPMessage(String messageId, ValueContact contact, RemoteGossipGirlMessage content) { super(messageId, ModuleType.UDP); this.contact = contact; this.content = content; @@ -27,11 +27,11 @@ public class UDUPMessage extends AgentMessage { module.handleTyped(this); } - public AgentMessage getContent() { + public RemoteGossipGirlMessage getContent() { return content; } - public void setContent(AgentMessage content) { + public void setContent(RemoteGossipGirlMessage content) { this.content = content; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e2243e1..501c76e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -23,11 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -// TODO add timestamps as close to sending as possible - -// TODO wysylac tylko remotegossipgirl message -// TODO update timestampow odpowiedni w tym remotegossipgirlmessage - public class UDUP extends Module { private UDUPClient client; private UDUPServer server; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 2e4f0b4..089cad2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,6 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import javax.xml.crypto.Data; import java.io.IOException; @@ -67,9 +69,12 @@ public class UDUPClient { public void sendMessage(UDUPMessage msg) throws IOException { int packetNo = 1; byte[] sendBuf; - byte[] dataBuf = this.serializer.serialize(msg); + byte[] dataBuf; this.lastTransmission++; + msg.getContent().setSentTimestamp(ValueUtils.currentTime()); + dataBuf = this.serializer.serialize(msg); + do { sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 40c4d7c..0f7b99d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -12,6 +12,7 @@ import java.io.ByteArrayOutputStream; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.rmi.Remote; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashMap; @@ -138,6 +139,9 @@ public class UDUPSerializer { kryo.register(UDUPMessage.class); kryo.register(UpdateAttributesMessage.class); kryo.register(UpdateQueriesMessage.class); + kryo.register(GossipGirlMessage.class); + kryo.register(GossipGirlMessage.Type.class); + kryo.register(RemoteGossipGirlMessage.class); // modules kryo.register(TimerScheduledTask.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index d6180be..94882e4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueUtils; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -54,14 +55,19 @@ public class UDUPServer implements Runnable { String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); + UDUPMessage msg; if (packetNo == 1 && packet.getLength() < this.bufSize) { - UDUPMessage msg = this.serializer.deserialize(packetData); + msg = this.serializer.deserialize(packetData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); } else { System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); + } + + if (msg != null) { + sendMessageFurther(msg); } } @@ -124,14 +130,16 @@ public class UDUPServer implements Runnable { return fullData; } - public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + UDUPMessage msg = null; + if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); - UDUPMessage msg = this.serializer.deserialize(allPacketData); + msg = this.serializer.deserialize(allPacketData); + msg.getContent().setReceivedTimestamp(ValueUtils.currentTime()); this.partialPackets.remove(transmissionID); System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); - this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } @@ -140,6 +148,8 @@ public class UDUPServer implements Runnable { newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } + + return msg; } public void close() { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 93ed8be..ac2c587 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -1,6 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent; import org.junit.*; +import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RemoteGossipGirlMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; import pl.edu.mimuw.cloudatlas.agent.modules.Module; @@ -49,9 +51,9 @@ public class UDUPTest { 1000, server2); - UDUPMessage testContent = new UDUPMessage(); + RemoteGossipGirlMessage testContent = + new RemoteGossipGirlMessage("singleMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM); testContent.setDestinationModule(ModuleType.TEST); - testContent.setMessageId("singleMsgTest"); msg1 = new UDUPMessage( "udup1", @@ -61,6 +63,7 @@ public class UDUPTest { } catch (UnknownHostException | SocketException e) { e.printStackTrace(); + testSuccess = false; } Thread udpThread1 = new Thread(server1); @@ -69,13 +72,12 @@ public class UDUPTest { udpThread2.start(); try { - Thread.sleep(5000); - System.out.println("Sending message"); + Thread.sleep(500); if (udp1 == null | udp2 == null) { - Assert.fail("UDPs not initialized"); + testSuccess = false; } else { udp1.handle(msg1); - Thread.sleep(10000); + Thread.sleep(timeout); } } catch (InterruptedException | Module.InvalidMessageType e) { e.printStackTrace(); @@ -100,7 +102,7 @@ public class UDUPTest { UDUPServer server2 = null; UDUPMessage msg1 = null; boolean testSuccess = true; - int timeout = 5000; + int timeout = 3000; try { System.out.println("Starting udp1"); @@ -121,9 +123,9 @@ public class UDUPTest { 30, server2); - UDUPMessage testContent = new UDUPMessage(); + RemoteGossipGirlMessage testContent = + new RemoteGossipGirlMessage("bigMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM); testContent.setDestinationModule(ModuleType.TEST); - testContent.setMessageId("bigMsgTest"); msg1 = new UDUPMessage( "udup1", @@ -133,6 +135,7 @@ public class UDUPTest { } catch (UnknownHostException | SocketException e) { e.printStackTrace(); + testSuccess = false; } Thread udpThread1 = new Thread(server1); @@ -141,8 +144,13 @@ public class UDUPTest { udpThread2.start(); try { - udp1.handle(msg1); - Thread.sleep(timeout + 1000); + Thread.sleep(500); + if (udp1 == null | udp2 == null) { + testSuccess = false; + } else { + udp1.handle(msg1); + Thread.sleep(timeout); + } } catch (InterruptedException | Module.InvalidMessageType e) { e.printStackTrace(); testSuccess = false; @@ -169,7 +177,7 @@ public class UDUPTest { UDUPMessage msg2 = null; UDUPMessage msg3 = null; boolean testSuccess = true; - int timeout = 5000; + int timeout = 3000; try { System.out.println("Starting udp1"); @@ -190,9 +198,9 @@ public class UDUPTest { 1000, server2); - UDUPMessage testContent = new UDUPMessage(); + RemoteGossipGirlMessage testContent = + new RemoteGossipGirlMessage("multipleMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM); testContent.setDestinationModule(ModuleType.TEST); - testContent.setMessageId("multipleMsgTest"); msg1 = new UDUPMessage( "udup1", @@ -214,6 +222,7 @@ public class UDUPTest { } catch (UnknownHostException | SocketException e) { e.printStackTrace(); + testSuccess = false; } Thread udpThread1 = new Thread(server1); @@ -222,12 +231,27 @@ public class UDUPTest { udpThread2.start(); try { - udp1.handle(msg1); - udp1.handle(msg2); - udp1.handle(msg3); - Thread.sleep(timeout + 2000); + Thread.sleep(500); + if (udp1 == null | udp2 == null) { + testSuccess = false; + } else { + udp1.handle(msg1); + udp1.handle(msg2); + udp1.handle(msg3); + Thread.sleep(timeout); + } } catch (InterruptedException | Module.InvalidMessageType e) { e.printStackTrace(); + testSuccess = false; + } + + udpThread1.interrupt(); + udpThread2.interrupt(); + + if (testSuccess) { + Assert.assertTrue(true); + } else { + Assert.fail(); } } } -- cgit v1.2.3