m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
blob: 677bf673549b177c7dd1ba895e730a6c57ce3b36 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package pl.edu.mimuw.cloudatlas.agent.modules;

import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.ByteSerializer;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class UDUPServer implements Runnable {
    private UDUP udp;
    private ByteSerializer serializer;
    private DatagramSocket socket;
    private InetAddress address;
    private HashMap<String, ArrayList<byte[]>> partialPackets;
    private HashMap<String, Long> partialPacketTimestamps;
    private int bufSize;
    private final AtomicBoolean running;
    private long freshnessPeriod;

    public UDUPServer(InetAddress addr, int port, int bufSize, long freshnessPeriod) throws SocketException {
        this.socket = new DatagramSocket(port);
        this.address = addr;
        this.bufSize = bufSize;
        this.partialPackets = new HashMap<>();
        this.serializer = new ByteSerializer();
        this.running = new AtomicBoolean(false);
        this.freshnessPeriod = freshnessPeriod;
        this.partialPacketTimestamps = new HashMap<>();
    }

    public void setUDUP(UDUP udup) {
        this.udp = udup;
    }

    private void purgeData() {
        long currentTime = System.currentTimeMillis();
        ArrayList<String> packetsToRemove = new ArrayList<>();
        for (Map.Entry<String, Long> packetTimestamp : partialPacketTimestamps.entrySet()) {
            if (packetTimestamp.getValue() + freshnessPeriod < currentTime) {
                packetsToRemove.add(packetTimestamp.getKey());
            }
        }

        for (String packetToRemove : packetsToRemove) {
            partialPacketTimestamps.remove(packetToRemove);
            partialPackets.remove(packetToRemove);
        }
    }

    public void run() {
        System.out.println("UDP server running");
        this.running.getAndSet(true);

        while(this.running.get()) {
            try {
                this.acceptMessage();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
                this.running.getAndSet(false);
                this.close();
            }
        }
    }

    public void acceptMessage() throws IOException, InterruptedException {
        DatagramPacket packet = receivePacket();
        int transmissionNo = readTransmissionNo(packet.getData());
        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
        int packetNo = readPacketNo(packet.getData());
        byte[] packetData = trimPacketBuffer(packet.getData());
        UDUPMessage msg;

        if (packetNo == 1 && packet.getLength() < this.bufSize) {
            msg = (UDUPMessage) this.serializer.deserialize(packetData, UDUPMessage.class);
            System.out.println("UDP received message " + msg.getContent().getMessageId());
        } else {
            System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
            msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
        }

        if (msg != null) {
            msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());
            msg.getContent().setSenderAddress(packet.getAddress());
            sendMessageFurther(msg);
        }

        purgeData();
    }

    private DatagramPacket receivePacket() throws IOException {
        byte[] buf = new byte[bufSize];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        this.socket.receive(packet);
        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
        return packet;
    }

    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
        if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
            System.out.println("UDP server: test message received");
        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
            this.udp.sendMessage(msg.getContent());
        }
    }

    private int readTransmissionNo(byte[] packetData) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(packetData, 0, 4);
        byte[] byteBuf = new byte[4];
        in.read(byteBuf);
        return ByteBuffer.wrap(byteBuf).getInt();
    }

    private int readPacketNo(byte[] packetData) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(packetData,4, 4);
        byte[] byteBuf = new byte[4];
        in.read(byteBuf);
        return ByteBuffer.wrap(byteBuf).getInt();
    }

    private byte[] trimPacketBuffer(byte[] packetData) {
        int newPacketDataSize = packetData.length - 8;
        byte[] newPacketData = new byte[newPacketDataSize];
        System.arraycopy(packetData, 8, newPacketData, 0, newPacketDataSize);
        return newPacketData;
    }

    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
        return contactAddr.getHostAddress() + ":" + transmissionNo;
    }

    private byte[] concatPacketData(String transmissionID, int newPacketNo, byte[] newPacketData) {
        ArrayList<byte[]> previousPacketData = this.partialPackets.get(transmissionID);
        byte[] fullData = new byte[0];

        previousPacketData.add(newPacketNo - 1, newPacketData);
        this.partialPackets.put(transmissionID, previousPacketData);
        this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());

        if (previousPacketData.contains(null)) {
            throw new NoSuchElementException("Packet is not full");
        } else {
            for (byte[] prevData : previousPacketData) {
                fullData = Bytes.concat(fullData, prevData);
            }
        }

        return fullData;
    }

    public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
        UDUPMessage msg = null;

        this.partialPacketTimestamps.put(transmissionID, System.currentTimeMillis());
        if (this.partialPackets.containsKey(transmissionID)) {
            try {
                byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
                msg = (UDUPMessage) this.serializer.deserialize(allPacketData, UDUPMessage.class);
                this.partialPackets.remove(transmissionID);
                System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
            } catch (Error | Exception e) {
                System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
            }
        } else {
            ArrayList<byte[]> newTransmission = new ArrayList<byte[]>();
            newTransmission.add(newPacketNo-1, packetData);
            this.partialPackets.put(transmissionID, newTransmission);
        }

        return msg;
    }

    public void close() {
        this.socket.close();
    }

}