From bfd4c421004ce35a9eac307c3ac4976caa1820ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 5 Jan 2020 18:00:56 +0100 Subject: UDUP with no serialization working --- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 120 +++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java new file mode 100644 index 0000000..e4a7962 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -0,0 +1,120 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.EventBus; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Communication over UDP + * + * Client-server exchange pattern: + * server: init message + * client: ack message + * + * retry count + * retry timeout after which retry happens + ** + * udp sends initiator module success/failure information + * + * we have udps on different addresses with the same ports + * due to ValueContact design + */ + +public class UDUP extends Module implements Runnable { + public class InvalidConversation extends Exception { + public InvalidConversation(String message) { super(message); } + } + + public class InvalidContact extends Exception { + public InvalidContact(String message) { super(message); } + } + + private UDUPClient client; + private UDUPServer server; + private HashMap currentConversations; // TODO find blocking one + private final AtomicBoolean running; + + public UDUP(ModuleType moduleType, + InetAddress serverAddr, + int serverPort, + int retryTimeout, + int retriesCount, + int bufferSize) { + super(moduleType); + this.currentConversations = new HashMap<>(); + this.running = new AtomicBoolean(true); + try { + this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); + } catch (SocketException | UnknownHostException e) { + e.printStackTrace(); + this.client.close(); + this.server.close(); + this.running.getAndSet(false); + } + } + + public void run() { + System.out.println("UDP server running"); + while(this.running.get()) { + try { + this.server.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.server.close(); + } + } + } + + public void handleTyped(UDUPMessage event) throws InterruptedException { +// System.out.println("UDP sending message " + event.getContent().getMessageId()); + this.client.sendMessage(event); + } + + // also used for updating + public void addConversation(UDUPMessage msg) { + this.currentConversations.put(msg.getConversationId(), msg); + } + + public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { + UDUPMessage ret = this.currentConversations.get(conversationId); + if (ret == null) { + throw new InvalidConversation("Conversation does not exist"); + } else { + return ret; + } + } + + // TODO add conversation removal + public void removeConversation(String conversationId) { + this.currentConversations.remove(conversationId); + } + + public UDUPMessage deserialize(ByteArrayInputStream in) { + Kryo kryo = new Kryo(); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { + Kryo kryo = new Kryo(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + } + +} -- cgit v1.2.3 From ab9a470aa67ef414581145ad671e119d9edb86d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:14:44 +0100 Subject: Refactor UDUP module --- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 79 +++++----------------- 1 file changed, 16 insertions(+), 63 deletions(-) (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java') diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e4a7962..e616c93 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -1,19 +1,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.primitives.Bytes; -import pl.edu.mimuw.cloudatlas.agent.EventBus; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -public class UDUP extends Module implements Runnable { - public class InvalidConversation extends Exception { - public InvalidConversation(String message) { super(message); } - } - - public class InvalidContact extends Exception { - public InvalidContact(String message) { super(message); } - } +// TODO set server port in global config - must be the same everywhere +// TODO same with buffer size +public class UDUP extends Module implements Runnable { private UDUPClient client; private UDUPServer server; - private HashMap currentConversations; // TODO find blocking one private final AtomicBoolean running; - public UDUP(ModuleType moduleType, - InetAddress serverAddr, + public UDUP(InetAddress serverAddr, int serverPort, - int retryTimeout, - int retriesCount, + int timeout, int bufferSize) { - super(moduleType); - this.currentConversations = new HashMap<>(); - this.running = new AtomicBoolean(true); + super(ModuleType.UDP); + this.running = new AtomicBoolean(false); try { - this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.client = new UDUPClient(this, serverPort, bufferSize); this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); - } catch (SocketException | UnknownHostException e) { + this.running.getAndSet(true); + } catch (SocketException e) { e.printStackTrace(); this.client.close(); this.server.close(); - this.running.getAndSet(false); } } @@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable { } public void handleTyped(UDUPMessage event) throws InterruptedException { -// System.out.println("UDP sending message " + event.getContent().getMessageId()); - this.client.sendMessage(event); - } - - // also used for updating - public void addConversation(UDUPMessage msg) { - this.currentConversations.put(msg.getConversationId(), msg); - } - - public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { - UDUPMessage ret = this.currentConversations.get(conversationId); - if (ret == null) { - throw new InvalidConversation("Conversation does not exist"); - } else { - return ret; + System.out.println("UDP sending message " + event.getContent().getMessageId()); + try { + this.client.sendMessage(event); + } catch (IOException e) { + System.out.println("UDP send message failed"); + e.printStackTrace(); } } - - // TODO add conversation removal - public void removeConversation(String conversationId) { - this.currentConversations.remove(conversationId); - } - - public UDUPMessage deserialize(ByteArrayInputStream in) { - Kryo kryo = new Kryo(); - Input kryoInput = new Input(in); - UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); - return msg; - } - - public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { - Kryo kryo = new Kryo(); - Output kryoOut = new Output(out); - kryo.writeObject(kryoOut, msg); - kryoOut.flush(); - } - } -- cgit v1.2.3