diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java')
| -rw-r--r-- | src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 48 | 
1 files changed, 39 insertions, 9 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index b71a475..94882e4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import com.google.common.primitives.Bytes;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueUtils;  import java.io.ByteArrayInputStream;  import java.io.IOException; @@ -10,22 +11,42 @@ import java.nio.ByteBuffer;  import java.util.ArrayList;  import java.util.HashMap;  import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; -public class UDUPServer { +public class UDUPServer implements Runnable {      private UDUP udp;      private UDUPSerializer serializer;      private DatagramSocket socket;      private InetAddress address;      private HashMap<String, ArrayList<byte[]>> partialPackets;      private int bufSize; +    private final AtomicBoolean running; -    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { -        this.udp = udp; +    public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {          this.socket = new DatagramSocket(port, addr);          this.address = addr;          this.bufSize = bufSize;          this.partialPackets = new HashMap<>();          this.serializer = new UDUPSerializer(); +        this.running = new AtomicBoolean(false); +    } + +    public void setUDUP(UDUP udup) { +        this.udp = udup; +    } + +    public void run() { +        System.out.println("UDP server running"); +        this.running.getAndSet(true); +        while(this.running.get()) { +            try { +                this.acceptMessage(); +            } catch (IOException | InterruptedException e) { +                e.printStackTrace(); +                this.running.getAndSet(false); +                this.close(); +            } +        }      }      public void acceptMessage() throws IOException, InterruptedException { @@ -34,14 +55,19 @@ public class UDUPServer {          String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());          int packetNo = readPacketNo(packet.getData());          byte[] packetData = trimPacketBuffer(packet.getData()); +        UDUPMessage msg;          if (packetNo == 1 && packet.getLength() < this.bufSize) { -            UDUPMessage msg = this.serializer.deserialize(packetData); +            msg = this.serializer.deserialize(packetData); +            msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());              System.out.println("UDP received message " + msg.getContent().getMessageId()); -            sendMessageFurther(msg);          } else {              System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); -            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); +            msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); +        } + +        if (msg != null) { +            sendMessageFurther(msg);          }      } @@ -104,14 +130,16 @@ public class UDUPServer {          return fullData;      } -    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +    public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +        UDUPMessage msg = null; +          if (this.partialPackets.containsKey(transmissionID)) {              try {                  byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); -                UDUPMessage msg = this.serializer.deserialize(allPacketData); +                msg = this.serializer.deserialize(allPacketData); +                msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());                  this.partialPackets.remove(transmissionID);                  System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); -                this.udp.sendMessage(msg.getContent());              } catch (Error | Exception e) {                  System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");              } @@ -120,6 +148,8 @@ public class UDUPServer {              newTransmission.add(newPacketNo-1, packetData);              this.partialPackets.put(transmissionID, newTransmission);          } + +        return msg;      }      public void close() {  |