diff options
| author | Magdalena Grodzińska <mag.grodzinska@gmail.com> | 2020-01-06 22:13:48 +0100 | 
|---|---|---|
| committer | Magdalena Grodzińska <mag.grodzinska@gmail.com> | 2020-01-06 23:12:10 +0100 | 
| commit | 2d7fe232b7c1f2ef62e7bf2f3100adb51e9bc0d4 (patch) | |
| tree | a061de5c7e26a0755721b20daff262591195b8d4 /src/main/java/pl/edu/mimuw | |
| parent | 28c905a7365a37d0874b865f513c64b31d8679f4 (diff) | |
Add segmentation handling
Diffstat (limited to 'src/main/java/pl/edu/mimuw')
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java | 62 | ||||
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 103 | 
2 files changed, 130 insertions, 35 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 82aaeb1..d7cbc9d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -2,8 +2,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import javax.xml.crypto.Data;  import java.io.IOException;  import java.net.*; +import java.nio.ByteBuffer;  public class UDUPClient {      private UDUP udp; @@ -11,6 +13,7 @@ public class UDUPClient {      private int serverPort;      private DatagramSocket socket;      private int bufsize; +    private int lastTransmission;      UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {          this.udp = udp; @@ -18,26 +21,57 @@ public class UDUPClient {          this.socket = new DatagramSocket();          this.bufsize = bufferSize;          this.serializer = new UDUPSerializer(); +        this.lastTransmission = 0;      } -    public void sendMessage(UDUPMessage msg) throws IOException { -        int offset = 0; -        int outputSize; +    private void logSending(DatagramPacket packet, int packetNo, byte[] buf) { +        System.out.print("UDP sends packet no " + packetNo + +                " with realbufsize " + (bufsize - 8) + +                " out of data buffer with size " + buf.length + +                " to " + packet.getAddress() + ": "); +        for (byte b : packet.getData()) { +            System.out.print(b); +        } +        System.out.println(); +    } -        byte[] buf = this.serializer.serialize(msg); -        outputSize = buf.length; +    byte[] toByteArray(int val) { +        return ByteBuffer.allocate(4).putInt(val).array(); +    } + +    private byte[] packSendBuffer(int transmissionNo, int packetNo, byte[] buf) { +        byte[] sendBuf = new byte[bufsize]; +        int sendBufSize = bufsize - 8; +        System.arraycopy(toByteArray(transmissionNo), 0, sendBuf, 0, 4); +        System.arraycopy(toByteArray(packetNo), 0, sendBuf, 4, 4); +        if (packetNo*sendBufSize >= buf.length) { +            System.arraycopy(buf, 0, sendBuf, 8, sendBufSize); +        } else { +            System.arraycopy(buf, (packetNo-1)*sendBufSize, sendBuf, 8, sendBufSize); +        } +        return sendBuf; +    } + +    boolean checkEndOfTransmission(int packetNo, int dataBufSize) { +        int sendBufSize = bufsize - 8; +        int dataSentSoFar = (packetNo - 1) * sendBufSize; +        System.out.println("used data " + dataSentSoFar + " " + dataBufSize); +        return dataSentSoFar >= dataBufSize; +    } + +    public void sendMessage(UDUPMessage msg) throws IOException { +        int packetNo = 1; +        byte[] sendBuf; +        byte[] dataBuf = this.serializer.serialize(msg); +        this.lastTransmission++;          do { -            outputSize =- bufsize; -            offset += bufsize; -            DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); -            System.out.println("UDP sends message: "); -            for (byte b : buf) { -                System.out.print(b); -            } -            System.out.println("to " + packet.getAddress()); +            sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf); +            DatagramPacket packet = new DatagramPacket(sendBuf, bufsize, msg.getContact().getAddress(), this.serverPort);              this.socket.send(packet); -        } while (outputSize > bufsize); +            logSending(packet, packetNo, dataBuf); +            packetNo++; +        } while (!checkEndOfTransmission(packetNo, dataBuf.length));      }      void close() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index fb79dc6..c7ceca2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,16 +3,20 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import com.google.common.primitives.Bytes;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import java.io.ByteArrayInputStream;  import java.io.IOException;  import java.net.*; +import java.nio.ByteBuffer; +import java.util.ArrayList;  import java.util.HashMap; +import java.util.NoSuchElementException;  public class UDUPServer {      private UDUP udp;      private UDUPSerializer serializer;      private DatagramSocket socket;      private InetAddress address; -    private HashMap<InetAddress, byte[]> partialPackets; +    private HashMap<String, ArrayList<byte[]>> partialPackets;      private int bufSize;      public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { @@ -25,42 +29,99 @@ public class UDUPServer {      }      public void acceptMessage() throws IOException, InterruptedException { -        byte[] buf = new byte[bufSize]; -        DatagramPacket packet = new DatagramPacket(buf, buf.length); -        this.socket.receive(packet); -        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); +        DatagramPacket packet = receivePacket(); +        int transmissionNo = readTransmissionNo(packet.getData()); +        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress()); +        int packetNo = readPacketNo(packet.getData()); +        byte[] packetData = trimPacketBuffer(packet.getData()); -        if (packet.getOffset() == 0) { +        if (packetNo == 0) {              if (packet.getLength() == this.bufSize) { -                this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); +                this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);              } else  { -                UDUPMessage msg = this.serializer.deserialize(packet.getData()); +                UDUPMessage msg = this.serializer.deserialize(packetData);                  System.out.println("UDP received message " + msg.getContent().getMessageId()); -                if (msg.getContent().getDestinationModule() == ModuleType.TEST) { -                    System.out.println("UDP server: test message received"); -                } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { -                    this.udp.sendMessage(msg.getContent()); -                } +                sendMessageFurther(msg);              }          } else { -            this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); +            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);          }      } -    public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { -        if (this.partialPackets.containsKey(senderAddress)) { -            byte[] previousPacketData = this.partialPackets.get(senderAddress); -            byte[] allPacketData = Bytes.concat(previousPacketData, packetData); +    private DatagramPacket receivePacket() throws IOException { +        byte[] buf = new byte[bufSize]; +        DatagramPacket packet = new DatagramPacket(buf, buf.length); +        this.socket.receive(packet); +        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); +        return packet; +    } + +    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException { +        if (msg.getContent().getDestinationModule() == ModuleType.TEST) { +            System.out.println("UDP server: test message received"); +        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { +            this.udp.sendMessage(msg.getContent()); +        } +    } + +    private int readTransmissionNo(byte[] packetData) throws IOException { +        ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4); +        byte[] byteBuf = new byte[4]; +        in.read(byteBuf); +        return ByteBuffer.wrap(byteBuf).getInt(); +    } + +    private int readPacketNo(byte[] packetData) throws IOException { +        ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4); +        byte[] byteBuf = new byte[4]; +        in.read(byteBuf); +        return ByteBuffer.wrap(byteBuf).getInt(); +    } + +    private byte[] trimPacketBuffer(byte[] packetData) { +        int newPacketDataSize = packetData.length - 8; +        byte[] newPacketData = new byte[newPacketDataSize]; +        System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize); +        return newPacketData; +    } + +    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) { +        return contactAddr.getHostAddress() + ":" + transmissionNo; +    } + +    private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) { +        ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID); +        byte[] fullData = new byte[0]; + +        previousPacketData.add(newPacketNo, newPacketData); +        this.partialPackets.put(transmissionID, previousPacketData); + +        if (previousPacketData.contains(null)) { +            throw new NoSuchElementException("Packet is not full"); +        } else { +            for (byte[] prevData : previousPacketData) { +                fullData = Bytes.concat(fullData, prevData); +            } +        } + +        return fullData; +    } + +    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +        if (this.partialPackets.containsKey(transmissionID)) {              try { +                byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);                  UDUPMessage msg = this.serializer.deserialize(allPacketData);                  this.udp.sendMessage(msg.getContent()); -                this.partialPackets.remove(senderAddress); +                this.partialPackets.remove(transmissionID); +                System.out.println("Kryo put together whole transmission");              } catch (Error | Exception e) {                  System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); -                this.partialPackets.put(senderAddress, allPacketData);              }          } else { -            this.partialPackets.put(senderAddress, packetData); +            ArrayList<byte[]> newTransmission = new ArrayList<byte[]>(); +            newTransmission.add(packetData); +            this.partialPackets.put(transmissionID, newTransmission);          }      } |