From bfd4c421004ce35a9eac307c3ac4976caa1820ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 5 Jan 2020 18:00:56 +0100 Subject: UDUP with no serialization working --- .../cloudatlas/agent/messages/UDUPMessage.java | 61 +++++++++++ .../edu/mimuw/cloudatlas/agent/modules/Module.java | 11 +- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 120 +++++++++++++++++++++ .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 89 +++++++++++++++ .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 68 ++++++++++++ .../pl/edu/mimuw/cloudatlas/agent/GossipTest.java | 4 + .../pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 4 + 7 files changed, 351 insertions(+), 6 deletions(-) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java create mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java create mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java new file mode 100644 index 0000000..335d6fe --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -0,0 +1,61 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +public class UDUPMessage extends AgentMessage { + private ValueContact contact; + private AgentMessage content; + private int retry; + private String conversationId; + + public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) { + super(messageId, ModuleType.UDP, timestamp); + this.contact = contact; + this.content = content; + this.retry = retry; + this.conversationId = conversationId; + } + + public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) { + super(messageId, ModuleType.UDP); + this.contact = contact; + this.content = content; + this.retry = retry; + this.conversationId = conversationId; + } + + @Override + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } + + public AgentMessage getContent() { + return content; + } + + public void setContent(AgentMessage content) { + this.content = content; + } + + public int getRetry() { + return retry; + } + + public String getConversationId() { + return conversationId; + } + + public void setRetry(int retry) { this.retry = retry; } + + public ValueContact getContact() { return contact; } + + public void setContact(ValueContact contact) { + this.contact = contact; + } + + public void setConversationId(String conversationId) { + this.conversationId = conversationId; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 0a934cb..67fdab9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -1,12 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.Executor; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.*; /* * A Module is a (potentially stateful) event handler. @@ -49,6 +44,10 @@ public abstract class Module { throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); } + public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); + } + public void setExecutor(Executor executor) { this.executor = executor; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java new file mode 100644 index 0000000..e4a7962 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -0,0 +1,120 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.EventBus; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Communication over UDP + * + * Client-server exchange pattern: + * server: init message + * client: ack message + * + * retry count + * retry timeout after which retry happens + ** + * udp sends initiator module success/failure information + * + * we have udps on different addresses with the same ports + * due to ValueContact design + */ + +public class UDUP extends Module implements Runnable { + public class InvalidConversation extends Exception { + public InvalidConversation(String message) { super(message); } + } + + public class InvalidContact extends Exception { + public InvalidContact(String message) { super(message); } + } + + private UDUPClient client; + private UDUPServer server; + private HashMap currentConversations; // TODO find blocking one + private final AtomicBoolean running; + + public UDUP(ModuleType moduleType, + InetAddress serverAddr, + int serverPort, + int retryTimeout, + int retriesCount, + int bufferSize) { + super(moduleType); + this.currentConversations = new HashMap<>(); + this.running = new AtomicBoolean(true); + try { + this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); + } catch (SocketException | UnknownHostException e) { + e.printStackTrace(); + this.client.close(); + this.server.close(); + this.running.getAndSet(false); + } + } + + public void run() { + System.out.println("UDP server running"); + while(this.running.get()) { + try { + this.server.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.server.close(); + } + } + } + + public void handleTyped(UDUPMessage event) throws InterruptedException { +// System.out.println("UDP sending message " + event.getContent().getMessageId()); + this.client.sendMessage(event); + } + + // also used for updating + public void addConversation(UDUPMessage msg) { + this.currentConversations.put(msg.getConversationId(), msg); + } + + public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { + UDUPMessage ret = this.currentConversations.get(conversationId); + if (ret == null) { + throw new InvalidConversation("Conversation does not exist"); + } else { + return ret; + } + } + + // TODO add conversation removal + public void removeConversation(String conversationId) { + this.currentConversations.remove(conversationId); + } + + public UDUPMessage deserialize(ByteArrayInputStream in) { + Kryo kryo = new Kryo(); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { + Kryo kryo = new Kryo(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + } + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java new file mode 100644 index 0000000..93f2898 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -0,0 +1,89 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.*; + +public class UDUPClient { + private UDUP udp; + private int serverPort; + private int timeout; + private int retriesCount; + private DatagramSocket socket; + private int bufsize; + + UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException { + this.udp = udp; + this.serverPort = serverPort; + this.timeout = timeout; + this.retriesCount = retriesCount; + this.socket = new DatagramSocket(); + this.bufsize = bufferSize; + } + + // TODO make sure that retry count in message is updated correctly + public void sendMessage(UDUPMessage msg) throws InterruptedException { + String messageId = msg.getMessageId(); + + if (msg.getRetry() >= this.retriesCount) { + this.udp.removeConversation(msg.getConversationId()); + } else { + this.udp.addConversation(msg); + try { + sendUDP(msg); + } catch (IOException e) { + e.printStackTrace(); + } + + msg.setRetry(msg.getRetry() + 1); + + // TODO add sending message to timer with retry + /* + this.udp.executor.passMessage(new TimerSchedulerMessage( + "", + 0, + "", + this.timeout, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + try { + this.sendMessage(msg); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + })); + + */ + } + } + + private void sendUDP(UDUPMessage msg) throws IOException { + int offset = 0; + int outputSize; + byte[] buf = new byte[bufsize]; + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + this.udp.serialize(output, msg); + outputSize = output.size(); + + do { + output.write(buf, offset, bufsize); + System.out.println("UDP sends message: " + buf); + outputSize =- bufsize; + offset += bufsize; + DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); + this.socket.send(packet); + } while (outputSize > bufsize); + } + + void close() { + this.socket.close(); + } + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java new file mode 100644 index 0000000..f89f986 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -0,0 +1,68 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.*; +import java.util.HashMap; + +public class UDUPServer { + UDUP udp; + private DatagramSocket socket; + private InetAddress address; + private byte[] buf; + private HashMap partialPackets; + + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + this.udp = udp; + this.socket = new DatagramSocket(port, addr); + this.address = addr; + this.buf = new byte[bufSize]; + this.partialPackets = new HashMap<>(); + } + + public void acceptMessage() throws IOException, InterruptedException { + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP received packet: " + packet.getData()); + + if (packet.getOffset() == 0) { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); + // TODO check if not full packet anyway + // TODO check for errors if it's not the end od transmission + + this.udp.addConversation(msg); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } else { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } + } + + public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { + if (this.partialPackets.containsKey(senderAddress)) { + byte[] previousPacketData = this.partialPackets.get(senderAddress); + byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + try { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + this.udp.sendMessage(msg.getContent()); + this.partialPackets.remove(senderAddress); + } catch (Error | Exception e) { + System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); + this.partialPackets.put(senderAddress, allPacketData); + } + } else { + this.partialPackets.put(senderAddress, packetData); + } + } + + public void close() { + this.socket.close(); + } + +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java new file mode 100644 index 0000000..c56f00a --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java @@ -0,0 +1,4 @@ +package pl.edu.mimuw.cloudatlas.agent; + +public class GossipTest { +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java new file mode 100644 index 0000000..975c918 --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -0,0 +1,4 @@ +package pl.edu.mimuw.cloudatlas.agent; + +public class UDUPTest { +} -- cgit v1.2.3 From 55fecc86b43cc46fd949ab9b6c6910d2194e6cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Sun, 5 Jan 2020 18:01:45 +0100 Subject: Add UDUP basic test --- .../pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 182 +++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 975c918..545ad56 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -1,4 +1,186 @@ package pl.edu.mimuw.cloudatlas.agent; +import org.junit.*; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.UDUP; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +import java.net.InetAddress; +import java.net.UnknownHostException; + public class UDUPTest { + + @Test + public void messageBetweenUDUPs() { + UDUP udp1 = null; + UDUP udp2 = null; + UDUPMessage msg1 = null; + boolean testSuccess = true; + + try { + System.out.println("Starting udp1"); + + udp1 = new UDUP( + ModuleType.UDP, + InetAddress.getByName("127.0.0.2"), + 5999, + 1000, + 5000, + 20000); + + System.out.println("Starting udp2"); + + udp2 = new UDUP( + ModuleType.UDP, + InetAddress.getByName("127.0.0.3"), + 5999, + 1000, + 5000, + 20000); + + msg1 = new UDUPMessage( + "1", + new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")), + null, //new GetStateMessage("getstate1", 0, ModuleType.UDP, 1), + 0, + "conv1"); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + Thread udpThread1 = new Thread(udp1); + udpThread1.start(); + Thread udpThread2 = new Thread(udp2); + udpThread2.start(); + + try { + Thread.sleep(5000); + System.out.println("Sending message"); + if (udp1 == null | udp2 == null) { + Assert.fail("UDPs not initialized"); + } else { + udp1.handle(msg1); + Thread.sleep(10000); + UDUPMessage conv = udp2.fetchConversation("conv1"); + if (!conv.getConversationId().equals("conv1")) { + testSuccess = false; + } + } + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + testSuccess = false; + } catch (UDUP.InvalidConversation invalidConversation) { + System.out.println("Invalid conversation"); + testSuccess = false; + } + + udpThread1.interrupt(); + udpThread2.interrupt(); + + if (testSuccess) { + Assert.assertTrue(true); + } else { + Assert.fail(); + } + } +/* + @Test + public void bigMessageBetweenUDUPs() { + UDUP udp1 = new UDUP( + ModuleType.UDP, + 5999, + 1000, + 5000, + 256); + + UDUP udp2 = new UDUP( + ModuleType.UDP, + 5998, + 1000, + 5000, + 256); + + AttributesMap bigAttrib = new AttributesMap(); + for (Long i = 1L; i < 20; i++) { + bigAttrib.add(i.toString(), new ValueInt(i)); + } + + UDUPMessage msg1 = new UDUPMessage( + "udp1", + ModuleType.UDP, + "localhost", + 5998, + new UpdateAttributesMessage("updateattrib1", 0,"/", bigAttrib), + 0, + "conv1"); + + try { + udp1.handle(msg1); + Thread.sleep(1000); + UDUPMessage conv = udp2.fetchConversation("conv1"); + Assert.assertEquals(conv.getConversationId(), "conv1"); + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + } catch (UDUP.InvalidConversation noConversationError) { + Assert.fail(); + } + } + + + @Test + public void sendMultipleMessages() { + @Test + public void messageBetweenUDUPs() { + UDUP udp1 = new UDUP( + ModuleType.UDP, + 5999, + 1000, + 5000, + 3, + 256); + + UDUP udp2 = new UDUP( + ModuleType.UDP, + 5998, + 1000, + 5000, + 3, + 256); + + UDUPMessage msg1 = new UDUPMessage( + "1", + ModuleType.UDP, + "localhost", + 5998, + null, + 0, + "conv1"); + + + + + try { + udp1.handle(msg1); + udp1.handle(msg2); + udp1.handle(msg3); + Thread.sleep(1000); + UDUPMessage conv1 = udp2.fetchConversation("conv1"); + Assert.assertEquals(conv1.getConversationId(), "conv1"); + UDUPMessage conv2 = udp2.fetchConversation("conv2"); + Assert.assertEquals(conv2.getConversationId(), "conv2"); + UDUPMessage conv3 = udp2.fetchConversation("conv3"); + Assert.assertEquals(conv3.getConversationId(), "conv3"); + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + } catch (UDUP.InvalidConversation invalidConversation) { + Assert.fail(); + } + } + } +*/ } -- cgit v1.2.3 From aa7278e17afb5129034bbe1af1cf4ca3c6ba3e90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 12:18:13 +0100 Subject: Add UDUP basic test --- .../java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java new file mode 100644 index 0000000..3196a97 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -0,0 +1,4 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +public class UDUPSerializer { +} -- cgit v1.2.3 From ab9a470aa67ef414581145ad671e119d9edb86d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:14:44 +0100 Subject: Refactor UDUP module --- .../edu/mimuw/cloudatlas/agent/modules/UDUP.java | 79 ++++-------------- .../mimuw/cloudatlas/agent/modules/UDUPClient.java | 64 +++------------ .../cloudatlas/agent/modules/UDUPSerializer.java | 94 ++++++++++++++++++++++ .../mimuw/cloudatlas/agent/modules/UDUPServer.java | 30 +++---- .../pl/edu/mimuw/cloudatlas/agent/GossipTest.java | 4 - 5 files changed, 137 insertions(+), 134 deletions(-) delete mode 100644 src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e4a7962..e616c93 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -1,19 +1,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.primitives.Bytes; -import pl.edu.mimuw.cloudatlas.agent.EventBus; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -public class UDUP extends Module implements Runnable { - public class InvalidConversation extends Exception { - public InvalidConversation(String message) { super(message); } - } - - public class InvalidContact extends Exception { - public InvalidContact(String message) { super(message); } - } +// TODO set server port in global config - must be the same everywhere +// TODO same with buffer size +public class UDUP extends Module implements Runnable { private UDUPClient client; private UDUPServer server; - private HashMap currentConversations; // TODO find blocking one private final AtomicBoolean running; - public UDUP(ModuleType moduleType, - InetAddress serverAddr, + public UDUP(InetAddress serverAddr, int serverPort, - int retryTimeout, - int retriesCount, + int timeout, int bufferSize) { - super(moduleType); - this.currentConversations = new HashMap<>(); - this.running = new AtomicBoolean(true); + super(ModuleType.UDP); + this.running = new AtomicBoolean(false); try { - this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.client = new UDUPClient(this, serverPort, bufferSize); this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); - } catch (SocketException | UnknownHostException e) { + this.running.getAndSet(true); + } catch (SocketException e) { e.printStackTrace(); this.client.close(); this.server.close(); - this.running.getAndSet(false); } } @@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable { } public void handleTyped(UDUPMessage event) throws InterruptedException { -// System.out.println("UDP sending message " + event.getContent().getMessageId()); - this.client.sendMessage(event); - } - - // also used for updating - public void addConversation(UDUPMessage msg) { - this.currentConversations.put(msg.getConversationId(), msg); - } - - public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { - UDUPMessage ret = this.currentConversations.get(conversationId); - if (ret == null) { - throw new InvalidConversation("Conversation does not exist"); - } else { - return ret; + System.out.println("UDP sending message " + event.getContent().getMessageId()); + try { + this.client.sendMessage(event); + } catch (IOException e) { + System.out.println("UDP send message failed"); + e.printStackTrace(); } } - - // TODO add conversation removal - public void removeConversation(String conversationId) { - this.currentConversations.remove(conversationId); - } - - public UDUPMessage deserialize(ByteArrayInputStream in) { - Kryo kryo = new Kryo(); - Input kryoInput = new Input(in); - UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); - return msg; - } - - public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { - Kryo kryo = new Kryo(); - Output kryoOut = new Output(out); - kryo.writeObject(kryoOut, msg); - kryoOut.flush(); - } - } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 93f2898..82aaeb1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,83 +1,41 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.*; public class UDUPClient { private UDUP udp; + private UDUPSerializer serializer; private int serverPort; - private int timeout; - private int retriesCount; private DatagramSocket socket; private int bufsize; - UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException { + UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException { this.udp = udp; this.serverPort = serverPort; - this.timeout = timeout; - this.retriesCount = retriesCount; this.socket = new DatagramSocket(); this.bufsize = bufferSize; + this.serializer = new UDUPSerializer(); } - // TODO make sure that retry count in message is updated correctly - public void sendMessage(UDUPMessage msg) throws InterruptedException { - String messageId = msg.getMessageId(); - - if (msg.getRetry() >= this.retriesCount) { - this.udp.removeConversation(msg.getConversationId()); - } else { - this.udp.addConversation(msg); - try { - sendUDP(msg); - } catch (IOException e) { - e.printStackTrace(); - } - - msg.setRetry(msg.getRetry() + 1); - - // TODO add sending message to timer with retry - /* - this.udp.executor.passMessage(new TimerSchedulerMessage( - "", - 0, - "", - this.timeout, - System.currentTimeMillis() / 1000L, - new TimerScheduledTask() { - @Override - public void run() { - try { - this.sendMessage(msg); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - })); - - */ - } - } - - private void sendUDP(UDUPMessage msg) throws IOException { + public void sendMessage(UDUPMessage msg) throws IOException { int offset = 0; int outputSize; - byte[] buf = new byte[bufsize]; - ByteArrayOutputStream output = new ByteArrayOutputStream(); - this.udp.serialize(output, msg); - outputSize = output.size(); + byte[] buf = this.serializer.serialize(msg); + outputSize = buf.length; do { - output.write(buf, offset, bufsize); - System.out.println("UDP sends message: " + buf); outputSize =- bufsize; offset += bufsize; DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); + System.out.println("UDP sends message: "); + for (byte b : buf) { + System.out.print(b); + } + System.out.println("to " + packet.getAddress()); this.socket.send(packet); } while (outputSize > bufsize); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 3196a97..ac35265 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -1,4 +1,98 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Serializes classes to and from byte arrays for UDP use + */ public class UDUPSerializer { + private Kryo kryo; + + UDUPSerializer() { + kryo = new Kryo(); + kryo.setReferences(true); + kryo.setRegistrationRequired(true); + registerClasses(); + } + + private void registerClasses() { + + kryo.register(Inet4Address.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + InetAddress ia = (InetAddress) object; + kryo.writeObject(output, ia.getAddress()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + try { + byte[] buf = kryo.readObject(input, byte[].class); + InetAddress addr = Inet4Address.getByAddress(buf); + return addr; + } catch (UnknownHostException e) { + System.out.println("Custom InetAddress read failed"); + e.printStackTrace(); + return null; + } + } + }); + + kryo.register(PathName.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + PathName pn = (PathName) object; + kryo.writeObject(output, pn.getName()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + String addr = input.readString(); + return new PathName(addr); + } + }); + + kryo.register(byte[].class); + kryo.register(ValueContact.class); + kryo.register(ModuleType.class); + + kryo.register(AgentMessage.class); + kryo.register(GetStateMessage.class); + kryo.register(UDUPMessage.class); + kryo.register(StanikMessage.Type.class); + kryo.register(StanikMessage.class); + } + + public UDUPMessage deserialize(byte[] packetData) { + ByteArrayInputStream in = new ByteArrayInputStream(packetData); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public byte[] serialize(UDUPMessage msg) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + kryoOut.close(); + return out.toByteArray(); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index f89f986..9a64bbd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.util.HashMap; public class UDUPServer { - UDUP udp; + private UDUP udp; + private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; - private byte[] buf; private HashMap partialPackets; + private int bufSize; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { this.udp = udp; this.socket = new DatagramSocket(port, addr); this.address = addr; - this.buf = new byte[bufSize]; + this.bufSize = bufSize; this.partialPackets = new HashMap<>(); + this.serializer = new UDUPSerializer(); } public void acceptMessage() throws IOException, InterruptedException { + byte[] buf = new byte[bufSize]; DatagramPacket packet = new DatagramPacket(buf, buf.length); this.socket.receive(packet); - System.out.println("UDP received packet: " + packet.getData()); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); if (packet.getOffset() == 0) { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); - // TODO check if not full packet anyway - // TODO check for errors if it's not the end od transmission - - this.udp.addConversation(msg); + UDUPMessage msg = this.serializer.deserialize(packet.getData()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (msg.getDestinationModule() != ModuleType.UDP) { - this.udp.sendMessage(msg.getContent()); + if (packet.getLength() == this.bufSize) { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } else { + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } } } else { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); @@ -49,7 +51,7 @@ public class UDUPServer { byte[] previousPacketData = this.partialPackets.get(senderAddress); byte[] allPacketData = Bytes.concat(previousPacketData, packetData); try { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + UDUPMessage msg = this.serializer.deserialize(allPacketData); this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(senderAddress); } catch (Error | Exception e) { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java deleted file mode 100644 index c56f00a..0000000 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/GossipTest.java +++ /dev/null @@ -1,4 +0,0 @@ -package pl.edu.mimuw.cloudatlas.agent; - -public class GossipTest { -} -- cgit v1.2.3 From f8e0f371515a825ead0bf385105b461223c55e71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:17:18 +0100 Subject: Refactor UDUP message --- .../cloudatlas/agent/messages/UDUPMessage.java | 28 +++++----------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index 335d6fe..fa8d1fa 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -7,23 +7,21 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact; public class UDUPMessage extends AgentMessage { private ValueContact contact; private AgentMessage content; - private int retry; - private String conversationId; - public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) { + public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) { super(messageId, ModuleType.UDP, timestamp); this.contact = contact; this.content = content; - this.retry = retry; - this.conversationId = conversationId; } - public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) { + public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) { super(messageId, ModuleType.UDP); this.contact = contact; this.content = content; - this.retry = retry; - this.conversationId = conversationId; + } + + public UDUPMessage() { + super("", ModuleType.UDP); } @Override @@ -39,23 +37,9 @@ public class UDUPMessage extends AgentMessage { this.content = content; } - public int getRetry() { - return retry; - } - - public String getConversationId() { - return conversationId; - } - - public void setRetry(int retry) { this.retry = retry; } - public ValueContact getContact() { return contact; } public void setContact(ValueContact contact) { this.contact = contact; } - - public void setConversationId(String conversationId) { - this.conversationId = conversationId; - } } -- cgit v1.2.3 From 6027e8ffcc91495dc1cfe76da925e846f57646e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:32:45 +0100 Subject: Fix passing messages in server --- src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 9a64bbd..6807a86 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -37,7 +37,9 @@ public class UDUPServer { if (packet.getLength() == this.bufSize) { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); } else { - if (msg.getDestinationModule() != ModuleType.UDP) { + if (msg.getContent().getDestinationModule() == ModuleType.TEST) { + System.out.println("UDP server: test message received"); + } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { this.udp.sendMessage(msg.getContent()); } } -- cgit v1.2.3 From db22fe6c09a33c2cf5a7936442ccb7b8307ae84e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 16:35:16 +0100 Subject: Improve existing test --- .../pl/edu/mimuw/cloudatlas/agent/UDUPTest.java | 26 +++++++--------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 545ad56..e4f601a 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -1,7 +1,6 @@ package pl.edu.mimuw.cloudatlas.agent; import org.junit.*; -import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.agent.modules.Module; import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; @@ -25,29 +24,27 @@ public class UDUPTest { System.out.println("Starting udp1"); udp1 = new UDUP( - ModuleType.UDP, InetAddress.getByName("127.0.0.2"), 5999, - 1000, 5000, - 20000); + 1000); System.out.println("Starting udp2"); udp2 = new UDUP( - ModuleType.UDP, InetAddress.getByName("127.0.0.3"), 5999, - 1000, 5000, - 20000); + 1000); + + UDUPMessage testContent = new UDUPMessage(); + testContent.setDestinationModule(ModuleType.TEST); msg1 = new UDUPMessage( - "1", + "udup1", new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")), - null, //new GetStateMessage("getstate1", 0, ModuleType.UDP, 1), - 0, - "conv1"); + testContent + ); } catch (UnknownHostException e) { e.printStackTrace(); @@ -66,17 +63,10 @@ public class UDUPTest { } else { udp1.handle(msg1); Thread.sleep(10000); - UDUPMessage conv = udp2.fetchConversation("conv1"); - if (!conv.getConversationId().equals("conv1")) { - testSuccess = false; - } } } catch (InterruptedException | Module.InvalidMessageType e) { e.printStackTrace(); testSuccess = false; - } catch (UDUP.InvalidConversation invalidConversation) { - System.out.println("Invalid conversation"); - testSuccess = false; } udpThread1.interrupt(); -- cgit v1.2.3 From 360cb66d187b64ae51c792d87d0d425e73b48d0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magdalena=20Grodzi=C5=84ska?= Date: Mon, 6 Jan 2020 18:02:20 +0100 Subject: Add udup module initialization --- src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 0efa710..ef4f48d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,5 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; @@ -12,14 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; -import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; -import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik; -import pl.edu.mimuw.cloudatlas.agent.modules.Remik; -import pl.edu.mimuw.cloudatlas.agent.modules.Stanik; -import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; -import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; -import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -48,6 +45,11 @@ public class Agent { modules.put(ModuleType.RMI, new Remik()); modules.put(ModuleType.STATE, new Stanik()); modules.put(ModuleType.QUERY, new Qurnik()); + try { + modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); + } catch (UnknownHostException e) { + e.printStackTrace(); + } // TODO add modules as we implement them return modules; } -- cgit v1.2.3