1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.NoSuchElementException;
/**
 * Receiving side of the UDUP datagram transport.
 *
 * <p>Every datagram starts with an 8-byte header: a 4-byte transmission number followed by a
 * 4-byte packet number (both read big-endian, packet numbers start at 1). The rest of the
 * datagram is a slice of a serialized {@link UDUPMessage}. A message that fits in one datagram
 * shorter than the receive buffer is deserialized immediately; anything else is buffered per
 * sender/transmission until the collected fragments deserialize successfully.
 *
 * <p>Known limitation: fragments of transmissions that never complete (lost packets, late
 * duplicates of an already delivered transmission) are kept until the server is discarded.
 *
 * <p>NOTE(review): the fragment table is not synchronized; this assumes {@link #acceptMessage()}
 * is driven from a single thread - confirm against the caller.
 */
public class UDUPServer {
    /** Header length in bytes: 4-byte transmission number + 4-byte packet number. */
    private static final int HEADER_SIZE = 8;

    private final UDUP udp;
    private final UDUPSerializer serializer;
    private final DatagramSocket socket;
    private final InetAddress address;
    /** Fragments received so far, keyed by transmission id and then by 1-based packet number. */
    private final HashMap<String, HashMap<Integer, byte[]>> partialPackets;
    private final int bufSize;

    /**
     * Binds a datagram socket to the given address and port.
     *
     * @param udp     module that completed messages are handed to
     * @param addr    local address to bind to
     * @param port    local port to bind to
     * @param bufSize size in bytes of the receive buffer, header included
     * @throws SocketException if the socket cannot be opened or bound
     */
    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
        this.udp = udp;
        this.socket = new DatagramSocket(port, addr);
        this.address = addr;
        this.bufSize = bufSize;
        this.partialPackets = new HashMap<>();
        this.serializer = new UDUPSerializer();
    }

    /**
     * Blocks until one datagram arrives and processes it: a complete single-packet message is
     * deserialized and forwarded, any other packet is stored as a fragment of its transmission.
     * Datagrams too short to contain the header are dropped.
     *
     * @throws IOException          if receiving from the socket fails
     * @throws InterruptedException if forwarding the message is interrupted
     */
    public void acceptMessage() throws IOException, InterruptedException {
        DatagramPacket packet = receivePacket();
        int length = packet.getLength();
        if (length < HEADER_SIZE) {
            // Without a full header the transmission/packet numbers would be garbage.
            System.out.println("UDP dropped malformed packet of " + length + " bytes from " + packet.getAddress());
            return;
        }

        byte[] raw = packet.getData();
        int transmissionNo = readTransmissionNo(raw);
        int packetNo = readPacketNo(raw);
        String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());
        // Only the first `length` bytes of the buffer were written by this datagram.
        byte[] packetData = trimPacketBuffer(raw, length);

        if (packetNo == 1 && length < this.bufSize) {
            UDUPMessage msg = this.serializer.deserialize(packetData);
            System.out.println("UDP received message " + msg.getContent().getMessageId());
            sendMessageFurther(msg);
        } else {
            System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData);
        }
    }

    /** Receives a single datagram into a fresh buffer of {@code bufSize} bytes. */
    private DatagramPacket receivePacket() throws IOException {
        byte[] buf = new byte[bufSize];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        this.socket.receive(packet);
        System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
        return packet;
    }

    /**
     * Hands a complete message to the owning module. Messages addressed to the TEST module are
     * only logged, and messages addressed to the UDP module itself are not forwarded.
     */
    private void sendMessageFurther(UDUPMessage msg) throws InterruptedException {
        if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
            System.out.println("UDP server: test message received");
        } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
            this.udp.sendMessage(msg.getContent());
        }
    }

    /** Reads the transmission number from header bytes 0-3. */
    private int readTransmissionNo(byte[] packetData) {
        return ByteBuffer.wrap(packetData, 0, 4).getInt();
    }

    /** Reads the packet number from header bytes 4-7. */
    private int readPacketNo(byte[] packetData) {
        return ByteBuffer.wrap(packetData, 4, 4).getInt();
    }

    /**
     * Copies the payload out of a receive buffer.
     *
     * @param packetData receive buffer, header included
     * @param length     number of bytes actually received; must be at least {@code HEADER_SIZE}
     * @return the bytes between the header and the end of the received data
     */
    private byte[] trimPacketBuffer(byte[] packetData, int length) {
        int payloadSize = length - HEADER_SIZE;
        byte[] payload = new byte[payloadSize];
        System.arraycopy(packetData, HEADER_SIZE, payload, 0, payloadSize);
        return payload;
    }

    /** Builds the key identifying one transmission of one sender. */
    private String packTransmissionID(int transmissionNo, InetAddress contactAddr) {
        return contactAddr.getHostAddress() + ":" + transmissionNo;
    }

    /**
     * Joins the fragments numbered 1..n in packet order.
     *
     * @return the concatenated payload, or {@code null} if any packet number in 1..n is missing
     */
    private byte[] concatPacketData(HashMap<Integer, byte[]> fragments) {
        int count = fragments.size();
        byte[][] ordered = new byte[count][];
        for (int packetNo = 1; packetNo <= count; packetNo++) {
            byte[] fragment = fragments.get(packetNo);
            if (fragment == null) {
                return null;
            }
            ordered[packetNo - 1] = fragment;
        }
        return Bytes.concat(ordered);
    }

    /**
     * Stores one fragment and, if the fragments collected so far form a gap-free sequence
     * starting at packet 1, tries to deserialize them. On success the transmission is forgotten
     * and the message is forwarded; on failure the fragments are kept and more are awaited.
     * Fragments may arrive in any order; a repeated packet number replaces the earlier copy.
     *
     * <p>If forwarding is interrupted, the calling thread's interrupt flag is set again.
     *
     * @param transmissionID key identifying the sender and transmission
     * @param newPacketNo    1-based position of this fragment; values below 1 are dropped
     * @param packetData     payload of the fragment, header already removed
     */
    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) {
        if (newPacketNo < 1) {
            System.out.println("UDP dropped packet with invalid packet no " + newPacketNo
                    + " for transmission id " + transmissionID);
            return;
        }

        HashMap<Integer, byte[]> fragments = this.partialPackets.get(transmissionID);
        if (fragments == null) {
            fragments = new HashMap<>();
            this.partialPackets.put(transmissionID, fragments);
        }
        fragments.put(newPacketNo, packetData);

        byte[] allPacketData = concatPacketData(fragments);
        if (allPacketData == null) {
            System.out.println("UDP transmission " + transmissionID + " still has missing packets, waiting to receive the rest");
            return;
        }

        UDUPMessage msg;
        String completionNote;
        try {
            msg = this.serializer.deserialize(allPacketData);
            // Touch the content here so a half-built message counts as "not complete yet".
            completionNote = "Kryo put together whole transmission for msg " + msg.getContent().getMessageId();
        } catch (Error | Exception e) {
            // The total packet count is not transmitted, so a failed deserialization is the
            // only signal that data is still missing. Error is caught as in the original code;
            // presumably truncated input can fail in arbitrary ways - TODO confirm and narrow.
            System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
            return;
        }

        this.partialPackets.remove(transmissionID);
        System.out.println(completionNote);
        try {
            // Same destination filtering as for single-packet messages.
            sendMessageFurther(msg);
        } catch (InterruptedException e) {
            // This method does not declare InterruptedException; preserve the interrupt instead.
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the underlying socket. */
    public void close() {
        this.socket.close();
    }
}
|