m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java19
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java21
3 files changed, 27 insertions, 19 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e616c93..8c6db8f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
// TODO set server port in global config - must be the same everywhere
// TODO same with buffer size
+// TODO separate server like newapiimpl
+// TODO add timestamps as close to sending as possible
+
+// TODO wysylac tylko remotegossipgirl message
+// TODO update timestampow odpowiedni w tym remotegossipgirlmessage
+
public class UDUP extends Module implements Runnable {
private UDUPClient client;
private UDUPServer server;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index d7cbc9d..2e4f0b4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -26,7 +26,7 @@ public class UDUPClient {
private void logSending(DatagramPacket packet, int packetNo, byte[] buf) {
System.out.print("UDP sends packet no " + packetNo +
- " with realbufsize " + (bufsize - 8) +
+ " with real bufsize " + (bufsize - 8) +
" out of data buffer with size " + buf.length +
" to " + packet.getAddress() + ": ");
for (byte b : packet.getData()) {
@@ -40,22 +40,27 @@ public class UDUPClient {
}
private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) {
- byte[] sendBuf = new byte[bufsize];
+ byte[] sendBuf;
int sendBufSize = bufsize - 8;
- System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
- System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+
if (packetNo*sendBufSize >= buf.length) {
- System.arraycopy(buf, 0, sendBuf, 8, sendBufSize);
+ int copyLength = buf.length - (packetNo-1)*sendBufSize;
+ sendBuf = new byte[copyLength + 8];
+ System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength);
} else {
+ sendBuf = new byte[bufsize];
System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize);
}
+
+ System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
+ System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+
return sendBuf;
}
boolean checkEndOfTransmission(int packetNo, int dataBufSize) {
int sendBufSize = bufsize - 8;
int dataSentSoFar = (packetNo - 1) * sendBufSize;
- System.out.println("used data " + dataSentSoFar + " " + dataBufSize);
return dataSentSoFar >= dataBufSize;
}
@@ -67,7 +72,7 @@ public class UDUPClient {
do {
sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);
- DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort);
+ DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort);
this.socket.send(packet);
logSending(packet, packetNo, dataBuf);
packetNo++;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index c7ceca2..b71a475 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -35,15 +35,12 @@ public class UDUPServer {
int packetNo = readPacketNo(packet.getData());
byte[] packetData = trimPacketBuffer(packet.getData());
- if (packetNo == 0) {
- if (packet.getLength() == this.bufSize) {
- this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
- } else {
- UDUPMessage msg = this.serializer.deserialize(packetData);
- System.out.println("UDP received message " + msg.getContent().getMessageId());
- sendMessageFurther(msg);
- }
+ if (packetNo == 1 && packet.getLength() < this.bufSize) {
+ UDUPMessage msg = this.serializer.deserialize(packetData);
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+ sendMessageFurther(msg);
} else {
+ System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
}
}
@@ -93,7 +90,7 @@ public class UDUPServer {
ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
byte[] fullData = new byte[0];
- previousPacketData.add(newPacketNo, newPacketData);
+ previousPacketData.add(newPacketNo - 1, newPacketData);
this.partialPackets.put(transmissionID, previousPacketData);
if (previousPacketData.contains(null)) {
@@ -112,15 +109,15 @@ public class UDUPServer {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
UDUPMessage msg = this.serializer.deserialize(allPacketData);
- this.udp.sendMessage(msg.getContent());
this.partialPackets.remove(transmissionID);
- System.out.println("Kryo put together whole transmission");
+ System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
+ this.udp.sendMessage(msg.getContent());
} catch (Error | Exception e) {
System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
}
} else {
ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
- newTransmission.add(packetData);
+ newTransmission.add(newPacketNo-1, packetData);
this.partialPackets.put(transmissionID, newTransmission);
}
}