diff options
Diffstat (limited to 'src')
8 files changed, 525 insertions, 13 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 26f0e0b..256fa6b 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -1,5 +1,8 @@ package pl.edu.mimuw.cloudatlas.agent; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; @@ -12,14 +15,8 @@ import pl.edu.mimuw.cloudatlas.agent.NewApiImplementation; import pl.edu.mimuw.cloudatlas.agent.messages.RunQueriesMessage; import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.*; import pl.edu.mimuw.cloudatlas.agent.modules.Module; -import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; -import pl.edu.mimuw.cloudatlas.agent.modules.Qurnik; -import pl.edu.mimuw.cloudatlas.agent.modules.Remik; -import pl.edu.mimuw.cloudatlas.agent.modules.Stanik; -import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; -import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; -import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduler; import pl.edu.mimuw.cloudatlas.api.Api; import pl.edu.mimuw.cloudatlas.interpreter.Main; import pl.edu.mimuw.cloudatlas.model.PathName; @@ -49,6 +46,11 @@ public class Agent { Long freshnessPeriod = new Long(System.getProperty("freshness_period")); modules.put(ModuleType.STATE, new Stanik(freshnessPeriod)); modules.put(ModuleType.QUERY, new Qurnik()); + try { + modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); + } catch (UnknownHostException e) { + e.printStackTrace(); + } // TODO add modules as we implement them return modules; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java new file mode 100644 index 0000000..fa8d1fa --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -0,0 +1,45 @@ +package pl.edu.mimuw.cloudatlas.agent.messages; + +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +public class UDUPMessage extends AgentMessage { + private ValueContact contact; + private AgentMessage content; + + public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) { + super(messageId, ModuleType.UDP, timestamp); + this.contact = contact; + this.content = content; + } + + public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) { + super(messageId, ModuleType.UDP); + this.contact = contact; + this.content = content; + } + + public UDUPMessage() { + super("", ModuleType.UDP); + } + + @Override + public void callMe(Module module) throws InterruptedException, Module.InvalidMessageType { + module.handleTyped(this); + } + + public AgentMessage getContent() { + return content; + } + + public void setContent(AgentMessage content) { + this.content = content; + } + + public ValueContact getContact() { return contact; } + + public void setContact(ValueContact contact) { + this.contact = contact; + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java index 0a934cb..67fdab9 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Module.java @@ -1,12 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import pl.edu.mimuw.cloudatlas.agent.Executor; -import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.QurnikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.RemikMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.ResponseMessage; -import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.*; /* * A Module is a (potentially stateful) event handler. @@ -49,6 +44,10 @@ public abstract class Module { throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); } + public void handleTyped(UDUPMessage message) throws InterruptedException, InvalidMessageType { + throw new InvalidMessageType("Got a ResponseMessage in module " + moduleType.toString()); + } + public void setExecutor(Executor executor) { this.executor = executor; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java new file mode 100644 index 0000000..e616c93 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -0,0 +1,73 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Communication over UDP + * + * Client-server exchange pattern: + * server: init message + * client: ack message + * + * retry count + * retry timeout after which retry happens + ** + * udp sends initiator module success/failure information + * + * we have udps on different addresses with the same ports + * due to ValueContact design + */ + +// TODO set server port in global config - must be the same everywhere +// TODO same with buffer size + +public class UDUP extends Module implements Runnable { + private UDUPClient client; + private UDUPServer server; + private final AtomicBoolean running; + + public UDUP(InetAddress serverAddr, + int serverPort, + int timeout, + int bufferSize) { + super(ModuleType.UDP); + this.running = new AtomicBoolean(false); + try { + this.client = new UDUPClient(this, serverPort, bufferSize); + this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); + this.running.getAndSet(true); + } catch (SocketException e) { + e.printStackTrace(); + this.client.close(); + this.server.close(); + } + } + + public void run() { + System.out.println("UDP server running"); + while(this.running.get()) { + try { + this.server.acceptMessage(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + this.running.getAndSet(false); + this.server.close(); + } + } + } + + public void handleTyped(UDUPMessage event) throws InterruptedException { + System.out.println("UDP sending message " + event.getContent().getMessageId()); + try { + this.client.sendMessage(event); + } catch (IOException e) { + System.out.println("UDP send message failed"); + e.printStackTrace(); + } + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java new file mode 100644 index 0000000..82aaeb1 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -0,0 +1,47 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.IOException; +import java.net.*; + +public class UDUPClient { + private UDUP udp; + private UDUPSerializer serializer; + private int serverPort; + private DatagramSocket socket; + private int bufsize; + + UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException { + this.udp = udp; + this.serverPort = serverPort; + this.socket = new DatagramSocket(); + this.bufsize = bufferSize; + this.serializer = new UDUPSerializer(); + } + + public void sendMessage(UDUPMessage msg) throws IOException { + int offset = 0; + int outputSize; + + byte[] buf = this.serializer.serialize(msg); + outputSize = buf.length; + + do { + outputSize =- bufsize; + offset += bufsize; + DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); + System.out.println("UDP sends message: "); + for (byte b : buf) { + System.out.print(b); + } + System.out.println("to " + packet.getAddress()); + this.socket.send(packet); + } while (outputSize > bufsize); + } + + void close() { + this.socket.close(); + } + +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java new file mode 100644 index 0000000..ac35265 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -0,0 +1,98 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Serializes classes to and from byte arrays for UDP use + */ +public class UDUPSerializer { + private Kryo kryo; + + UDUPSerializer() { + kryo = new Kryo(); + kryo.setReferences(true); + kryo.setRegistrationRequired(true); + registerClasses(); + } + + private void registerClasses() { + + kryo.register(Inet4Address.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + InetAddress ia = (InetAddress) object; + kryo.writeObject(output, ia.getAddress()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + try { + byte[] buf = kryo.readObject(input, byte[].class); + InetAddress addr = Inet4Address.getByAddress(buf); + return addr; + } catch (UnknownHostException e) { + System.out.println("Custom InetAddress read failed"); + e.printStackTrace(); + return null; + } + } + }); + + kryo.register(PathName.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + PathName pn = (PathName) object; + kryo.writeObject(output, pn.getName()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + String addr = input.readString(); + return new PathName(addr); + } + }); + + kryo.register(byte[].class); + kryo.register(ValueContact.class); + kryo.register(ModuleType.class); + + kryo.register(AgentMessage.class); + kryo.register(GetStateMessage.class); + kryo.register(UDUPMessage.class); + kryo.register(StanikMessage.Type.class); + kryo.register(StanikMessage.class); + } + + public UDUPMessage deserialize(byte[] packetData) { + ByteArrayInputStream in = new ByteArrayInputStream(packetData); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public byte[] serialize(UDUPMessage msg) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + kryoOut.close(); + return out.toByteArray(); + } +} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java new file mode 100644 index 0000000..6807a86 --- /dev/null +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -0,0 +1,72 @@ +package pl.edu.mimuw.cloudatlas.agent.modules; + +import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; + +import java.io.IOException; +import java.net.*; +import java.util.HashMap; + +public class UDUPServer { + private UDUP udp; + private UDUPSerializer serializer; + private DatagramSocket socket; + private InetAddress address; + private HashMap<InetAddress, byte[]> partialPackets; + private int bufSize; + + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { + this.udp = udp; + this.socket = new DatagramSocket(port, addr); + this.address = addr; + this.bufSize = bufSize; + this.partialPackets = new HashMap<>(); + this.serializer = new UDUPSerializer(); + } + + public void acceptMessage() throws IOException, InterruptedException { + byte[] buf = new byte[bufSize]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + this.socket.receive(packet); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); + + if (packet.getOffset() == 0) { + UDUPMessage msg = this.serializer.deserialize(packet.getData()); + System.out.println("UDP received message " + msg.getContent().getMessageId()); + + if (packet.getLength() == this.bufSize) { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } else { + if (msg.getContent().getDestinationModule() == ModuleType.TEST) { + System.out.println("UDP server: test message received"); + } else if (msg.getContent().getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } + } + } else { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } + } + + public void addPartialMessageAndCheckSerialization(InetAddress senderAddress, byte[] packetData) { + if (this.partialPackets.containsKey(senderAddress)) { + byte[] previousPacketData = this.partialPackets.get(senderAddress); + byte[] allPacketData = Bytes.concat(previousPacketData, packetData); + try { + UDUPMessage msg = this.serializer.deserialize(allPacketData); + this.udp.sendMessage(msg.getContent()); + this.partialPackets.remove(senderAddress); + } catch (Error | Exception e) { + System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest"); + this.partialPackets.put(senderAddress, allPacketData); + } + } else { + this.partialPackets.put(senderAddress, packetData); + } + } + + public void close() { + this.socket.close(); + } + +} diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java new file mode 100644 index 0000000..e4f601a --- /dev/null +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -0,0 +1,176 @@ +package pl.edu.mimuw.cloudatlas.agent; + +import org.junit.*; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.agent.modules.Module; +import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType; +import pl.edu.mimuw.cloudatlas.agent.modules.UDUP; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class UDUPTest { + + @Test + public void messageBetweenUDUPs() { + UDUP udp1 = null; + UDUP udp2 = null; + UDUPMessage msg1 = null; + boolean testSuccess = true; + + try { + System.out.println("Starting udp1"); + + udp1 = new UDUP( + InetAddress.getByName("127.0.0.2"), + 5999, + 5000, + 1000); + + System.out.println("Starting udp2"); + + udp2 = new UDUP( + InetAddress.getByName("127.0.0.3"), + 5999, + 5000, + 1000); + + UDUPMessage testContent = new UDUPMessage(); + testContent.setDestinationModule(ModuleType.TEST); + + msg1 = new UDUPMessage( + "udup1", + new ValueContact(new PathName("/udp2"), InetAddress.getByName("127.0.0.3")), + testContent + ); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + Thread udpThread1 = new Thread(udp1); + udpThread1.start(); + Thread udpThread2 = new Thread(udp2); + udpThread2.start(); + + try { + Thread.sleep(5000); + System.out.println("Sending message"); + if (udp1 == null | udp2 == null) { + Assert.fail("UDPs not initialized"); + } else { + udp1.handle(msg1); + Thread.sleep(10000); + } + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + testSuccess = false; + } + + udpThread1.interrupt(); + udpThread2.interrupt(); + + if (testSuccess) { + Assert.assertTrue(true); + } else { + Assert.fail(); + } + } +/* + @Test + public void bigMessageBetweenUDUPs() { + UDUP udp1 = new UDUP( + ModuleType.UDP, + 5999, + 1000, + 5000, + 256); + + UDUP udp2 = new UDUP( + ModuleType.UDP, + 5998, + 1000, + 5000, + 256); + + AttributesMap bigAttrib = new AttributesMap(); + for (Long i = 1L; i < 20; i++) { + bigAttrib.add(i.toString(), new ValueInt(i)); + } + + UDUPMessage msg1 = new UDUPMessage( + "udp1", + ModuleType.UDP, + "localhost", + 5998, + new UpdateAttributesMessage("updateattrib1", 0,"/", bigAttrib), + 0, + "conv1"); + + try { + udp1.handle(msg1); + Thread.sleep(1000); + UDUPMessage conv = udp2.fetchConversation("conv1"); + Assert.assertEquals(conv.getConversationId(), "conv1"); + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + } catch (UDUP.InvalidConversation noConversationError) { + Assert.fail(); + } + } + + + @Test + public void sendMultipleMessages() { + @Test + public void messageBetweenUDUPs() { + UDUP udp1 = new UDUP( + ModuleType.UDP, + 5999, + 1000, + 5000, + 3, + 256); + + UDUP udp2 = new UDUP( + ModuleType.UDP, + 5998, + 1000, + 5000, + 3, + 256); + + UDUPMessage msg1 = new UDUPMessage( + "1", + ModuleType.UDP, + "localhost", + 5998, + null, + 0, + "conv1"); + + + + + try { + udp1.handle(msg1); + udp1.handle(msg2); + udp1.handle(msg3); + Thread.sleep(1000); + UDUPMessage conv1 = udp2.fetchConversation("conv1"); + Assert.assertEquals(conv1.getConversationId(), "conv1"); + UDUPMessage conv2 = udp2.fetchConversation("conv2"); + Assert.assertEquals(conv2.getConversationId(), "conv2"); + UDUPMessage conv3 = udp2.fetchConversation("conv3"); + Assert.assertEquals(conv3.getConversationId(), "conv3"); + } catch (InterruptedException | Module.InvalidMessageType e) { + e.printStackTrace(); + } catch (UDUP.InvalidConversation invalidConversation) { + Assert.fail(); + } + } + } +*/ +} |