m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas
diff options
context:
space:
mode:
authorMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-06 16:14:44 +0100
committerMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-06 16:14:44 +0100
commitab9a470aa67ef414581145ad671e119d9edb86d9 (patch)
tree6d48ae324b9e83c06e4eb598ddc05291ba14ea70 /src/main/java/pl/edu/mimuw/cloudatlas
parentaa7278e17afb5129034bbe1af1cf4ca3c6ba3e90 (diff)
Refactor UDUP module
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java79
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java64
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java94
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java30
4 files changed, 137 insertions, 130 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
index e4a7962..e616c93 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -1,19 +1,10 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.primitives.Bytes;
-import pl.edu.mimuw.cloudatlas.agent.EventBus;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
* due to ValueContact design
*/
-public class UDUP extends Module implements Runnable {
- public class InvalidConversation extends Exception {
- public InvalidConversation(String message) { super(message); }
- }
-
- public class InvalidContact extends Exception {
- public InvalidContact(String message) { super(message); }
- }
+// TODO set server port in global config - must be the same everywhere
+// TODO same with buffer size
+public class UDUP extends Module implements Runnable {
private UDUPClient client;
private UDUPServer server;
- private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
private final AtomicBoolean running;
- public UDUP(ModuleType moduleType,
- InetAddress serverAddr,
+ public UDUP(InetAddress serverAddr,
int serverPort,
- int retryTimeout,
- int retriesCount,
+ int timeout,
int bufferSize) {
- super(moduleType);
- this.currentConversations = new HashMap<>();
- this.running = new AtomicBoolean(true);
+ super(ModuleType.UDP);
+ this.running = new AtomicBoolean(false);
try {
- this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+ this.client = new UDUPClient(this, serverPort, bufferSize);
this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
- } catch (SocketException | UnknownHostException e) {
+ this.running.getAndSet(true);
+ } catch (SocketException e) {
e.printStackTrace();
this.client.close();
this.server.close();
- this.running.getAndSet(false);
}
}
@@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable {
}
public void handleTyped(UDUPMessage event) throws InterruptedException {
-// System.out.println("UDP sending message " + event.getContent().getMessageId());
- this.client.sendMessage(event);
- }
-
- // also used for updating
- public void addConversation(UDUPMessage msg) {
- this.currentConversations.put(msg.getConversationId(), msg);
- }
-
- public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
- UDUPMessage ret = this.currentConversations.get(conversationId);
- if (ret == null) {
- throw new InvalidConversation("Conversation does not exist");
- } else {
- return ret;
+ System.out.println("UDP sending message " + event.getContent().getMessageId());
+ try {
+ this.client.sendMessage(event);
+ } catch (IOException e) {
+ System.out.println("UDP send message failed");
+ e.printStackTrace();
}
}
-
- // TODO add conversation removal
- public void removeConversation(String conversationId) {
- this.currentConversations.remove(conversationId);
- }
-
- public UDUPMessage deserialize(ByteArrayInputStream in) {
- Kryo kryo = new Kryo();
- Input kryoInput = new Input(in);
- UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
- return msg;
- }
-
- public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
- Kryo kryo = new Kryo();
- Output kryoOut = new Output(out);
- kryo.writeObject(kryoOut, msg);
- kryoOut.flush();
- }
-
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 93f2898..82aaeb1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,83 +1,41 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.*;
public class UDUPClient {
private UDUP udp;
+ private UDUPSerializer serializer;
private int serverPort;
- private int timeout;
- private int retriesCount;
private DatagramSocket socket;
private int bufsize;
- UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+ UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
this.udp = udp;
this.serverPort = serverPort;
- this.timeout = timeout;
- this.retriesCount = retriesCount;
this.socket = new DatagramSocket();
this.bufsize = bufferSize;
+ this.serializer = new UDUPSerializer();
}
- // TODO make sure that retry count in message is updated correctly
- public void sendMessage(UDUPMessage msg) throws InterruptedException {
- String messageId = msg.getMessageId();
-
- if (msg.getRetry() >= this.retriesCount) {
- this.udp.removeConversation(msg.getConversationId());
- } else {
- this.udp.addConversation(msg);
- try {
- sendUDP(msg);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- msg.setRetry(msg.getRetry() + 1);
-
- // TODO add sending message to timer with retry
- /*
- this.udp.executor.passMessage(new TimerSchedulerMessage(
- "",
- 0,
- "",
- this.timeout,
- System.currentTimeMillis() / 1000L,
- new TimerScheduledTask() {
- @Override
- public void run() {
- try {
- this.sendMessage(msg);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }));
-
- */
- }
- }
-
- private void sendUDP(UDUPMessage msg) throws IOException {
+ public void sendMessage(UDUPMessage msg) throws IOException {
int offset = 0;
int outputSize;
- byte[] buf = new byte[bufsize];
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- this.udp.serialize(output, msg);
- outputSize = output.size();
+ byte[] buf = this.serializer.serialize(msg);
+ outputSize = buf.length;
do {
- output.write(buf, offset, bufsize);
- System.out.println("UDP sends message: " + buf);
outputSize =- bufsize;
offset += bufsize;
DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+ System.out.println("UDP sends message: ");
+ for (byte b : buf) {
+ System.out.print(b);
+ }
+ System.out.println("to " + packet.getAddress());
this.socket.send(packet);
} while (outputSize > bufsize);
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
index 3196a97..ac35265 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -1,4 +1,98 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Serializes classes to and from byte arrays for UDP use
+ */
public class UDUPSerializer {
+ private Kryo kryo;
+
+ UDUPSerializer() {
+ kryo = new Kryo();
+ kryo.setReferences(true);
+ kryo.setRegistrationRequired(true);
+ registerClasses();
+ }
+
+ private void registerClasses() {
+
+ kryo.register(Inet4Address.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ InetAddress ia = (InetAddress) object;
+ kryo.writeObject(output, ia.getAddress());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ try {
+ byte[] buf = kryo.readObject(input, byte[].class);
+ InetAddress addr = Inet4Address.getByAddress(buf);
+ return addr;
+ } catch (UnknownHostException e) {
+ System.out.println("Custom InetAddress read failed");
+ e.printStackTrace();
+ return null;
+ }
+ }
+ });
+
+ kryo.register(PathName.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ PathName pn = (PathName) object;
+ kryo.writeObject(output, pn.getName());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ String addr = input.readString();
+ return new PathName(addr);
+ }
+ });
+
+ kryo.register(byte[].class);
+ kryo.register(ValueContact.class);
+ kryo.register(ModuleType.class);
+
+ kryo.register(AgentMessage.class);
+ kryo.register(GetStateMessage.class);
+ kryo.register(UDUPMessage.class);
+ kryo.register(StanikMessage.Type.class);
+ kryo.register(StanikMessage.class);
+ }
+
+ public UDUPMessage deserialize(byte[] packetData) {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData);
+ Input kryoInput = new Input(in);
+ UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+ return msg;
+ }
+
+ public byte[] serialize(UDUPMessage msg) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Output kryoOut = new Output(out);
+ kryo.writeObject(kryoOut, msg);
+ kryoOut.flush();
+ kryoOut.close();
+ return out.toByteArray();
+ }
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index f89f986..9a64bbd 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.util.HashMap;
public class UDUPServer {
- UDUP udp;
+ private UDUP udp;
+ private UDUPSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
- private byte[] buf;
private HashMap<InetAddress, byte[]> partialPackets;
+ private int bufSize;
- public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+ public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
this.udp = udp;
this.socket = new DatagramSocket(port, addr);
this.address = addr;
- this.buf = new byte[bufSize];
+ this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
+ this.serializer = new UDUPSerializer();
}
public void acceptMessage() throws IOException, InterruptedException {
+ byte[] buf = new byte[bufSize];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
this.socket.receive(packet);
- System.out.println("UDP received packet: " + packet.getData());
+ System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
if (packet.getOffset() == 0) {
- UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
- // TODO check if not full packet anyway
- // TODO check for errors if it's not the end od transmission
-
- this.udp.addConversation(msg);
+ UDUPMessage msg = this.serializer.deserialize(packet.getData());
System.out.println("UDP received message " + msg.getContent().getMessageId());
- if (msg.getDestinationModule() != ModuleType.UDP) {
- this.udp.sendMessage(msg.getContent());
+ if (packet.getLength() == this.bufSize) {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ } else {
+ if (msg.getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
}
} else {
this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
@@ -49,7 +51,7 @@ public class UDUPServer {
byte[] previousPacketData = this.partialPackets.get(senderAddress);
byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
try {
- UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+ UDUPMessage msg = this.serializer.deserialize(allPacketData);
this.udp.sendMessage(msg.getContent());
this.partialPackets.remove(senderAddress);
} catch (Error | Exception e) {