diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw')
5 files changed, 343 insertions, 6 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java new file mode 100644 index 0000000..335d6fe --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -0,0 +1,61 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +public class UDUPMessage extends AgentMessage { + private ValueContact contact; + private AgentMessage content; + private int retry; + private String conversationId; + + public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content, int retry, String conversationId) { + super(messageId, ModuleType.UDP, timestamp); + this.contact = contact; + this.content = content; + this.retry = retry; + this.conversationId = conversationId; + } + + public UDUPMessage(String messageId, ValueContact contact, AgentMessage content, int retry, String conversationId) { + super(messageId, ModuleType.UDP); + this.contact = contact; + this.content = content; + this.retry = retry; + this.conversationId = conversationId; + } + + @Override + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } + + public AgentMessage getContent() { + return content; + } + + public void setContent(AgentMessage content) { + this.content = content; + } + + public int getRetry() { + return retry; + } + + public String getConversationId() { + return conversationId; + } + + public void setRetry(int retry) { this.retry = retry; } + + public ValueContact getContact() { return contact; } + + public void setContact(ValueContact contact) { + this.contact = contact; + } + + public void setConversationId(String conversationId) { + this.conversationId = conversationId; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 0a934cb..67fdab9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -1,12 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.Executor; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.*; /* * A Module is a (potentially stateful) event handler. @@ -49,6 +44,10 @@ public abstract class Module { throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); } + public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); + } + public void setExecutor(Executor executor) { this.executor = executor; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java new file mode 100644 index 0000000..e4a7962 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -0,0 +1,120 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.EventBus; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Communication over UDP + * + * Client-server exchange pattern: + * server: init message + * client: ack message + * + * retry count + * retry timeout after which retry happens + ** + * udp sends initiator module success/failure information + * + * we have udps on different addresses with the same ports + * due to ValueContact design + */ + +public class UDUP extends Module implements Runnable { + public class InvalidConversation extends Exception { + public InvalidConversation(String message) { super(message); } + } + + public class InvalidContact extends Exception { + public InvalidContact(String message) { super(message); } + } + + private UDUPClient client; + private UDUPServer server; + private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one + private final AtomicBoolean running; + + public UDUP(ModuleType moduleType, + InetAddress serverAddr, + int serverPort, + int retryTimeout, + int retriesCount, + int bufferSize) { + super(moduleType); + this.currentConversations = new HashMap<>(); + this.running = new AtomicBoolean(true); + try { + this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); + } catch (SocketException | UnknownHostException e) { + e.printStackTrace(); + this.client.close(); + this.server.close(); + this.running.getAndSet(false); + } + } + + public void run() { + System.out.println("UDP server running"); + while(this.running.get()) { + try { + this.server.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.server.close(); + } + } + } + + public void handleTyped(UDUPMessage event) throws InterruptedException { +// System.out.println("UDP sending message " + event.getContent().getMessageId()); + this.client.sendMessage(event); + } + + // also used for updating + public void addConversation(UDUPMessage msg) { + this.currentConversations.put(msg.getConversationId(), msg); + } + + public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { + UDUPMessage ret = this.currentConversations.get(conversationId); + if (ret == null) { + throw new InvalidConversation("Conversation does not exist"); + } else { + return ret; + } + } + + // TODO add conversation removal + public void removeConversation(String conversationId) { + this.currentConversations.remove(conversationId); + } + + public UDUPMessage deserialize(ByteArrayInputStream in) { + Kryo kryo = new Kryo(); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { + Kryo kryo = new Kryo(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + } + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java new file mode 100644 index 0000000..93f2898 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -0,0 +1,89 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.*; + +public class UDUPClient { + private UDUP udp; + private int serverPort; + private int timeout; + private int retriesCount; + private DatagramSocket socket; + private int bufsize; + + UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException { + this.udp = udp; + this.serverPort = serverPort; + this.timeout = timeout; + this.retriesCount = retriesCount; + this.socket = new DatagramSocket(); + this.bufsize = bufferSize; + } + + // TODO make sure that retry count in message is updated correctly + public void sendMessage(UDUPMessage msg) throws InterruptedException { + String messageId = msg.getMessageId(); + + if (msg.getRetry() >= this.retriesCount) { + this.udp.removeConversation(msg.getConversationId()); + } else { + this.udp.addConversation(msg); + try { + sendUDP(msg); + } catch (IOException e) { + e.printStackTrace(); + } + + msg.setRetry(msg.getRetry() + 1); + + // TODO add sending message to timer with retry + /* + this.udp.executor.passMessage(new TimerSchedulerMessage( + "", + 0, + "", + this.timeout, + System.currentTimeMillis() / 1000L, + new TimerScheduledTask() { + @Override + public void run() { + try { + this.sendMessage(msg); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + })); + + */ + } + } + + private void sendUDP(UDUPMessage msg) throws IOException { + int offset = 0; + int outputSize; + byte[] buf = new byte[bufsize]; + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + this.udp.serialize(output, msg); + outputSize = output.size(); + + do { + output.write(buf, offset, bufsize); + System.out.println("UDP sends message: " + buf); + outputSize =- bufsize; + offset += bufsize; + DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); + this.socket.send(packet); + } while (outputSize > bufsize); + } + + void close() { + this.socket.close(); + } + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java new file mode 100644 index 0000000..f89f986 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -0,0 +1,68 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.*; +import java.util.HashMap; + +public class UDUPServer { + UDUP udp; + private DatagramSocket socket; + private InetAddress address; + private byte[] buf; + private HashMap<InetAddress, byte[]> partialPackets; + + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + this.udp = udp; + this.socket = new DatagramSocket(port, addr); + this.address = addr; + this.buf = new byte[bufSize]; + this.partialPackets = new HashMap<>(); + } + + public void acceptMessage() throws IOException, InterruptedException { + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP received packet: " + packet.getData()); + + if (packet.getOffset() == 0) { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); + // TODO check if not full packet anyway + // TODO check for errors if it's not the end od transmission + + this.udp.addConversation(msg); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } else { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } + } + + public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { + if (this.partialPackets.containsKey(senderAddress)) { + byte[] previousPacketData = this.partialPackets.get(senderAddress); + byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + try { + UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + this.udp.sendMessage(msg.getContent()); + this.partialPackets.remove(senderAddress); + } catch (Error | Exception e) { + System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); + this.partialPackets.put(senderAddress, allPacketData); + } + } else { + this.partialPackets.put(senderAddress, packetData); + } + } + + public void close() { + this.socket.close(); + } + +} |