m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java16
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java45
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java11
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java73
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java47
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java98
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java72
7 files changed, 349 insertions, 13 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
index 26f0e0b..256fa6b 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java
@@ -1,5 +1,8 @@
package pl.edu.mimuw.cloudatlas.agent;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
@@ -12,14 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation;
import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;
+import pl.edu.mimuw.cloudatlas.agent.modules.*;
import pl.edu.mimuw.cloudatlas.agent.modules.Module;
-import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
-import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Remik;
-import pl.edu.mimuw.cloudatlas.agent.modules.Stanik;
-import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
-import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler;
import pl.edu.mimuw.cloudatlas.api.Api;
import pl.edu.mimuw.cloudatlas.interpreter.Main;
import pl.edu.mimuw.cloudatlas.model.PathName;
@@ -49,6 +46,11 @@ public class Agent {
Long freshnessPeriod = new Long(System.getProperty("freshness_period"));
modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));
modules.put(ModuleType.QUERY, new Qurnik());
+ try {
+ modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000));
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
// TODO add modules as we implement them
return modules;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
new file mode 100644
index 0000000..fa8d1fa
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java
@@ -0,0 +1,45 @@
+package pl.edu.mimuw.cloudatlas.agent.messages;
+
+import pl.edu.mimuw.cloudatlas.agent.modules.Module;
+import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+public class UDUPMessage extends AgentMessage {
+ private ValueContact contact;
+ private AgentMessage content;
+
+ public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) {
+ super(messageId, ModuleType.UDP, timestamp);
+ this.contact = contact;
+ this.content = content;
+ }
+
+ public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) {
+ super(messageId, ModuleType.UDP);
+ this.contact = contact;
+ this.content = content;
+ }
+
+ public UDUPMessage() {
+ super("", ModuleType.UDP);
+ }
+
+ @Override
+ public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType {
+ module.handleTyped(this);
+ }
+
+ public AgentMessage getContent() {
+ return content;
+ }
+
+ public void setContent(AgentMessage content) {
+ this.content = content;
+ }
+
+ public ValueContact getContact() { return contact; }
+
+ public void setContact(ValueContact contact) {
+ this.contact = contact;
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
index 0a934cb..67fdab9 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java
@@ -1,12 +1,7 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import pl.edu.mimuw.cloudatlas.agent.Executor;
-import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage;
-import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.*;
/*
* A Module is a (potentially stateful) event handler.
@@ -49,6 +44,10 @@ public abstract class Module {
throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
}
+ public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType {
+ throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString());
+ }
+
public void setExecutor(Executor executor) {
this.executor = executor;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
new file mode 100644
index 0000000..e616c93
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java
@@ -0,0 +1,73 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Communication over UDP
+ *
+ * Client-server exchange pattern:
+ * server: init message
+ * client: ack message
+ *
+ * retry count
+ * retry timeout after which retry happens
+ **
+ * udp sends initiator module success/failure information
+ *
+ * we have udps on different addresses with the same ports
+ * due to ValueContact design
+ */
+
+// TODO set server port in global config - must be the same everywhere
+// TODO same with buffer size
+
+public class UDUP extends Module implements Runnable {
+ private UDUPClient client;
+ private UDUPServer server;
+ private final AtomicBoolean running;
+
+ public UDUP(InetAddress serverAddr,
+ int serverPort,
+ int timeout,
+ int bufferSize) {
+ super(ModuleType.UDP);
+ this.running = new AtomicBoolean(false);
+ try {
+ this.client = new UDUPClient(this, serverPort, bufferSize);
+ this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize);
+ this.running.getAndSet(true);
+ } catch (SocketException e) {
+ e.printStackTrace();
+ this.client.close();
+ this.server.close();
+ }
+ }
+
+ public void run() {
+ System.out.println("UDP server running");
+ while(this.running.get()) {
+ try {
+ this.server.acceptMessage();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ this.running.getAndSet(false);
+ this.server.close();
+ }
+ }
+ }
+
+ public void handleTyped(UDUPMessage event) throws InterruptedException {
+ System.out.println("UDP sending message " + event.getContent().getMessageId());
+ try {
+ this.client.sendMessage(event);
+ } catch (IOException e) {
+ System.out.println("UDP send message failed");
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
new file mode 100644
index 0000000..82aaeb1
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -0,0 +1,47 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.*;
+
+public class UDUPClient {
+ private UDUP udp;
+ private UDUPSerializer serializer;
+ private int serverPort;
+ private DatagramSocket socket;
+ private int bufsize;
+
+ UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException {
+ this.udp = udp;
+ this.serverPort = serverPort;
+ this.socket = new DatagramSocket();
+ this.bufsize = bufferSize;
+ this.serializer = new UDUPSerializer();
+ }
+
+ public void sendMessage(UDUPMessage msg) throws IOException {
+ int offset = 0;
+ int outputSize;
+
+ byte[] buf = this.serializer.serialize(msg);
+ outputSize = buf.length;
+
+ do {
+ outputSize =- bufsize;
+ offset += bufsize;
+ DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort);
+ System.out.println("UDP sends message: ");
+ for (byte b : buf) {
+ System.out.print(b);
+ }
+ System.out.println("to " + packet.getAddress());
+ this.socket.send(packet);
+ } while (outputSize > bufsize);
+ }
+
+ void close() {
+ this.socket.close();
+ }
+
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
new file mode 100644
index 0000000..ac35265
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
@@ -0,0 +1,98 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+import pl.edu.mimuw.cloudatlas.model.PathName;
+import pl.edu.mimuw.cloudatlas.model.ValueContact;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Serializes classes to and from byte arrays for UDP use
+ */
+public class UDUPSerializer {
+ private Kryo kryo;
+
+ UDUPSerializer() {
+ kryo = new Kryo();
+ kryo.setReferences(true);
+ kryo.setRegistrationRequired(true);
+ registerClasses();
+ }
+
+ private void registerClasses() {
+
+ kryo.register(Inet4Address.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ InetAddress ia = (InetAddress) object;
+ kryo.writeObject(output, ia.getAddress());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ try {
+ byte[] buf = kryo.readObject(input, byte[].class);
+ InetAddress addr = Inet4Address.getByAddress(buf);
+ return addr;
+ } catch (UnknownHostException e) {
+ System.out.println("Custom InetAddress read failed");
+ e.printStackTrace();
+ return null;
+ }
+ }
+ });
+
+ kryo.register(PathName.class, new Serializer() {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ PathName pn = (PathName) object;
+ kryo.writeObject(output, pn.getName());
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ String addr = input.readString();
+ return new PathName(addr);
+ }
+ });
+
+ kryo.register(byte[].class);
+ kryo.register(ValueContact.class);
+ kryo.register(ModuleType.class);
+
+ kryo.register(AgentMessage.class);
+ kryo.register(GetStateMessage.class);
+ kryo.register(UDUPMessage.class);
+ kryo.register(StanikMessage.Type.class);
+ kryo.register(StanikMessage.class);
+ }
+
+ public UDUPMessage deserialize(byte[] packetData) {
+ ByteArrayInputStream in = new ByteArrayInputStream(packetData);
+ Input kryoInput = new Input(in);
+ UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
+ return msg;
+ }
+
+ public byte[] serialize(UDUPMessage msg) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Output kryoOut = new Output(out);
+ kryo.writeObject(kryoOut, msg);
+ kryoOut.flush();
+ kryoOut.close();
+ return out.toByteArray();
+ }
+}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
new file mode 100644
index 0000000..6807a86
--- /dev/null
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -0,0 +1,72 @@
+package pl.edu.mimuw.cloudatlas.agent.modules;
+
+import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+
+public class UDUPServer {
+ private UDUP udp;
+ private UDUPSerializer serializer;
+ private DatagramSocket socket;
+ private InetAddress address;
+ private HashMap<InetAddress, byte[]> partialPackets;
+ private int bufSize;
+
+ public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException {
+ this.udp = udp;
+ this.socket = new DatagramSocket(port, addr);
+ this.address = addr;
+ this.bufSize = bufSize;
+ this.partialPackets = new HashMap<>();
+ this.serializer = new UDUPSerializer();
+ }
+
+ public void acceptMessage() throws IOException, InterruptedException {
+ byte[] buf = new byte[bufSize];
+ DatagramPacket packet = new DatagramPacket(buf, buf.length);
+ this.socket.receive(packet);
+ System.out.println("UDP " + this.address + " received packet from " + packet.getAddress());
+
+ if (packet.getOffset() == 0) {
+ UDUPMessage msg = this.serializer.deserialize(packet.getData());
+ System.out.println("UDP received message " + msg.getContent().getMessageId());
+
+ if (packet.getLength() == this.bufSize) {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ } else {
+ if (msg.getContent().getDestinationModule() == ModuleType.TEST) {
+ System.out.println("UDP server: test message received");
+ } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) {
+ this.udp.sendMessage(msg.getContent());
+ }
+ }
+ } else {
+ this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData());
+ }
+ }
+
+ public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) {
+ if (this.partialPackets.containsKey(senderAddress)) {
+ byte[] previousPacketData = this.partialPackets.get(senderAddress);
+ byte[] allPacketData = Bytes.concat(previousPacketData, packetData);
+ try {
+ UDUPMessage msg = this.serializer.deserialize(allPacketData);
+ this.udp.sendMessage(msg.getContent());
+ this.partialPackets.remove(senderAddress);
+ } catch (Error | Exception e) {
+ System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");
+ this.partialPackets.put(senderAddress, allPacketData);
+ }
+ } else {
+ this.partialPackets.put(senderAddress, packetData);
+ }
+ }
+
+ public void close() {
+ this.socket.close();
+ }
+
+}