m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules
diff options
context:
space:
mode:
authorMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-05 18:00:56 +0100
committerMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-05 18:00:56 +0100
commitbfd4c421004ce35a9eac307c3ac4976caa1820ae (patch)
treebcae85fc193c2c1cd8f410821ae10f66f9b18093 /src/main/java/pl/edu/mimuw/cloudatlas/agent/modules
parent69480d460a698a78b90d8d111f5fb4d761ffda81 (diff)
UDUP with no serialization working
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java11
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java120
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java89
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java68
4 files changed, 282 insertions, 6 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index 0a934cb..67fdab9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -1,12 +1,7 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.Executor;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
/*
* A Module is a (potentially stateful) event handler.
@@ -49,6 +44,10 @@ public abstract class Module {
throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
}
+ public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+ }
+
public void setExecutor(Executor executor) {
this.executor = executor;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
new file mode 100644
index 0000000..e4a7962
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -0,0 +1,120 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.EventBus;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Communication over UDP
+ *
+ * Client-server exchange pattern:
+ * server: init message
+ * client: ack message
+ *
+ * retry count
+ * retry timeout after which retry happens
+ **
+ * udp sends initiator module success/failure information
+ *
+ * we have udps on different addresses with the same ports
+ * due to ValueContact design
+ */
+
+public class UDUP extends Module implements Runnable {
+ public class InvalidConversation extends Exception {
+ public InvalidConversation(String message) { super(message); }
+ }
+
+ public class InvalidContact extends Exception {
+ public InvalidContact(String message) { super(message); }
+ }
+
+ private UDUPClient client;
+ private UDUPServer server;
+ private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one
+ private final AtomicBoolean running;
+
+ public UDUP(ModuleType moduleType,
+ InetAddress serverAddr,
+ int serverPort,
+ int retryTimeout,
+ int retriesCount,
+ int bufferSize) {
+ super(moduleType);
+ this.currentConversations = new HashMap<>();
+ this.running = new AtomicBoolean(true);
+ try {
+ this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize);
+ this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
+ } catch (SocketException | UnknownHostException e) {
+ e.printStackTrace();
+ this.client.close();
+ this.server.close();
+ this.running.getAndSet(false);
+ }
+ }
+
+ public void run() {
+ System.out.println("UDP server running");
+ while(this.running.get()) {
+ try {
+ this.server.acceptMessage();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ this.running.getAndSet(false);
+ this.server.close();
+ }
+ }
+ }
+
+ public void handleTyped(UDUPMessage event) throws InterruptedException {
+// System.out.println("UDP sending message " + event.getContent().getMessageId());
+ this.client.sendMessage(event);
+ }
+
+ // also used for updating
+ public void addConversation(UDUPMessage msg) {
+ this.currentConversations.put(msg.getConversationId(), msg);
+ }
+
+ public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation {
+ UDUPMessage ret = this.currentConversations.get(conversationId);
+ if (ret == null) {
+ throw new InvalidConversation("Conversation does not exist");
+ } else {
+ return ret;
+ }
+ }
+
+ // TODO add conversation removal
+ public void removeConversation(String conversationId) {
+ this.currentConversations.remove(conversationId);
+ }
+
+ public UDUPMessage deserialize(ByteArrayInputStream in) {
+ Kryo kryo = new Kryo();
+ Input kryoInput = new Input(in);
+ UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+ return msg;
+ }
+
+ public void serialize(ByteArrayOutputStream out, UDUPMessage msg) {
+ Kryo kryo = new Kryo();
+ Output kryoOut = new Output(out);
+ kryo.writeObject(kryoOut, msg);
+ kryoOut.flush();
+ }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
new file mode 100644
index 0000000..93f2898
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -0,0 +1,89 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.*;
+
+public class UDUPClient {
+ private UDUP udp;
+ private int serverPort;
+ private int timeout;
+ private int retriesCount;
+ private DatagramSocket socket;
+ private int bufsize;
+
+ UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException {
+ this.udp = udp;
+ this.serverPort = serverPort;
+ this.timeout = timeout;
+ this.retriesCount = retriesCount;
+ this.socket = new DatagramSocket();
+ this.bufsize = bufferSize;
+ }
+
+ // TODO make sure that retry count in message is updated correctly
+ public void sendMessage(UDUPMessage msg) throws InterruptedException {
+ String messageId = msg.getMessageId();
+
+ if (msg.getRetry() >= this.retriesCount) {
+ this.udp.removeConversation(msg.getConversationId());
+ } else {
+ this.udp.addConversation(msg);
+ try {
+ sendUDP(msg);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ msg.setRetry(msg.getRetry() + 1);
+
+ // TODO add sending message to timer with retry
+ /*
+ this.udp.executor.passMessage(new TimerSchedulerMessage(
+ "",
+ 0,
+ "",
+ this.timeout,
+ System.currentTimeMillis() / 1000L,
+ new TimerScheduledTask() {
+ @Override
+ public void run() {
+ try {
+ this.sendMessage(msg);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }));
+
+ */
+ }
+ }
+
+ private void sendUDP(UDUPMessage msg) throws IOException {
+ int offset = 0;
+ int outputSize;
+ byte[] buf = new byte[bufsize];
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ this.udp.serialize(output, msg);
+ outputSize = output.size();
+
+ do {
+ output.write(buf, offset, bufsize);
+ System.out.println("UDP sends message: " + buf);
+ outputSize =- bufsize;
+ offset += bufsize;
+ DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+ this.socket.send(packet);
+ } while (outputSize > bufsize);
+ }
+
+ void close() {
+ this.socket.close();
+ }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..f89f986
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,68 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+ UDUP udp;
+ private DatagramSocket socket;
+ private InetAddress address;
+ private byte[] buf;
+ private HashMap<InetAddress, byte[]> partialPackets;
+
+ public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException {
+ this.udp = udp;
+ this.socket = new DatagramSocket(port, addr);
+ this.address = addr;
+ this.buf = new byte[bufSize];
+ this.partialPackets = new HashMap<>();
+ }
+
+ public void acceptMessage() throws IOException, InterruptedException {
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ this.socket.receive(packet);
+ System.out.println("UDP received packet: " + packet.getData());
+
+ if (packet.getOffset() == 0) {
+ UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData()));
+ // TODO check if not full packet anyway
+ // TODO check for errors if it's not the end od transmission
+
+ this.udp.addConversation(msg);
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+ if (msg.getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ } else {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ }
+ }
+
+ public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+ if (this.partialPackets.containsKey(senderAddress)) {
+ byte[] previousPacketData = this.partialPackets.get(senderAddress);
+ byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ try {
+ UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData));
+ this.udp.sendMessage(msg.getContent());
+ this.partialPackets.remove(senderAddress);
+ } catch (Error | Exception e) {
+ System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+ this.partialPackets.put(senderAddress, allPacketData);
+ }
+ } else {
+ this.partialPackets.put(senderAddress, packetData);
+ }
+ }
+
+ public void close() {
+ this.socket.close();
+ }
+
+}