m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java3
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java3
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java4
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java67
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java87
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java99
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java184
20 files changed, 359 insertions, 118 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
index f343e0f..c3fcb40 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java
@@ -20,6 +20,8 @@ public abstract class AgentMessage {
this.timestamp = System.currentTimeMillis() / 1000L;
}
+ public AgentMessage() {}
+
public String getMessageId() {
return messageId;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
index 63392e8..67be8e9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java
@@ -12,6 +12,8 @@ public class GetStateMessage extends StanikMessage {
this.requestId = requestId;
}
+ public GetStateMessage() {}
+
public ModuleType getRequestingModule() {
return requestingModule;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
index 0161a37..97bef69 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
public abstract class QurnikMessage extends AgentMessage {
public enum Type {
@@ -15,6 +16,8 @@ public abstract class QurnikMessage extends AgentMessage {
this.type = type;
}
+ public QurnikMessage() {}
+
public Type getType() {
return type;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
index b0300cb..e4bc1b6 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java
@@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
public abstract class RemikMessage extends AgentMessage {
public enum Type {
@@ -15,6 +16,8 @@ public abstract class RemikMessage extends AgentMessage {
this.type = type;
}
+ public RemikMessage() {}
+
public Type getType() {
return type;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
index 9330185..e7b03ba 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java
@@ -12,6 +12,8 @@ public class RemoveZMIMessage extends StanikMessage {
this.removalTimestamp = removalTimestamp;
}
+ public RemoveZMIMessage() {}
+
public String getPathName() {
return pathName;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
index 698aac7..a1fd279 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java
@@ -10,6 +10,8 @@ public class RequestStateMessage extends RemikMessage {
this.responseFuture = responseFuture;
}
+ public RequestStateMessage() {}
+
public CompletableFuture<ResponseMessage> getFuture() {
return responseFuture;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
index 02b3337..b28f4b9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java
@@ -17,6 +17,8 @@ public abstract class ResponseMessage extends AgentMessage {
this.requestId = requestId;
}
+ public ResponseMessage() {}
+
public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
module.handleTyped(this);
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
index 35f7819..03882a0 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java
@@ -6,4 +6,6 @@ public class RunQueriesMessage extends QurnikMessage {
public RunQueriesMessage(String messageId, long timestamp) {
super(messageId, timestamp, Type.RUN_QUERIES);
}
+
+ public RunQueriesMessage() {}
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
index 4888484..3de2b65 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java
@@ -19,6 +19,8 @@ public class SetAttributeMessage extends StanikMessage {
this.updateTimestamp = updateTimestamp;
}
+ public SetAttributeMessage() {}
+
public String getPathName() {
return pathName;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
index 844f31c..3daa5f9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java
@@ -19,6 +19,8 @@ public abstract class StanikMessage extends AgentMessage {
this.type = type;
}
+ public StanikMessage() {}
+
public Type getType() {
return type;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
index f7f490b..759723b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java
@@ -19,6 +19,8 @@ public class StateMessage extends ResponseMessage {
this.queries = queries;
}
+ public StateMessage() {}
+
public ZMI getZMI() {
return zmi;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
index 8566d67..b42e181 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java
@@ -23,6 +23,8 @@ public class TimerSchedulerMessage extends AgentMessage {
this.task = task;
}
+ public TimerSchedulerMessage() {}
+
public long getDelay() {
return delay;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
index fa8d1fa..3751b3c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -20,9 +20,7 @@ public class UDUPMessage extends AgentMessage {
this.content = content;
}
- public UDUPMessage() {
- super("", ModuleType.UDP);
- }
+ public UDUPMessage() {}
@Override
public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
index 7e41631..b943384 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java
@@ -13,6 +13,8 @@ public class UpdateAttributesMessage extends StanikMessage {
this.attributes = attributes;
}
+ public UpdateAttributesMessage() {}
+
public String getPathName() {
return pathName;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
index 58ad55a..4b0b9c8 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java
@@ -15,6 +15,8 @@ public class UpdateQueriesMessage extends StanikMessage {
this.queries = queries;
}
+ public UpdateQueriesMessage() {}
+
public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {
return queries;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e616c93..8c6db8f 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
// TODO set server port in global config - must be the same everywhere
// TODO same with buffer size
+// TODO separate server like newapiimpl
+// TODO add timestamps as close to sending as possible
+
+// TODO wysylac tylko remotegossipgirl message
+// TODO update timestampow odpowiedni w tym remotegossipgirlmessage
+
public class UDUP extends Module implements Runnable {
private UDUPClient client;
private UDUPServer server;
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 82aaeb1..2e4f0b4 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -2,8 +2,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import javax.xml.crypto.Data;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
public class UDUPClient {
private UDUP udp;
@@ -11,6 +13,7 @@ public class UDUPClient {
private int serverPort;
private DatagramSocket socket;
private int bufsize;
+ private int lastTransmission;
UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
this.udp = udp;
@@ -18,26 +21,62 @@ public class UDUPClient {
this.socket = new DatagramSocket();
this.bufsize = bufferSize;
this.serializer = new UDUPSerializer();
+ this.lastTransmission = 0;
}
- public void sendMessage(UDUPMessage msg) throws IOException {
- int offset = 0;
- int outputSize;
+ private void logSending(DatagramPacket packet, int packetNo, byte[] buf) {
+ System.out.print("UDP sends packet no " + packetNo +
+ " with real bufsize " + (bufsize - 8) +
+ " out of data buffer with size " + buf.length +
+ " to " + packet.getAddress() + ": ");
+ for (byte b : packet.getData()) {
+ System.out.print(b);
+ }
+ System.out.println();
+ }
+
+ byte[] toByteArray(int val) {
+ return ByteBuffer.allocate(4).putInt(val).array();
+ }
+
+ private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) {
+ byte[] sendBuf;
+ int sendBufSize = bufsize - 8;
+
+ if (packetNo*sendBufSize >= buf.length) {
+ int copyLength = buf.length - (packetNo-1)*sendBufSize;
+ sendBuf = new byte[copyLength + 8];
+ System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength);
+ } else {
+ sendBuf = new byte[bufsize];
+ System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize);
+ }
- byte[] buf = this.serializer.serialize(msg);
- outputSize = buf.length;
+ System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4);
+ System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4);
+
+ return sendBuf;
+ }
+
+ boolean checkEndOfTransmission(int packetNo, int dataBufSize) {
+ int sendBufSize = bufsize - 8;
+ int dataSentSoFar = (packetNo - 1) * sendBufSize;
+ return dataSentSoFar >= dataBufSize;
+ }
+
+ public void sendMessage(UDUPMessage msg) throws IOException {
+ int packetNo = 1;
+ byte[] sendBuf;
+ byte[] dataBuf = this.serializer.serialize(msg);
+ this.lastTransmission++;
do {
- outputSize =- bufsize;
- offset += bufsize;
- DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
- System.out.println("UDP sends message: ");
- for (byte b : buf) {
- System.out.print(b);
- }
- System.out.println("to " + packet.getAddress());
+ sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);
+ DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort);
this.socket.send(packet);
- } while (outputSize > bufsize);
+ logSending(packet, packetNo, dataBuf);
+ packetNo++;
+ } while (!checkEndOfTransmission(packetNo, dataBuf.length));
}
void close() {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index ac35265..40c4d7c 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -4,18 +4,17 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
-import pl.edu.mimuw.cloudatlas.model.PathName;
-import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
+import pl.edu.mimuw.cloudatlas.model.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
/**
* Serializes classes to and from byte arrays for UDP use
@@ -69,15 +68,85 @@ public class UDUPSerializer {
}
});
- kryo.register(byte[].class);
+ kryo.register(ValueList.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ValueList vl = (ValueList) object;
+ kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
+ kryo.writeObject(output, vl.getValue());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ Type t = kryo.readObject(input, Type.class);
+ ArrayList list = kryo.readObject(input, ArrayList.class);
+ return new ValueList(list, t);
+ }
+ });
+
+ kryo.register(ValueSet.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ValueSet vs = (ValueSet) object;
+ kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
+ kryo.writeObject(output, vs.getValue());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ Type t = kryo.readObject(input, Type.class);
+ HashSet set = kryo.readObject(input, HashSet.class);
+ return new ValueSet(set, t);
+ }
+ });
+
+ // model
+ kryo.register(Value.class);
+ kryo.register(ValueBoolean.class);
kryo.register(ValueContact.class);
- kryo.register(ModuleType.class);
+ kryo.register(ValueDuration.class);
+ kryo.register(ValueInt.class);
+ kryo.register(ValueNull.class);
+ kryo.register(ValueQuery.class);
+ kryo.register(ValueSet.class);
+ kryo.register(ValueString.class);
+ kryo.register(ValueTime.class);
+ kryo.register(ValueUtils.class);
+ kryo.register(ZMI.class);
+
+ kryo.register(Attribute.class);
+ kryo.register(AttributesMap.class);
+ kryo.register(AttributesUtil.class);
+ kryo.register(Type.class);
+ kryo.register(TypeCollection.class);
+ kryo.register(TypePrimitive.class);
+
+ // messages in chronological order so it's easier to keep track
kryo.register(AgentMessage.class);
kryo.register(GetStateMessage.class);
- kryo.register(UDUPMessage.class);
+ kryo.register(QurnikMessage.class);
+ kryo.register(RemikMessage.class);
+ kryo.register(RemoveZMIMessage.class);
+ kryo.register(RequestStateMessage.class);
+ kryo.register(ResponseMessage.class);
+ kryo.register(RunQueriesMessage.class);
+ kryo.register(SetAttributeMessage.class);
kryo.register(StanikMessage.Type.class);
kryo.register(StanikMessage.class);
+ kryo.register(TimerSchedulerMessage.class);
+ kryo.register(UDUPMessage.class);
+ kryo.register(UpdateAttributesMessage.class);
+ kryo.register(UpdateQueriesMessage.class);
+
+ // modules
+ kryo.register(TimerScheduledTask.class);
+ kryo.register(RecursiveScheduledTask.class);
+
+ // other
+ kryo.register(byte[].class);
+ kryo.register(LinkedHashMap.class);
+ kryo.register(ModuleType.class);
}
public UDUPMessage deserialize(byte[] packetData) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 6807a86..b71a475 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.NoSuchElementException;
public class UDUPServer {
private UDUP udp;
private UDUPSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
- private HashMap<InetAddress, byte[]> partialPackets;
+ private HashMap<String, ArrayList<byte[]>> partialPackets;
private int bufSize;
public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
@@ -25,43 +29,96 @@ public class UDUPServer {
}
public void acceptMessage() throws IOException, InterruptedException {
+ DatagramPacket packet = receivePacket();
+ int transmissionNo = readTransmissionNo(packet.getData());
+ String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
+ int packetNo = readPacketNo(packet.getData());
+ byte[] packetData = trimPacketBuffer(packet.getData());
+
+ if (packetNo == 1 && packet.getLength() < this.bufSize) {
+ UDUPMessage msg = this.serializer.deserialize(packetData);
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+ sendMessageFurther(msg);
+ } else {
+ System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
+ this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
+ }
+ }
+
+ private DatagramPacket receivePacket() throws IOException {
byte[] buf = new byte[bufSize];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
this.socket.receive(packet);
System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+ return packet;
+ }
- if (packet.getOffset() == 0) {
- UDUPMessage msg = this.serializer.deserialize(packet.getData());
- System.out.println("UDP received message " + msg.getContent().getMessageId());
+ private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
+ if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+ System.out.println("UDP server: test message received");
+ } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ }
- if (packet.getLength() == this.bufSize) {
- this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
- } else {
- if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
- System.out.println("UDP server: test message received");
- } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
- this.udp.sendMessage(msg.getContent());
- }
- }
+ private int readTransmissionNo(byte[] packetData) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4);
+ byte[] byteBuf = new byte[4];
+ in.read(byteBuf);
+ return ByteBuffer.wrap(byteBuf).getInt();
+ }
+
+ private int readPacketNo(byte[] packetData) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4);
+ byte[] byteBuf = new byte[4];
+ in.read(byteBuf);
+ return ByteBuffer.wrap(byteBuf).getInt();
+ }
+
+ private byte[] trimPacketBuffer(byte[] packetData) {
+ int newPacketDataSize = packetData.length - 8;
+ byte[] newPacketData = new byte[newPacketDataSize];
+ System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize);
+ return newPacketData;
+ }
+
+ private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
+ return contactAddr.getHostAddress() + ":" + transmissionNo;
+ }
+
+ private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) {
+ ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
+ byte[] fullData = new byte[0];
+
+ previousPacketData.add(newPacketNo - 1, newPacketData);
+ this.partialPackets.put(transmissionID, previousPacketData);
+
+ if (previousPacketData.contains(null)) {
+ throw new NoSuchElementException("Packet is not full");
} else {
- this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ for (byte[] prevData : previousPacketData) {
+ fullData = Bytes.concat(fullData, prevData);
+ }
}
+
+ return fullData;
}
- public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
- if (this.partialPackets.containsKey(senderAddress)) {
- byte[] previousPacketData = this.partialPackets.get(senderAddress);
- byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
+ if (this.partialPackets.containsKey(transmissionID)) {
try {
+ byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
UDUPMessage msg = this.serializer.deserialize(allPacketData);
+ this.partialPackets.remove(transmissionID);
+ System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
this.udp.sendMessage(msg.getContent());
- this.partialPackets.remove(senderAddress);
} catch (Error | Exception e) {
System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
- this.partialPackets.put(senderAddress, allPacketData);
}
} else {
- this.partialPackets.put(senderAddress, packetData);
+ ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
+ newTransmission.add(newPacketNo-1, packetData);
+ this.partialPackets.put(transmissionID, newTransmission);
}
}
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
index e4f601a..13a322b 100644
--- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
@@ -2,15 +2,20 @@ package pl.edu.mimuw.cloudatlas.agent;
import org.junit.*;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
import pl.edu.mimuw.cloudatlas.agent.modules.UDUP;
+import pl.edu.mimuw.cloudatlas.model.AttributesMap;
import pl.edu.mimuw.cloudatlas.model.PathName;
import pl.edu.mimuw.cloudatlas.model.ValueContact;
+import pl.edu.mimuw.cloudatlas.model.ValueInt;
import java.net.InetAddress;
import java.net.UnknownHostException;
+// TODO add serialization tests that target custom serializers (type collections!)
+
public class UDUPTest {
@Test
@@ -39,6 +44,7 @@ public class UDUPTest {
UDUPMessage testContent = new UDUPMessage();
testContent.setDestinationModule(ModuleType.TEST);
+ testContent.setMessageId("singleMsgTest");
msg1 = new UDUPMessage(
"udup1",
@@ -78,45 +84,65 @@ public class UDUPTest {
Assert.fail();
}
}
-/*
+
@Test
public void bigMessageBetweenUDUPs() {
- UDUP udp1 = new UDUP(
- ModuleType.UDP,
- 5999,
- 1000,
- 5000,
- 256);
-
- UDUP udp2 = new UDUP(
- ModuleType.UDP,
- 5998,
- 1000,
- 5000,
- 256);
-
- AttributesMap bigAttrib = new AttributesMap();
- for (Long i = 1L; i < 20; i++) {
- bigAttrib.add(i.toString(), new ValueInt(i));
+ UDUP udp1 = null;
+ UDUP udp2 = null;
+ UDUPMessage msg1 = null;
+ boolean testSuccess = true;
+ int timeout = 5000;
+
+ try {
+ System.out.println("Starting udp1");
+
+ udp1 = new UDUP(
+ InetAddress.getByName("127.0.0.2"),
+ 5998,
+ timeout,
+ 30);
+
+ System.out.println("Starting udp2");
+
+ udp2 = new UDUP(
+ InetAddress.getByName("127.0.0.3"),
+ 5998,
+ timeout,
+ 30);
+
+ UDUPMessage testContent = new UDUPMessage();
+ testContent.setDestinationModule(ModuleType.TEST);
+ testContent.setMessageId("bigMsgTest");
+
+ msg1 = new UDUPMessage(
+ "udup1",
+ new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")),
+ testContent
+ );
+
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
}
- UDUPMessage msg1 = new UDUPMessage(
- "udp1",
- ModuleType.UDP,
- "localhost",
- 5998,
- new UpdateAttributesMessage("updateattrib1", 0,"/", bigAttrib),
- 0,
- "conv1");
+ Thread udpThread1 = new Thread(udp1);
+ udpThread1.start();
+ Thread udpThread2 = new Thread(udp2);
+ udpThread2.start();
try {
udp1.handle(msg1);
- Thread.sleep(1000);
- UDUPMessage conv = udp2.fetchConversation("conv1");
- Assert.assertEquals(conv.getConversationId(), "conv1");
+ Thread.sleep(timeout + 1000);
} catch (InterruptedException | Module.InvalidMessageType e) {
e.printStackTrace();
- } catch (UDUP.InvalidConversation noConversationError) {
+ testSuccess = false;
+ }
+
+ udpThread1.interrupt();
+ udpThread2.interrupt();
+
+ if (testSuccess) {
+ Assert.assertTrue(true);
+ } else {
Assert.fail();
}
}
@@ -124,53 +150,69 @@ public class UDUPTest {
@Test
public void sendMultipleMessages() {
- @Test
- public void messageBetweenUDUPs() {
- UDUP udp1 = new UDUP(
- ModuleType.UDP,
- 5999,
- 1000,
- 5000,
- 3,
- 256);
+ UDUP udp1 = null;
+ UDUP udp2 = null;
+ UDUPMessage msg1 = null;
+ UDUPMessage msg2 = null;
+ UDUPMessage msg3 = null;
+ boolean testSuccess = true;
+ int timeout = 5000;
- UDUP udp2 = new UDUP(
- ModuleType.UDP,
- 5998,
- 1000,
- 5000,
- 3,
- 256);
+ try {
+ System.out.println("Starting udp1");
- UDUPMessage msg1 = new UDUPMessage(
- "1",
- ModuleType.UDP,
- "localhost",
- 5998,
- null,
- 0,
- "conv1");
+ udp1 = new UDUP(
+ InetAddress.getByName("127.0.0.2"),
+ 5997,
+ timeout,
+ 1000);
+ System.out.println("Starting udp2");
+ udp2 = new UDUP(
+ InetAddress.getByName("127.0.0.3"),
+ 5997,
+ timeout,
+ 1000);
+ UDUPMessage testContent = new UDUPMessage();
+ testContent.setDestinationModule(ModuleType.TEST);
+ testContent.setMessageId("multipleMsgTest");
- try {
- udp1.handle(msg1);
- udp1.handle(msg2);
- udp1.handle(msg3);
- Thread.sleep(1000);
- UDUPMessage conv1 = udp2.fetchConversation("conv1");
- Assert.assertEquals(conv1.getConversationId(), "conv1");
- UDUPMessage conv2 = udp2.fetchConversation("conv2");
- Assert.assertEquals(conv2.getConversationId(), "conv2");
- UDUPMessage conv3 = udp2.fetchConversation("conv3");
- Assert.assertEquals(conv3.getConversationId(), "conv3");
- } catch (InterruptedException | Module.InvalidMessageType e) {
- e.printStackTrace();
- } catch (UDUP.InvalidConversation invalidConversation) {
- Assert.fail();
- }
+ msg1 = new UDUPMessage(
+ "udup1",
+ new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")),
+ testContent
+ );
+
+ msg2 = new UDUPMessage(
+ "udup2",
+ new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")),
+ testContent
+ );
+
+ msg3 = new UDUPMessage(
+ "udup3",
+ new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")),
+ testContent
+ );
+
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ Thread udpThread1 = new Thread(udp1);
+ udpThread1.start();
+ Thread udpThread2 = new Thread(udp2);
+ udpThread2.start();
+
+ try {
+ udp1.handle(msg1);
+ udp1.handle(msg2);
+ udp1.handle(msg3);
+ Thread.sleep(timeout + 2000);
+ } catch (InterruptedException | Module.InvalidMessageType e) {
+ e.printStackTrace();
}
}
-*/
}