diff options
| author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-10 15:34:43 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-01-10 15:34:43 +0100 | 
| commit | 60025accad672dac8228b7bac1006126223e2e58 (patch) | |
| tree | 8ef924fd1525c6f03a354bea1957df98479f05bc /src/main/java/pl/edu/mimuw | |
| parent | 2feff1aa41c41008fcda2dd60c718cf09deb3fa1 (diff) | |
| parent | 90d3d2e3e1e116bbb288d78e9c6c996a7f1e0270 (diff) | |
Merge pull request #88 from m-chrzan/improve_udp_tests_and_kryo
Improve udp tests and kryo
Diffstat (limited to 'src/main/java/pl/edu/mimuw')
19 files changed, 246 insertions, 47 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java index f343e0f..c3fcb40 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/AgentMessage.java @@ -20,6 +20,8 @@ public abstract class AgentMessage {          this.timestamp = System.currentTimeMillis() / 1000L;      } +    public AgentMessage() {} +      public String getMessageId() {          return messageId;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java index 63392e8..67be8e9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GetStateMessage.java @@ -12,6 +12,8 @@ public class GetStateMessage extends StanikMessage {          this.requestId = requestId;      } +    public GetStateMessage() {} +      public ModuleType getRequestingModule() {          return requestingModule;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java index 0161a37..97bef69 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/QurnikMessage.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;  import pl.edu.mimuw.cloudatlas.agent.modules.Module;  import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;  public abstract class QurnikMessage extends AgentMessage {      public enum Type { @@ -15,6 +16,8 @@ public abstract class QurnikMessage extends AgentMessage {          this.type = type;      } +    public QurnikMessage() {} +      public Type getType() {          return type;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java index b0300cb..e4bc1b6 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemikMessage.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.messages;  import pl.edu.mimuw.cloudatlas.agent.modules.Module;  import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.Remik;  public abstract class RemikMessage extends AgentMessage {      public enum Type { @@ -15,6 +16,8 @@ public abstract class RemikMessage extends AgentMessage {          this.type = type;      } +    public RemikMessage() {} +      public Type getType() {          return type;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java index 9330185..e7b03ba 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoveZMIMessage.java @@ -12,6 +12,8 @@ public class RemoveZMIMessage extends StanikMessage {          this.removalTimestamp = removalTimestamp;      } +    public RemoveZMIMessage() {} +      public String getPathName() {          return pathName;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java index 698aac7..a1fd279 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RequestStateMessage.java @@ -10,6 +10,8 @@ public class RequestStateMessage extends RemikMessage {          this.responseFuture = responseFuture;      } +    public RequestStateMessage() {} +      public CompletableFuture<ResponseMessage> getFuture() {          return responseFuture;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java index 02b3337..b28f4b9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/ResponseMessage.java @@ -17,6 +17,8 @@ public abstract class ResponseMessage extends AgentMessage {          this.requestId = requestId;      } +    public ResponseMessage() {} +      public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {          module.handleTyped(this);      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java index 35f7819..03882a0 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RunQueriesMessage.java @@ -6,4 +6,6 @@ public class RunQueriesMessage extends QurnikMessage {      public RunQueriesMessage(String messageId, long timestamp) {          super(messageId, timestamp, Type.RUN_QUERIES);      } + +    public RunQueriesMessage() {}  } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java index 4888484..3de2b65 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/SetAttributeMessage.java @@ -19,6 +19,8 @@ public class SetAttributeMessage extends StanikMessage {          this.updateTimestamp = updateTimestamp;      } +    public SetAttributeMessage() {} +      public String getPathName() {          return pathName;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java index 844f31c..3daa5f9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StanikMessage.java @@ -19,6 +19,8 @@ public abstract class StanikMessage extends AgentMessage {          this.type = type;      } +    public StanikMessage() {} +      public Type getType() {          return type;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java index f7f490b..759723b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/StateMessage.java @@ -19,6 +19,8 @@ public class StateMessage extends ResponseMessage {          this.queries = queries;      } +    public StateMessage() {} +      public ZMI getZMI() {          return zmi;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java index 8566d67..b42e181 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/TimerSchedulerMessage.java @@ -23,6 +23,8 @@ public class TimerSchedulerMessage extends AgentMessage {          this.task = task;      } +    public TimerSchedulerMessage() {} +      public long getDelay() {          return delay;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index fa8d1fa..3751b3c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -20,9 +20,7 @@ public class UDUPMessage extends AgentMessage {          this.content = content;      } -    public UDUPMessage() { -        super("", ModuleType.UDP); -    } +    public UDUPMessage() {}      @Override      public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java index 7e41631..b943384 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateAttributesMessage.java @@ -13,6 +13,8 @@ public class UpdateAttributesMessage extends StanikMessage {          this.attributes = attributes;      } +    public UpdateAttributesMessage() {} +      public String getPathName() {          return pathName;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java index 58ad55a..4b0b9c8 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UpdateQueriesMessage.java @@ -15,6 +15,8 @@ public class UpdateQueriesMessage extends StanikMessage {          this.queries = queries;      } +    public UpdateQueriesMessage() {} +      public Map<Attribute, Entry<ValueQuery, ValueTime>> getQueries() {          return queries;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e616c93..8c6db8f 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -26,6 +26,12 @@ import java.util.concurrent.atomic.AtomicBoolean;  // TODO set server port in global config - must be the same everywhere  // TODO same with buffer size +// TODO separate server like newapiimpl +// TODO add timestamps as close to sending as possible + +// TODO wysylac tylko remotegossipgirl message +// TODO update timestampow odpowiedni w tym remotegossipgirlmessage +  public class UDUP extends Module implements Runnable {      private UDUPClient client;      private UDUPServer server; diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 82aaeb1..2e4f0b4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -2,8 +2,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import javax.xml.crypto.Data;  import java.io.IOException;  import java.net.*; +import java.nio.ByteBuffer;  public class UDUPClient {      private UDUP udp; @@ -11,6 +13,7 @@ public class UDUPClient {      private int serverPort;      private DatagramSocket socket;      private int bufsize; +    private int lastTransmission;      UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {          this.udp = udp; @@ -18,26 +21,62 @@ public class UDUPClient {          this.socket = new DatagramSocket();          this.bufsize = bufferSize;          this.serializer = new UDUPSerializer(); +        this.lastTransmission = 0;      } -    public void sendMessage(UDUPMessage msg) throws IOException { -        int offset = 0; -        int outputSize; +    private void logSending(DatagramPacket packet, int packetNo, byte[] buf) { +        System.out.print("UDP sends packet no " + packetNo + +                " with real bufsize " + (bufsize - 8) + +                " out of data buffer with size " + buf.length + +                " to " + packet.getAddress() + ": "); +        for (byte b : packet.getData()) { +            System.out.print(b); +        } +        System.out.println(); +    } + +    byte[] toByteArray(int val) { +        return ByteBuffer.allocate(4).putInt(val).array(); +    } + +    private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) { +        byte[] sendBuf; +        int sendBufSize = bufsize - 8; + +        if (packetNo*sendBufSize >= buf.length) { +            int copyLength = buf.length - (packetNo-1)*sendBufSize; +            sendBuf = new byte[copyLength + 8]; +            System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, copyLength); +        } else { +            sendBuf = new byte[bufsize]; +            System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize); +        } -        byte[] buf = this.serializer.serialize(msg); -        outputSize = buf.length; +        System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); +        System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); + +        return sendBuf; +    } + +    boolean checkEndOfTransmission(int packetNo, int dataBufSize) { +        int sendBufSize = bufsize - 8; +        int dataSentSoFar = (packetNo - 1) * sendBufSize; +        return dataSentSoFar >= dataBufSize; +    } + +    public void sendMessage(UDUPMessage msg) throws IOException { +        int packetNo = 1; +        byte[] sendBuf; +        byte[] dataBuf = this.serializer.serialize(msg); +        this.lastTransmission++;          do { -            outputSize =- bufsize; -            offset += bufsize; -            DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); -            System.out.println("UDP sends message: "); -            for (byte b : buf) { -                System.out.print(b); -            } -            System.out.println("to " + packet.getAddress()); +            sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); +            DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort);              this.socket.send(packet); -        } while (outputSize > bufsize); +            logSending(packet, packetNo, dataBuf); +            packetNo++; +        } while (!checkEndOfTransmission(packetNo, dataBuf.length));      }      void close() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index ac35265..40c4d7c 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -4,18 +4,17 @@ import com.esotericsoftware.kryo.Kryo;  import com.esotericsoftware.kryo.Serializer;  import com.esotericsoftware.kryo.io.Input;  import com.esotericsoftware.kryo.io.Output; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import pl.edu.mimuw.cloudatlas.model.PathName; -import pl.edu.mimuw.cloudatlas.model.ValueContact; +import pl.edu.mimuw.cloudatlas.agent.messages.*; +import pl.edu.mimuw.cloudatlas.model.*;  import java.io.ByteArrayInputStream;  import java.io.ByteArrayOutputStream;  import java.net.Inet4Address;  import java.net.InetAddress;  import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap;  /**   * Serializes classes to and from byte arrays for UDP use @@ -69,15 +68,85 @@ public class UDUPSerializer {              }          }); -        kryo.register(byte[].class); +        kryo.register(ValueList.class, new Serializer() { +            @Override +            public void write(Kryo kryo, Output output, Object object) { +                ValueList vl = (ValueList) object; +                kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); +                kryo.writeObject(output, vl.getValue()); +            } + +            @Override +            public Object read(Kryo kryo, Input input, Class type) { +                Type t = kryo.readObject(input, Type.class); +                ArrayList list = kryo.readObject(input, ArrayList.class); +                return new ValueList(list, t); +            } +        }); + +        kryo.register(ValueSet.class, new Serializer() { +            @Override +            public void write(Kryo kryo, Output output, Object object) { +                ValueSet vs = (ValueSet) object; +                kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); +                kryo.writeObject(output, vs.getValue()); +            } + +            @Override +            public Object read(Kryo kryo, Input input, Class type) { +                Type t = kryo.readObject(input, Type.class); +                HashSet set = kryo.readObject(input, HashSet.class); +                return new ValueSet(set, t); +            } +        }); + +        // model +        kryo.register(Value.class); +        kryo.register(ValueBoolean.class);          kryo.register(ValueContact.class); -        kryo.register(ModuleType.class); +        kryo.register(ValueDuration.class); +        kryo.register(ValueInt.class); +        kryo.register(ValueNull.class); +        kryo.register(ValueQuery.class); +        kryo.register(ValueSet.class); +        kryo.register(ValueString.class); +        kryo.register(ValueTime.class); +        kryo.register(ValueUtils.class); +        kryo.register(ZMI.class); + +        kryo.register(Attribute.class); +        kryo.register(AttributesMap.class); +        kryo.register(AttributesUtil.class); +        kryo.register(Type.class); +        kryo.register(TypeCollection.class); +        kryo.register(TypePrimitive.class); + +        // messages in chronological order so it's easier to keep track          kryo.register(AgentMessage.class);          kryo.register(GetStateMessage.class); -        kryo.register(UDUPMessage.class); +        kryo.register(QurnikMessage.class); +        kryo.register(RemikMessage.class); +        kryo.register(RemoveZMIMessage.class); +        kryo.register(RequestStateMessage.class); +        kryo.register(ResponseMessage.class); +        kryo.register(RunQueriesMessage.class); +        kryo.register(SetAttributeMessage.class);          kryo.register(StanikMessage.Type.class);          kryo.register(StanikMessage.class); +        kryo.register(TimerSchedulerMessage.class); +        kryo.register(UDUPMessage.class); +        kryo.register(UpdateAttributesMessage.class); +        kryo.register(UpdateQueriesMessage.class); + +        // modules +        kryo.register(TimerScheduledTask.class); +        kryo.register(RecursiveScheduledTask.class); + +        // other +        kryo.register(byte[].class); +        kryo.register(LinkedHashMap.class); +        kryo.register(ModuleType.class);      }      public UDUPMessage deserialize(byte[] packetData) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 6807a86..b71a475 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import com.google.common.primitives.Bytes;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import java.io.ByteArrayInputStream;  import java.io.IOException;  import java.net.*; +import java.nio.ByteBuffer; +import java.util.ArrayList;  import java.util.HashMap; +import java.util.NoSuchElementException;  public class UDUPServer {      private UDUP udp;      private UDUPSerializer serializer;      private DatagramSocket socket;      private InetAddress address; -    private HashMap<InetAddress, byte[]> partialPackets; +    private HashMap<String, ArrayList<byte[]>> partialPackets;      private int bufSize;      public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { @@ -25,43 +29,96 @@ public class UDUPServer {      }      public void acceptMessage() throws IOException, InterruptedException { +        DatagramPacket packet = receivePacket(); +        int transmissionNo = readTransmissionNo(packet.getData()); +        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); +        int packetNo = readPacketNo(packet.getData()); +        byte[] packetData = trimPacketBuffer(packet.getData()); + +        if (packetNo == 1 && packet.getLength() < this.bufSize) { +            UDUPMessage msg = this.serializer.deserialize(packetData); +            System.out.println("UDP received message " + msg.getContent().getMessageId()); +            sendMessageFurther(msg); +        } else { +            System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); +            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); +        } +    } + +    private DatagramPacket receivePacket() throws IOException {          byte[] buf = new byte[bufSize];          DatagramPacket packet = new DatagramPacket(buf, buf.length);          this.socket.receive(packet);          System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); +        return packet; +    } -        if (packet.getOffset() == 0) { -            UDUPMessage msg = this.serializer.deserialize(packet.getData()); -            System.out.println("UDP received message " + msg.getContent().getMessageId()); +    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException { +        if (msg.getContent().getDestinationModule() == ModuleType.TEST) { +            System.out.println("UDP server: test message received"); +        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { +            this.udp.sendMessage(msg.getContent()); +        } +    } -            if (packet.getLength() == this.bufSize) { -                this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); -            } else  { -                if (msg.getContent().getDestinationModule() == ModuleType.TEST) { -                    System.out.println("UDP server: test message received"); -                } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { -                    this.udp.sendMessage(msg.getContent()); -                } -            } +    private int readTransmissionNo(byte[] packetData) throws IOException { +        ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4); +        byte[] byteBuf = new byte[4]; +        in.read(byteBuf); +        return ByteBuffer.wrap(byteBuf).getInt(); +    } + +    private int readPacketNo(byte[] packetData) throws IOException { +        ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4); +        byte[] byteBuf = new byte[4]; +        in.read(byteBuf); +        return ByteBuffer.wrap(byteBuf).getInt(); +    } + +    private byte[] trimPacketBuffer(byte[] packetData) { +        int newPacketDataSize = packetData.length - 8; +        byte[] newPacketData = new byte[newPacketDataSize]; +        System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize); +        return newPacketData; +    } + +    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) { +        return contactAddr.getHostAddress() + ":" + transmissionNo; +    } + +    private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) { +        ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID); +        byte[] fullData = new byte[0]; + +        previousPacketData.add(newPacketNo - 1, newPacketData); +        this.partialPackets.put(transmissionID, previousPacketData); + +        if (previousPacketData.contains(null)) { +            throw new NoSuchElementException("Packet is not full");          } else { -            this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); +            for (byte[] prevData : previousPacketData) { +                fullData = Bytes.concat(fullData, prevData); +            }          } + +        return fullData;      } -    public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { -        if (this.partialPackets.containsKey(senderAddress)) { -            byte[] previousPacketData = this.partialPackets.get(senderAddress); -            byte[] allPacketData = Bytes.concat(previousPacketData, packetData); +    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +        if (this.partialPackets.containsKey(transmissionID)) {              try { +                byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);                  UDUPMessage msg = this.serializer.deserialize(allPacketData); +                this.partialPackets.remove(transmissionID); +                System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());                  this.udp.sendMessage(msg.getContent()); -                this.partialPackets.remove(senderAddress);              } catch (Error | Exception e) {                  System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); -                this.partialPackets.put(senderAddress, allPacketData);              }          } else { -            this.partialPackets.put(senderAddress, packetData); +            ArrayList<byte[]> newTransmission = new ArrayList<byte[]>(); +            newTransmission.add(newPacketNo-1, packetData); +            this.partialPackets.put(transmissionID, newTransmission);          }      } |