From 70a233029d805104845f20b9904e1cdb6feac921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 18:51:49 +0100 Subject: Add noarg constructors to messages for serialization --- .../java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java | 2 ++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java | 2 ++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java | 3 +++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java | 3 +++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java | 2 ++ .../pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java | 2 ++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java | 2 ++ .../pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java | 2 ++ .../pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java | 2 ++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java | 2 ++ .../java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java | 2 ++ .../pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java | 2 ++ src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java | 4 +--- .../edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java | 2 ++ .../pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java | 2 ++ 15 files changed, 31 insertions(+), 3 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java index f343e0f..c3fcb40 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java @@ -20,6 +20,8 @@ public abstract class AgentMessage { this.timestamp = System.currentTimeMillis() / 1000L; } + public AgentMessage() {} + public String getMessageId() { return messageId; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java index 63392e8..67be8e9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java @@ -12,6 +12,8 @@ public class GetStateMessage extends StanikMessage { this.requestId = requestId; } + public GetStateMessage() {} + public ModuleType getRequestingModule() { return requestingModule; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java index 0161a37..97bef69 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik; public abstract class QurnikMessage extends AgentMessage { public enum Type { @@ -15,6 +16,8 @@ public abstract class QurnikMessage extends AgentMessage { this.type = type; } + public QurnikMessage() {} + public Type getType() { return type; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java index b0300cb..e4bc1b6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.Remik; public abstract class RemikMessage extends AgentMessage { public enum Type { @@ -15,6 +16,8 @@ public abstract class RemikMessage extends AgentMessage { this.type = type; } + public RemikMessage() {} + public Type getType() { return type; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java index 9330185..e7b03ba 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java @@ -12,6 +12,8 @@ public class RemoveZMIMessage extends StanikMessage { this.removalTimestamp = removalTimestamp; } + public RemoveZMIMessage() {} + public String getPathName() { return pathName; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java index 698aac7..a1fd279 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java @@ -10,6 +10,8 @@ public class RequestStateMessage extends RemikMessage { this.responseFuture = responseFuture; } + public RequestStateMessage() {} + public CompletableFuture getFuture() { return responseFuture; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java index 02b3337..b28f4b9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java @@ -17,6 +17,8 @@ public abstract class ResponseMessage extends AgentMessage { this.requestId = requestId; } + public ResponseMessage() {} + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { module.handleTyped(this); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java index 35f7819..03882a0 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java @@ -6,4 +6,6 @@ public class RunQueriesMessage extends QurnikMessage { public RunQueriesMessage(String messageId, long timestamp) { super(messageId, timestamp, Type.RUN_QUERIES); } + + public RunQueriesMessage() {} } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java index 4888484..3de2b65 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java @@ -19,6 +19,8 @@ public class SetAttributeMessage extends StanikMessage { this.updateTimestamp = updateTimestamp; } + public SetAttributeMessage() {} + public String getPathName() { return pathName; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java index 844f31c..3daa5f9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java @@ -19,6 +19,8 @@ public abstract class StanikMessage extends AgentMessage { this.type = type; } + public StanikMessage() {} + public Type getType() { return type; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java index f7f490b..759723b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java @@ -19,6 +19,8 @@ public class StateMessage extends ResponseMessage { this.queries = queries; } + public StateMessage() {} + public ZMI getZMI() { return zmi; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java index 8566d67..b42e181 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java @@ -23,6 +23,8 @@ public class TimerSchedulerMessage extends AgentMessage { this.task = task; } + public TimerSchedulerMessage() {} + public long getDelay() { return delay; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index fa8d1fa..3751b3c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -20,9 +20,7 @@ public class UDUPMessage extends AgentMessage { this.content = content; } - public UDUPMessage() { - super("", ModuleType.UDP); - } + public UDUPMessage() {} @Override public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java index 7e41631..b943384 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java @@ -13,6 +13,8 @@ public class UpdateAttributesMessage extends StanikMessage { this.attributes = attributes; } + public UpdateAttributesMessage() {} + public String getPathName() { return pathName; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java index 58ad55a..4b0b9c8 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java @@ -15,6 +15,8 @@ public class UpdateQueriesMessage extends StanikMessage { this.queries = queries; } + public UpdateQueriesMessage() {} + public Map> getQueries() { return queries; } -- cgit v1.2.3 From 76b5ad38792c93cd530b5faf59c613e83a129d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 18:53:06 +0100 Subject: Extend kryo registration --- .../cloudatlas/agent/modules/UDUPSerializer.java | 77 +++++++++++++++++++--- 1 file changed, 68 insertions(+), 9 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index ac35265..79236ca 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -4,18 +4,15 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import pl.edu.mimuw.cloudatlas.model.PathName; -import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.model.*; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.LinkedHashMap; /** * Serializes classes to and from byte arrays for UDP use @@ -69,15 +66,77 @@ public class UDUPSerializer { } }); - kryo.register(byte[].class); + kryo.register(ValueList.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + return null; + } + }); + + kryo.register(ValueSet.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + return null; + } + }); + + // model + kryo.register(Value.class); + kryo.register(ValueBoolean.class); kryo.register(ValueContact.class); - kryo.register(ModuleType.class); + kryo.register(ValueDuration.class); + kryo.register(ValueInt.class); + kryo.register(ValueNull.class); + kryo.register(ValueQuery.class); + kryo.register(ValueSet.class); + kryo.register(ValueString.class); + kryo.register(ValueTime.class); + kryo.register(ValueUtils.class); + kryo.register(ZMI.class); + + kryo.register(Attribute.class); + kryo.register(AttributesMap.class); + kryo.register(AttributesUtil.class); + kryo.register(Type.class); + kryo.register(TypeCollection.class); + kryo.register(TypePrimitive.class); + + // messages in chronological order so it's easier to keep track kryo.register(AgentMessage.class); kryo.register(GetStateMessage.class); - kryo.register(UDUPMessage.class); + kryo.register(QurnikMessage.class); + kryo.register(RemikMessage.class); + kryo.register(RemoveZMIMessage.class); + kryo.register(RequestStateMessage.class); + kryo.register(ResponseMessage.class); + kryo.register(RunQueriesMessage.class); + kryo.register(SetAttributeMessage.class); kryo.register(StanikMessage.Type.class); kryo.register(StanikMessage.class); + kryo.register(TimerSchedulerMessage.class); + kryo.register(UDUPMessage.class); + kryo.register(UpdateAttributesMessage.class); + kryo.register(UpdateQueriesMessage.class); + + // modules + kryo.register(TimerScheduledTask.class); + kryo.register(RecursiveScheduledTask.class); + + // other + kryo.register(byte[].class); + kryo.register(LinkedHashMap.class); + kryo.register(ModuleType.class); } public UDUPMessage deserialize(byte[] packetData) { -- cgit v1.2.3 From 5f02fa0e59dc84e12fae1fde61bdfa8edb5446b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 18:58:25 +0100 Subject: Fix handling of multipart udp messages --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 6807a86..fb79dc6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -31,12 +31,11 @@ public class UDUPServer { System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); if (packet.getOffset() == 0) { - UDUPMessage msg = this.serializer.deserialize(packet.getData()); - System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (packet.getLength() == this.bufSize) { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); } else { + UDUPMessage msg = this.serializer.deserialize(packet.getData()); + System.out.println("UDP received message " + msg.getContent().getMessageId()); if (msg.getContent().getDestinationModule() == ModuleType.TEST) { System.out.println("UDP server: test message received"); } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { -- cgit v1.2.3 From 28c905a7365a37d0874b865f513c64b31d8679f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 20:10:50 +0100 Subject: Add custom serialization to type collection --- .../mimuw/cloudatlas/agent/modules/UDUPSerializer.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 79236ca..40c4d7c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -12,6 +12,8 @@ import java.io.ByteArrayOutputStream; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; /** @@ -69,24 +71,32 @@ public class UDUPSerializer { kryo.register(ValueList.class, new Serializer() { @Override public void write(Kryo kryo, Output output, Object object) { - + ValueList vl = (ValueList) object; + kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); + kryo.writeObject(output, vl.getValue()); } @Override public Object read(Kryo kryo, Input input, Class type) { - return null; + Type t = kryo.readObject(input, Type.class); + ArrayList list = kryo.readObject(input, ArrayList.class); + return new ValueList(list, t); } }); kryo.register(ValueSet.class, new Serializer() { @Override public void write(Kryo kryo, Output output, Object object) { - + ValueSet vs = (ValueSet) object; + kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); + kryo.writeObject(output, vs.getValue()); } @Override public Object read(Kryo kryo, Input input, Class type) { - return null; + Type t = kryo.readObject(input, Type.class); + HashSet set = kryo.readObject(input, HashSet.class); + return new ValueSet(set, t); } }); -- cgit v1.2.3 From 2d7fe232b7c1f2ef62e7bf2f3100adb51e9bc0d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 22:13:48 +0100 Subject: Add segmentation handling --- .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 62 ++++++++++--- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 103 ++++++++++++++++----- 2 files changed, 130 insertions(+), 35 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 82aaeb1..d7cbc9d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -2,8 +2,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import javax.xml.crypto.Data; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; public class UDUPClient { private UDUP udp; @@ -11,6 +13,7 @@ public class UDUPClient { private int serverPort; private DatagramSocket socket; private int bufsize; + private int lastTransmission; UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException { this.udp = udp; @@ -18,26 +21,57 @@ public class UDUPClient { this.socket = new DatagramSocket(); this.bufsize = bufferSize; this.serializer = new UDUPSerializer(); + this.lastTransmission = 0; } - public void sendMessage(UDUPMessage msg) throws IOException { - int offset = 0; - int outputSize; + private void logSending(DatagramPacket packet, int packetNo, byte[] buf) { + System.out.print("UDP sends packet no " + packetNo + + " with realbufsize " + (bufsize - 8) + + " out of data buffer with size " + buf.length + + " to " + packet.getAddress() + ": "); + for (byte b : packet.getData()) { + System.out.print(b); + } + System.out.println(); + } - byte[] buf = this.serializer.serialize(msg); - outputSize = buf.length; + byte[] toByteArray(int val) { + return ByteBuffer.allocate(4).putInt(val).array(); + } + + private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) { + byte[] sendBuf = new byte[bufsize]; + int sendBufSize = bufsize - 8; + System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); + System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + if (packetNo*sendBufSize >= buf.length) { + System.arraycopy(buf, 0, sendBuf, 8, sendBufSize); + } else { + System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize); + } + return sendBuf; + } + + boolean checkEndOfTransmission(int packetNo, int dataBufSize) { + int sendBufSize = bufsize - 8; + int dataSentSoFar = (packetNo - 1) * sendBufSize; + System.out.println("used data " + dataSentSoFar + " " + dataBufSize); + return dataSentSoFar >= dataBufSize; + } + + public void sendMessage(UDUPMessage msg) throws IOException { + int packetNo = 1; + byte[] sendBuf; + byte[] dataBuf = this.serializer.serialize(msg); + this.lastTransmission++; do { - outputSize =- bufsize; - offset += bufsize; - DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); - System.out.println("UDP sends message: "); - for (byte b : buf) { - System.out.print(b); - } - System.out.println("to " + packet.getAddress()); + sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); + DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort); this.socket.send(packet); - } while (outputSize > bufsize); + logSending(packet, packetNo, dataBuf); + packetNo++; + } while (!checkEndOfTransmission(packetNo, dataBuf.length)); } void close() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index fb79dc6..c7ceca2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.NoSuchElementException; public class UDUPServer { private UDUP udp; private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; - private HashMap partialPackets; + private HashMap> partialPackets; private int bufSize; public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { @@ -25,42 +29,99 @@ public class UDUPServer { } public void acceptMessage() throws IOException, InterruptedException { - byte[] buf = new byte[bufSize]; - DatagramPacket packet = new DatagramPacket(buf, buf.length); - this.socket.receive(packet); - System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); + DatagramPacket packet = receivePacket(); + int transmissionNo = readTransmissionNo(packet.getData()); + String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); + int packetNo = readPacketNo(packet.getData()); + byte[] packetData = trimPacketBuffer(packet.getData()); - if (packet.getOffset() == 0) { + if (packetNo == 0) { if (packet.getLength() == this.bufSize) { - this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } else { - UDUPMessage msg = this.serializer.deserialize(packet.getData()); + UDUPMessage msg = this.serializer.deserialize(packetData); System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (msg.getContent().getDestinationModule() == ModuleType.TEST) { - System.out.println("UDP server: test message received"); - } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { - this.udp.sendMessage(msg.getContent()); - } + sendMessageFurther(msg); } } else { - this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } } - public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { - if (this.partialPackets.containsKey(senderAddress)) { - byte[] previousPacketData = this.partialPackets.get(senderAddress); - byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + private DatagramPacket receivePacket() throws IOException { + byte[] buf = new byte[bufSize]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); + return packet; + } + + private void sendMessageFurther(UDUPMessage msg) throws InterruptedException { + if (msg.getContent().getDestinationModule() == ModuleType.TEST) { + System.out.println("UDP server: test message received"); + } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } + + private int readTransmissionNo(byte[] packetData) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4); + byte[] byteBuf = new byte[4]; + in.read(byteBuf); + return ByteBuffer.wrap(byteBuf).getInt(); + } + + private int readPacketNo(byte[] packetData) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4); + byte[] byteBuf = new byte[4]; + in.read(byteBuf); + return ByteBuffer.wrap(byteBuf).getInt(); + } + + private byte[] trimPacketBuffer(byte[] packetData) { + int newPacketDataSize = packetData.length - 8; + byte[] newPacketData = new byte[newPacketDataSize]; + System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize); + return newPacketData; + } + + private String packTransmissionID(int transmissionNo, InetAddress contactAddr) { + return contactAddr.getHostAddress() + ":" + transmissionNo; + } + + private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) { + ArrayList previousPacketData = this.partialPackets.get(transmissionID); + byte[] fullData = new byte[0]; + + previousPacketData.add(newPacketNo, newPacketData); + this.partialPackets.put(transmissionID, previousPacketData); + + if (previousPacketData.contains(null)) { + throw new NoSuchElementException("Packet is not full"); + } else { + for (byte[] prevData : previousPacketData) { + fullData = Bytes.concat(fullData, prevData); + } + } + + return fullData; + } + + public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { + if (this.partialPackets.containsKey(transmissionID)) { try { + byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); UDUPMessage msg = this.serializer.deserialize(allPacketData); this.udp.sendMessage(msg.getContent()); - this.partialPackets.remove(senderAddress); + this.partialPackets.remove(transmissionID); + System.out.println("Kryo put together whole transmission"); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); - this.partialPackets.put(senderAddress, allPacketData); } } else { - this.partialPackets.put(senderAddress, packetData); + ArrayList newTransmission = new ArrayList(); + newTransmission.add(packetData); + this.partialPackets.put(transmissionID, newTransmission); } } -- cgit v1.2.3 From 90d3d2e3e1e116bbb288d78e9c6c996a7f1e0270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 15:26:52 +0100 Subject: Fix segmentation --- .../pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java | 6 ++++++ .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 19 ++++++++++++------- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 21 +++++++++------------ 3 files changed, 27 insertions(+), 19 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e616c93..8c6db8f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean; // TODO set server port in global config - must be the same everywhere // TODO same with buffer size +// TODO separate server like newapiimpl +// TODO add timestamps as close to sending as possible + +// TODO wysylac tylko remotegossipgirl message +// TODO update timestampow odpowiedni w tym remotegossipgirlmessage + public class UDUP extends Module implements Runnable { private UDUPClient client; private UDUPServer server; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index d7cbc9d..2e4f0b4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -26,7 +26,7 @@ public class UDUPClient { private void logSending(DatagramPacket packet, int packetNo, byte[] buf) { System.out.print("UDP sends packet no " + packetNo + - " with realbufsize " + (bufsize - 8) + + " with real bufsize " + (bufsize - 8) + " out of data buffer with size " + buf.length + " to " + packet.getAddress() + ": "); for (byte b : packet.getData()) { @@ -40,22 +40,27 @@ public class UDUPClient { } private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) { - byte[] sendBuf = new byte[bufsize]; + byte[] sendBuf; int sendBufSize = bufsize - 8; - System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); - System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + if (packetNo*sendBufSize >= buf.length) { - System.arraycopy(buf, 0, sendBuf, 8, sendBufSize); + int copyLength = buf.length - (packetNo-1)*sendBufSize; + sendBuf = new byte[copyLength + 8]; + System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength); } else { + sendBuf = new byte[bufsize]; System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize); } + + System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); + System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + return sendBuf; } boolean checkEndOfTransmission(int packetNo, int dataBufSize) { int sendBufSize = bufsize - 8; int dataSentSoFar = (packetNo - 1) * sendBufSize; - System.out.println("used data " + dataSentSoFar + " " + dataBufSize); return dataSentSoFar >= dataBufSize; } @@ -67,7 +72,7 @@ public class UDUPClient { do { sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); - DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort); + DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort); this.socket.send(packet); logSending(packet, packetNo, dataBuf); packetNo++; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index c7ceca2..b71a475 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -35,15 +35,12 @@ public class UDUPServer { int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); - if (packetNo == 0) { - if (packet.getLength() == this.bufSize) { - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); - } else { - UDUPMessage msg = this.serializer.deserialize(packetData); - System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); - } + if (packetNo == 1 && packet.getLength() < this.bufSize) { + UDUPMessage msg = this.serializer.deserialize(packetData); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + sendMessageFurther(msg); } else { + System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } } @@ -93,7 +90,7 @@ public class UDUPServer { ArrayList previousPacketData = this.partialPackets.get(transmissionID); byte[] fullData = new byte[0]; - previousPacketData.add(newPacketNo, newPacketData); + previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); if (previousPacketData.contains(null)) { @@ -112,15 +109,15 @@ public class UDUPServer { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); UDUPMessage msg = this.serializer.deserialize(allPacketData); - this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(transmissionID); - System.out.println("Kryo put together whole transmission"); + System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); + this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } } else { ArrayList newTransmission = new ArrayList(); - newTransmission.add(packetData); + newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } } -- cgit v1.2.3