m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2020-01-06 18:28:13 +0100
committerGitHub <noreply@github.com>2020-01-06 18:28:13 +0100
commit2feff1aa41c41008fcda2dd60c718cf09deb3fa1 (patch)
tree36c774a7b2cda6ace2d4510267d09ef0e0bba6d8 /src
parentedd8c9d09b89a68a0d654dd6b08dc23a22d50293 (diff)
parent360cb66d187b64ae51c792d87d0d425e73b48d0a (diff)
Merge pull request #86 from m-chrzan/UDUP
Udup
Diffstat (limited to 'src')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java16
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java45
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java11
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java73
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java47
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java98
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java72
-rw-r--r--src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java176
8 files changed, 525 insertions, 13 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 26f0e0b..256fa6b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,5 +1,8 @@
package pl.edu.mimuw.cloudatlas.agent;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
@@ -12,14 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
-import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Stanik;
-import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler;
import pl.edu.mimuw.cloudatlas.api.Api;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
@@ -49,6 +46,11 @@ public class Agent {
Long freshnessPeriod = new Long(System.getProperty("freshness_period"));
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
+ try {
+ modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000));
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
// TODO add modules as we implement them
return modules;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
new file mode 100644
index 0000000..fa8d1fa
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -0,0 +1,45 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+public class UDUPMessage extends AgentMessage {
+ private ValueContact contact;
+ private AgentMessage content;
+
+ public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) {
+ super(messageId, ModuleType.UDP, timestamp);
+ this.contact = contact;
+ this.content = content;
+ }
+
+ public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) {
+ super(messageId, ModuleType.UDP);
+ this.contact = contact;
+ this.content = content;
+ }
+
+ public UDUPMessage() {
+ super("", ModuleType.UDP);
+ }
+
+ @Override
+ public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+ module.handleTyped(this);
+ }
+
+ public AgentMessage getContent() {
+ return content;
+ }
+
+ public void setContent(AgentMessage content) {
+ this.content = content;
+ }
+
+ public ValueContact getContact() { return contact; }
+
+ public void setContact(ValueContact contact) {
+ this.contact = contact;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index 0a934cb..67fdab9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -1,12 +1,7 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.Executor;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
/*
* A Module is a (potentially stateful) event handler.
@@ -49,6 +44,10 @@ public abstract class Module {
throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
}
+ public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+ }
+
public void setExecutor(Executor executor) {
this.executor = executor;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
new file mode 100644
index 0000000..e616c93
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -0,0 +1,73 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Communication over UDP
+ *
+ * Client-server exchange pattern:
+ * server: init message
+ * client: ack message
+ *
+ * retry count
+ * retry timeout after which retry happens
+ **
+ * udp sends initiator module success/failure information
+ *
+ * we have udps on different addresses with the same ports
+ * due to ValueContact design
+ */
+
+// TODO set server port in global config - must be the same everywhere
+// TODO same with buffer size
+
+public class UDUP extends Module implements Runnable {
+ private UDUPClient client;
+ private UDUPServer server;
+ private final AtomicBoolean running;
+
+ public UDUP(InetAddress serverAddr,
+ int serverPort,
+ int timeout,
+ int bufferSize) {
+ super(ModuleType.UDP);
+ this.running = new AtomicBoolean(false);
+ try {
+ this.client = new UDUPClient(this, serverPort, bufferSize);
+ this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
+ this.running.getAndSet(true);
+ } catch (SocketException e) {
+ e.printStackTrace();
+ this.client.close();
+ this.server.close();
+ }
+ }
+
+ public void run() {
+ System.out.println("UDP server running");
+ while(this.running.get()) {
+ try {
+ this.server.acceptMessage();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ this.running.getAndSet(false);
+ this.server.close();
+ }
+ }
+ }
+
+ public void handleTyped(UDUPMessage event) throws InterruptedException {
+ System.out.println("UDP sending message " + event.getContent().getMessageId());
+ try {
+ this.client.sendMessage(event);
+ } catch (IOException e) {
+ System.out.println("UDP send message failed");
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
new file mode 100644
index 0000000..82aaeb1
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -0,0 +1,47 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.*;
+
+public class UDUPClient {
+ private UDUP udp;
+ private UDUPSerializer serializer;
+ private int serverPort;
+ private DatagramSocket socket;
+ private int bufsize;
+
+ UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
+ this.udp = udp;
+ this.serverPort = serverPort;
+ this.socket = new DatagramSocket();
+ this.bufsize = bufferSize;
+ this.serializer = new UDUPSerializer();
+ }
+
+ public void sendMessage(UDUPMessage msg) throws IOException {
+ int offset = 0;
+ int outputSize;
+
+ byte[] buf = this.serializer.serialize(msg);
+ outputSize = buf.length;
+
+ do {
+ outputSize =- bufsize;
+ offset += bufsize;
+ DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+ System.out.println("UDP sends message: ");
+ for (byte b : buf) {
+ System.out.print(b);
+ }
+ System.out.println("to " + packet.getAddress());
+ this.socket.send(packet);
+ } while (outputSize > bufsize);
+ }
+
+ void close() {
+ this.socket.close();
+ }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
new file mode 100644
index 0000000..ac35265
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -0,0 +1,98 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Serializes classes to and from byte arrays for UDP use
+ */
+public class UDUPSerializer {
+ private Kryo kryo;
+
+ UDUPSerializer() {
+ kryo = new Kryo();
+ kryo.setReferences(true);
+ kryo.setRegistrationRequired(true);
+ registerClasses();
+ }
+
+ private void registerClasses() {
+
+ kryo.register(Inet4Address.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ InetAddress ia = (InetAddress) object;
+ kryo.writeObject(output, ia.getAddress());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ try {
+ byte[] buf = kryo.readObject(input, byte[].class);
+ InetAddress addr = Inet4Address.getByAddress(buf);
+ return addr;
+ } catch (UnknownHostException e) {
+ System.out.println("Custom InetAddress read failed");
+ e.printStackTrace();
+ return null;
+ }
+ }
+ });
+
+ kryo.register(PathName.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ PathName pn = (PathName) object;
+ kryo.writeObject(output, pn.getName());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ String addr = input.readString();
+ return new PathName(addr);
+ }
+ });
+
+ kryo.register(byte[].class);
+ kryo.register(ValueContact.class);
+ kryo.register(ModuleType.class);
+
+ kryo.register(AgentMessage.class);
+ kryo.register(GetStateMessage.class);
+ kryo.register(UDUPMessage.class);
+ kryo.register(StanikMessage.Type.class);
+ kryo.register(StanikMessage.class);
+ }
+
+ public UDUPMessage deserialize(byte[] packetData) {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData);
+ Input kryoInput = new Input(in);
+ UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+ return msg;
+ }
+
+ public byte[] serialize(UDUPMessage msg) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Output kryoOut = new Output(out);
+ kryo.writeObject(kryoOut, msg);
+ kryoOut.flush();
+ kryoOut.close();
+ return out.toByteArray();
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..6807a86
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,72 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+ private UDUP udp;
+ private UDUPSerializer serializer;
+ private DatagramSocket socket;
+ private InetAddress address;
+ private HashMap<InetAddress, byte[]> partialPackets;
+ private int bufSize;
+
+ public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
+ this.udp = udp;
+ this.socket = new DatagramSocket(port, addr);
+ this.address = addr;
+ this.bufSize = bufSize;
+ this.partialPackets = new HashMap<>();
+ this.serializer = new UDUPSerializer();
+ }
+
+ public void acceptMessage() throws IOException, InterruptedException {
+ byte[] buf = new byte[bufSize];
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ this.socket.receive(packet);
+ System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+
+ if (packet.getOffset() == 0) {
+ UDUPMessage msg = this.serializer.deserialize(packet.getData());
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+ if (packet.getLength() == this.bufSize) {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ } else {
+ if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+ System.out.println("UDP server: test message received");
+ } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ }
+ } else {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ }
+ }
+
+ public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+ if (this.partialPackets.containsKey(senderAddress)) {
+ byte[] previousPacketData = this.partialPackets.get(senderAddress);
+ byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ try {
+ UDUPMessage msg = this.serializer.deserialize(allPacketData);
+ this.udp.sendMessage(msg.getContent());
+ this.partialPackets.remove(senderAddress);
+ } catch (Error | Exception e) {
+ System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+ this.partialPackets.put(senderAddress, allPacketData);
+ }
+ } else {
+ this.partialPackets.put(senderAddress, packetData);
+ }
+ }
+
+ public void close() {
+ this.socket.close();
+ }
+
+}
diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
new file mode 100644
index 0000000..e4f601a
--- /dev/null
+++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java
@@ -0,0 +1,176 @@
+package pl.edu.mimuw.cloudatlas.agent;
+
+import org.junit.*;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.agent.modules.UDUP;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class UDUPTest {
+
+ @Test
+ public void messageBetweenUDUPs() {
+ UDUP udp1 = null;
+ UDUP udp2 = null;
+ UDUPMessage msg1 = null;
+ boolean testSuccess = true;
+
+ try {
+ System.out.println("Starting udp1");
+
+ udp1 = new UDUP(
+ InetAddress.getByName("127.0.0.2"),
+ 5999,
+ 5000,
+ 1000);
+
+ System.out.println("Starting udp2");
+
+ udp2 = new UDUP(
+ InetAddress.getByName("127.0.0.3"),
+ 5999,
+ 5000,
+ 1000);
+
+ UDUPMessage testContent = new UDUPMessage();
+ testContent.setDestinationModule(ModuleType.TEST);
+
+ msg1 = new UDUPMessage(
+ "udup1",
+ new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")),
+ testContent
+ );
+
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ Thread udpThread1 = new Thread(udp1);
+ udpThread1.start();
+ Thread udpThread2 = new Thread(udp2);
+ udpThread2.start();
+
+ try {
+ Thread.sleep(5000);
+ System.out.println("Sending message");
+ if (udp1 == null | udp2 == null) {
+ Assert.fail("UDPs not initialized");
+ } else {
+ udp1.handle(msg1);
+ Thread.sleep(10000);
+ }
+ } catch (InterruptedException | Module.InvalidMessageType e) {
+ e.printStackTrace();
+ testSuccess = false;
+ }
+
+ udpThread1.interrupt();
+ udpThread2.interrupt();
+
+ if (testSuccess) {
+ Assert.assertTrue(true);
+ } else {
+ Assert.fail();
+ }
+ }
+/*
+ @Test
+ public void bigMessageBetweenUDUPs() {
+ UDUP udp1 = new UDUP(
+ ModuleType.UDP,
+ 5999,
+ 1000,
+ 5000,
+ 256);
+
+ UDUP udp2 = new UDUP(
+ ModuleType.UDP,
+ 5998,
+ 1000,
+ 5000,
+ 256);
+
+ AttributesMap bigAttrib = new AttributesMap();
+ for (Long i = 1L; i < 20; i++) {
+ bigAttrib.add(i.toString(), new ValueInt(i));
+ }
+
+ UDUPMessage msg1 = new UDUPMessage(
+ "udp1",
+ ModuleType.UDP,
+ "localhost",
+ 5998,
+ new UpdateAttributesMessage("updateattrib1", 0,"/", bigAttrib),
+ 0,
+ "conv1");
+
+ try {
+ udp1.handle(msg1);
+ Thread.sleep(1000);
+ UDUPMessage conv = udp2.fetchConversation("conv1");
+ Assert.assertEquals(conv.getConversationId(), "conv1");
+ } catch (InterruptedException | Module.InvalidMessageType e) {
+ e.printStackTrace();
+ } catch (UDUP.InvalidConversation noConversationError) {
+ Assert.fail();
+ }
+ }
+
+
+ @Test
+ public void sendMultipleMessages() {
+ @Test
+ public void messageBetweenUDUPs() {
+ UDUP udp1 = new UDUP(
+ ModuleType.UDP,
+ 5999,
+ 1000,
+ 5000,
+ 3,
+ 256);
+
+ UDUP udp2 = new UDUP(
+ ModuleType.UDP,
+ 5998,
+ 1000,
+ 5000,
+ 3,
+ 256);
+
+ UDUPMessage msg1 = new UDUPMessage(
+ "1",
+ ModuleType.UDP,
+ "localhost",
+ 5998,
+ null,
+ 0,
+ "conv1");
+
+
+
+
+ try {
+ udp1.handle(msg1);
+ udp1.handle(msg2);
+ udp1.handle(msg3);
+ Thread.sleep(1000);
+ UDUPMessage conv1 = udp2.fetchConversation("conv1");
+ Assert.assertEquals(conv1.getConversationId(), "conv1");
+ UDUPMessage conv2 = udp2.fetchConversation("conv2");
+ Assert.assertEquals(conv2.getConversationId(), "conv2");
+ UDUPMessage conv3 = udp2.fetchConversation("conv3");
+ Assert.assertEquals(conv3.getConversationId(), "conv3");
+ } catch (InterruptedException | Module.InvalidMessageType e) {
+ e.printStackTrace();
+ } catch (UDUP.InvalidConversation invalidConversation) {
+ Assert.fail();
+ }
+ }
+ }
+*/
+}