diff options
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent/modules')
4 files changed, 137 insertions, 130 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index e4a7962..e616c93 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -1,19 +1,10 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.primitives.Bytes; -import pl.edu.mimuw.cloudatlas.agent.EventBus; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -32,37 +23,28 @@ import java.util.concurrent.atomic.AtomicBoolean; * due to ValueContact design */ -public class UDUP extends Module implements Runnable { - public class InvalidConversation extends Exception { - public InvalidConversation(String message) { super(message); } - } - - public class InvalidContact extends Exception { - public InvalidContact(String message) { super(message); } - } +// TODO set server port in global config - must be the same everywhere +// TODO same with buffer size +public class UDUP extends Module implements Runnable { private UDUPClient client; private UDUPServer server; - private HashMap<String, UDUPMessage> currentConversations; // TODO find blocking one private final AtomicBoolean running; - public UDUP(ModuleType moduleType, - InetAddress serverAddr, + public UDUP(InetAddress serverAddr, int serverPort, - int retryTimeout, - int retriesCount, + int timeout, int bufferSize) { - super(moduleType); - this.currentConversations = new HashMap<>(); - this.running = new AtomicBoolean(true); + super(ModuleType.UDP); + this.running = new AtomicBoolean(false); try { - this.client = new UDUPClient(this, serverPort, retryTimeout, retriesCount, bufferSize); + this.client = new UDUPClient(this, serverPort, bufferSize); this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); - } catch (SocketException | UnknownHostException e) { + this.running.getAndSet(true); + } catch (SocketException e) { e.printStackTrace(); this.client.close(); this.server.close(); - this.running.getAndSet(false); } } @@ -80,41 +62,12 @@ public class UDUP extends Module implements Runnable { } public void handleTyped(UDUPMessage event) throws InterruptedException { -// System.out.println("UDP sending message " + event.getContent().getMessageId()); - this.client.sendMessage(event); - } - - // also used for updating - public void addConversation(UDUPMessage msg) { - this.currentConversations.put(msg.getConversationId(), msg); - } - - public UDUPMessage fetchConversation(String conversationId) throws InvalidConversation { - UDUPMessage ret = this.currentConversations.get(conversationId); - if (ret == null) { - throw new InvalidConversation("Conversation does not exist"); - } else { - return ret; + System.out.println("UDP sending message " + event.getContent().getMessageId()); + try { + this.client.sendMessage(event); + } catch (IOException e) { + System.out.println("UDP send message failed"); + e.printStackTrace(); } } - - // TODO add conversation removal - public void removeConversation(String conversationId) { - this.currentConversations.remove(conversationId); - } - - public UDUPMessage deserialize(ByteArrayInputStream in) { - Kryo kryo = new Kryo(); - Input kryoInput = new Input(in); - UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); - return msg; - } - - public void serialize(ByteArrayOutputStream out, UDUPMessage msg) { - Kryo kryo = new Kryo(); - Output kryoOut = new Output(out); - kryo.writeObject(kryoOut, msg); - kryoOut.flush(); - } - } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 93f2898..82aaeb1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,83 +1,41 @@ package pl.edu.mimuw.cloudatlas.agent.modules; -import pl.edu.mimuw.cloudatlas.agent.messages.TimerSchedulerMessage; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.*; public class UDUPClient { private UDUP udp; + private UDUPSerializer serializer; private int serverPort; - private int timeout; - private int retriesCount; private DatagramSocket socket; private int bufsize; - UDUPClient(UDUP udp, int serverPort, int timeout, int retriesCount, int bufferSize) throws SocketException, UnknownHostException { + UDUPClient(UDUP udp, int serverPort, int bufferSize) throws SocketException { this.udp = udp; this.serverPort = serverPort; - this.timeout = timeout; - this.retriesCount = retriesCount; this.socket = new DatagramSocket(); this.bufsize = bufferSize; + this.serializer = new UDUPSerializer(); } - // TODO make sure that retry count in message is updated correctly - public void sendMessage(UDUPMessage msg) throws InterruptedException { - String messageId = msg.getMessageId(); - - if (msg.getRetry() >= this.retriesCount) { - this.udp.removeConversation(msg.getConversationId()); - } else { - this.udp.addConversation(msg); - try { - sendUDP(msg); - } catch (IOException e) { - e.printStackTrace(); - } - - msg.setRetry(msg.getRetry() + 1); - - // TODO add sending message to timer with retry - /* - this.udp.executor.passMessage(new TimerSchedulerMessage( - "", - 0, - "", - this.timeout, - System.currentTimeMillis() / 1000L, - new TimerScheduledTask() { - @Override - public void run() { - try { - this.sendMessage(msg); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - })); - - */ - } - } - - private void sendUDP(UDUPMessage msg) throws IOException { + public void sendMessage(UDUPMessage msg) throws IOException { int offset = 0; int outputSize; - byte[] buf = new byte[bufsize]; - ByteArrayOutputStream output = new ByteArrayOutputStream(); - this.udp.serialize(output, msg); - outputSize = output.size(); + byte[] buf = this.serializer.serialize(msg); + outputSize = buf.length; do { - output.write(buf, offset, bufsize); - System.out.println("UDP sends message: " + buf); outputSize =- bufsize; offset += bufsize; DatagramPacket packet = new DatagramPacket(buf, buf.length, msg.getContact().getAddress(), this.serverPort); + System.out.println("UDP sends message: "); + for (byte b : buf) { + System.out.print(b); + } + System.out.println("to " + packet.getAddress()); this.socket.send(packet); } while (outputSize > bufsize); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 3196a97..ac35265 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -1,4 +1,98 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import pl.edu.mimuw.cloudatlas.agent.messages.AgentMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.GetStateMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.StanikMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.PathName; +import pl.edu.mimuw.cloudatlas.model.ValueContact; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Serializes classes to and from byte arrays for UDP use + */ public class UDUPSerializer { + private Kryo kryo; + + UDUPSerializer() { + kryo = new Kryo(); + kryo.setReferences(true); + kryo.setRegistrationRequired(true); + registerClasses(); + } + + private void registerClasses() { + + kryo.register(Inet4Address.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + InetAddress ia = (InetAddress) object; + kryo.writeObject(output, ia.getAddress()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + try { + byte[] buf = kryo.readObject(input, byte[].class); + InetAddress addr = Inet4Address.getByAddress(buf); + return addr; + } catch (UnknownHostException e) { + System.out.println("Custom InetAddress read failed"); + e.printStackTrace(); + return null; + } + } + }); + + kryo.register(PathName.class, new Serializer() { + + @Override + public void write(Kryo kryo, Output output, Object object) { + PathName pn = (PathName) object; + kryo.writeObject(output, pn.getName()); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + String addr = input.readString(); + return new PathName(addr); + } + }); + + kryo.register(byte[].class); + kryo.register(ValueContact.class); + kryo.register(ModuleType.class); + + kryo.register(AgentMessage.class); + kryo.register(GetStateMessage.class); + kryo.register(UDUPMessage.class); + kryo.register(StanikMessage.Type.class); + kryo.register(StanikMessage.class); + } + + public UDUPMessage deserialize(byte[] packetData) { + ByteArrayInputStream in = new ByteArrayInputStream(packetData); + Input kryoInput = new Input(in); + UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); + return msg; + } + + public byte[] serialize(UDUPMessage msg) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Output kryoOut = new Output(out); + kryo.writeObject(kryoOut, msg); + kryoOut.flush(); + kryoOut.close(); + return out.toByteArray(); + } } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index f89f986..9a64bbd 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -3,41 +3,43 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.util.HashMap; public class UDUPServer { - UDUP udp; + private UDUP udp; + private UDUPSerializer serializer; private DatagramSocket socket; private InetAddress address; - private byte[] buf; private HashMap<InetAddress, byte[]> partialPackets; + private int bufSize; - public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException, UnknownHostException { + public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { this.udp = udp; this.socket = new DatagramSocket(port, addr); this.address = addr; - this.buf = new byte[bufSize]; + this.bufSize = bufSize; this.partialPackets = new HashMap<>(); + this.serializer = new UDUPSerializer(); } public void acceptMessage() throws IOException, InterruptedException { + byte[] buf = new byte[bufSize]; DatagramPacket packet = new DatagramPacket(buf, buf.length); this.socket.receive(packet); - System.out.println("UDP received packet: " + packet.getData()); + System.out.println("UDP " + this.address + " received packet from " + packet.getAddress()); if (packet.getOffset() == 0) { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(packet.getData())); - // TODO check if not full packet anyway - // TODO check for errors if it's not the end od transmission - - this.udp.addConversation(msg); + UDUPMessage msg = this.serializer.deserialize(packet.getData()); System.out.println("UDP received message " + msg.getContent().getMessageId()); - if (msg.getDestinationModule() != ModuleType.UDP) { - this.udp.sendMessage(msg.getContent()); + if (packet.getLength() == this.bufSize) { + this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); + } else { + if (msg.getDestinationModule() != ModuleType.UDP) { + this.udp.sendMessage(msg.getContent()); + } } } else { this.addPartialMessageAndCheckSerialization(packet.getAddress(), packet.getData()); @@ -49,7 +51,7 @@ public class UDUPServer { byte[] previousPacketData = this.partialPackets.get(senderAddress); byte[] allPacketData = Bytes.concat(previousPacketData, packetData); try { - UDUPMessage msg = this.udp.deserialize(new ByteArrayInputStream(allPacketData)); + UDUPMessage msg = this.serializer.deserialize(allPacketData); this.udp.sendMessage(msg.getContent()); this.partialPackets.remove(senderAddress); } catch (Error | Exception e) { |