m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
blob: b71a475c85797a97c2420a4c2ab0eb719cf09c3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pl.edu.mimuw.cloudatlas.agent.modules;

import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.NoSuchElementException;

public class UDUPServer {
    private UDUP udp;
    private UDUPSerializer serializer;
    private DatagramSocket socket;
    private InetAddress address;
    private HashMap<String, ArrayList<byte[]>> partialPackets;
    private int bufSize;

    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
        this.udp = udp;
        this.socket = new DatagramSocket(port, addr);
        this.address = addr;
        this.bufSize = bufSize;
        this.partialPackets = new HashMap<>();
        this.serializer = new UDUPSerializer();
    }

    public void acceptMessage() throws IOException, InterruptedException {
        DatagramPacket packet = receivePacket();
        int transmissionNo = readTransmissionNo(packet.getData());
        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
        int packetNo = readPacketNo(packet.getData());
        byte[] packetData = trimPacketBuffer(packet.getData());

        if (packetNo == 1 && packet.getLength() < this.bufSize) {
            UDUPMessage msg = this.serializer.deserialize(packetData);
            System.out.println("UDP received message " + msg.getContent().getMessageId());
            sendMessageFurther(msg);
        } else {
            System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
        }
    }

    private DatagramPacket receivePacket() throws IOException {
        byte[] buf = new byte[bufSize];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        this.socket.receive(packet);
        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
        return packet;
    }

    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
        if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
            System.out.println("UDP server: test message received");
        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
            this.udp.sendMessage(msg.getContent());
        }
    }

    private int readTransmissionNo(byte[] packetData) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4);
        byte[] byteBuf = new byte[4];
        in.read(byteBuf);
        return ByteBuffer.wrap(byteBuf).getInt();
    }

    private int readPacketNo(byte[] packetData) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4);
        byte[] byteBuf = new byte[4];
        in.read(byteBuf);
        return ByteBuffer.wrap(byteBuf).getInt();
    }

    private byte[] trimPacketBuffer(byte[] packetData) {
        int newPacketDataSize = packetData.length - 8;
        byte[] newPacketData = new byte[newPacketDataSize];
        System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize);
        return newPacketData;
    }

    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
        return contactAddr.getHostAddress() + ":" + transmissionNo;
    }

    private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) {
        ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
        byte[] fullData = new byte[0];

        previousPacketData.add(newPacketNo - 1, newPacketData);
        this.partialPackets.put(transmissionID, previousPacketData);

        if (previousPacketData.contains(null)) {
            throw new NoSuchElementException("Packet is not full");
        } else {
            for (byte[] prevData : previousPacketData) {
                fullData = Bytes.concat(fullData, prevData);
            }
        }

        return fullData;
    }

    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
        if (this.partialPackets.containsKey(transmissionID)) {
            try {
                byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
                UDUPMessage msg = this.serializer.deserialize(allPacketData);
                this.partialPackets.remove(transmissionID);
                System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
                this.udp.sendMessage(msg.getContent());
            } catch (Error | Exception e) {
                System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
            }
        } else {
            ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
            newTransmission.add(newPacketNo-1, packetData);
            this.partialPackets.put(transmissionID, newTransmission);
        }
    }

    public void close() {
        this.socket.close();
    }

}