From 90d3d2e3e1e116bbb288d78e9c6c996a7f1e0270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Fri, 10 Jan 2020 15:26:52 +0100 Subject: Fix segmentation --- .../pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java | 6 ++++++ .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 19 ++++++++++++------- .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 21 +++++++++------------ 3 files changed, 27 insertions(+), 19 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e616c93..8c6db8f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean; // TODO set server port in global config - must be the same everywhere // TODO same with buffer size +// TODO separate server like newapiimpl +// TODO add timestamps as close to sending as possible + +// TODO wysylac tylko remotegossipgirl message +// TODO update timestampow odpowiedni w tym remotegossipgirlmessage + public class UDUP extends Module implements Runnable { private UDUPClient client; private UDUPServer server; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index d7cbc9d..2e4f0b4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -26,7 +26,7 @@ public class UDUPClient { private void logSending(DatagramPacket packet, int packetNo, byte[] buf) { System.out.print("UDP sends packet no " + packetNo + - " with realbufsize " + (bufsize - 8) + + " with real bufsize " + (bufsize - 8) + " out of data buffer with size " + buf.length + " to " + packet.getAddress() + ": "); for (byte b : packet.getData()) { @@ -40,22 +40,27 @@ public class UDUPClient { } private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) { - byte[] sendBuf = new byte[bufsize]; + byte[] sendBuf; int sendBufSize = bufsize - 8; - System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); - System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + if (packetNo*sendBufSize >= buf.length) { - System.arraycopy(buf, 0, sendBuf, 8, sendBufSize); + int copyLength = buf.length - (packetNo-1)*sendBufSize; + sendBuf = new byte[copyLength + 8]; + System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength); } else { + sendBuf = new byte[bufsize]; System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize); } + + System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); + System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + return sendBuf; } boolean checkEndOfTransmission(int packetNo, int dataBufSize) { int sendBufSize = bufsize - 8; int dataSentSoFar = (packetNo - 1) * sendBufSize; - System.out.println("used data " + dataSentSoFar + " " + dataBufSize); return dataSentSoFar >= dataBufSize; } @@ -67,7 +72,7 @@ public class UDUPClient { do { sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); - DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort); + DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort); this.socket.send(packet); logSending(packet, packetNo, dataBuf); packetNo++; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index c7ceca2..b71a475 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -35,15 +35,12 @@ public class UDUPServer { int packetNo = readPacketNo(packet.getData()); byte[] packetData = trimPacketBuffer(packet.getData()); - if (packetNo == 0) { - if (packet.getLength() == this.bufSize) { - this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); - } else { - UDUPMessage msg = this.serializer.deserialize(packetData); - System.out.println("UDP received message " + msg.getContent().getMessageId()); - sendMessageFurther(msg); - } + if (packetNo == 1 && packet.getLength() < this.bufSize) { + UDUPMessage msg = this.serializer.deserialize(packetData); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + sendMessageFurther(msg); } else { + System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); } } @@ -93,7 +90,7 @@ public class UDUPServer { ArrayList previousPacketData = this.partialPackets.get(transmissionID); byte[] fullData = new byte[0]; - previousPacketData.add(newPacketNo, newPacketData); + previousPacketData.add(newPacketNo - 1, newPacketData); this.partialPackets.put(transmissionID, previousPacketData); if (previousPacketData.contains(null)) { @@ -112,15 +109,15 @@ public class UDUPServer { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); UDUPMessage msg = this.serializer.deserialize(allPacketData); - this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(transmissionID); - System.out.println("Kryo put together whole transmission"); + System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); + this.udp.sendMessage(msg.getContent()); } catch (Error | Exception e) { System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); } } else { ArrayList newTransmission = new ArrayList(); - newTransmission.add(packetData); + newTransmission.add(newPacketNo-1, packetData); this.partialPackets.put(transmissionID, newTransmission); } } -- cgit v1.2.3