diff options
| author | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-10 20:37:00 +0100 | 
|---|---|---|
| committer | Marcin Chrzanowski <marcin.j.chrzanowski@gmail.com> | 2020-01-10 20:37:00 +0100 | 
| commit | 52f74f179179b580e6171466c221b20f4e2a6b29 (patch) | |
| tree | bee114943b6f52c5e54ab0c51badd5b65ca92e8f /src | |
| parent | 7956fb8b67e6f10760431cbe77db2fcf33d5e9e0 (diff) | |
| parent | 69fa53941cfa42f3b0f511f6abe549919241123b (diff) | |
Merge branch 'master' into gossip-girl-2
Diffstat (limited to 'src')
9 files changed, 171 insertions, 97 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java index 256fa6b..62cd544 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/Agent.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent;  import java.net.Inet4Address;  import java.net.InetAddress; +import java.net.SocketException;  import java.net.UnknownHostException;  import java.rmi.registry.LocateRegistry;  import java.rmi.registry.Registry; @@ -39,18 +40,19 @@ public class Agent {          }      } -    public static HashMap<ModuleType, Module> initializeModules() { +    public static HashMap<ModuleType, Module> initializeModules() throws UnknownHostException, SocketException, NullPointerException {          HashMap<ModuleType, Module> modules = new HashMap<ModuleType, Module>();          modules.put(ModuleType.TIMER_SCHEDULER, new TimerScheduler(ModuleType.TIMER_SCHEDULER));          modules.put(ModuleType.RMI, new Remik()); -        Long freshnessPeriod = new Long(System.getProperty("freshness_period")); +        Long freshnessPeriod = Long.getLong("freshness_period");          modules.put(ModuleType.STATE, new Stanik(freshnessPeriod));          modules.put(ModuleType.QUERY, new Qurnik()); -        try { -            modules.put(ModuleType.UDP, new UDUP(InetAddress.getByName("127.0.0.1"), 5988, 5000, 20000)); -        } catch (UnknownHostException e) { -            e.printStackTrace(); -        } + +        Integer port = Integer.getInteger("UDUPServer.port"); +        Integer timeout = Integer.getInteger("UDUPServer.timeout"); +        Integer bufsize = Integer.getInteger("UDUPServer.bufsize"); +        UDUPServer server = new UDUPServer(InetAddress.getByName("127.0.0.1"), port, bufsize); +        modules.put(ModuleType.UDP, new UDUP(port, timeout, bufsize, server));          // TODO add modules as we implement them          return modules;      } @@ -89,14 +91,24 @@ public class Agent {      }      public static void runModulesAsThreads() { -        HashMap<ModuleType, Module> modules = initializeModules(); +        HashMap<ModuleType, Module> modules = null; + +        try { +            modules = initializeModules(); +        } catch (UnknownHostException | SocketException e) { +            System.out.println("Module initialization failed"); +            e.printStackTrace(); +            return; +        } +          HashMap<ModuleType, Executor> executors = initializeExecutors(modules);          ArrayList<Thread> executorThreads = initializeExecutorThreads(executors); -          eventBus = new EventBus(executors); +        Thread UDUPServerThread = new Thread(((UDUP) modules.get(ModuleType.UDP)).getServer());          Thread eventBusThread = new Thread(eventBus);          System.out.println("Initializing event bus");          eventBusThread.start(); +        UDUPServerThread.start();      }      private static void initZones() { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java index 508fe88..03525bb 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/GossipGirlMessage.java @@ -19,6 +19,8 @@ public abstract class GossipGirlMessage extends AgentMessage {          this.type = type;      } +    public GossipGirlMessage() {}; +      public Type getType() {          return type;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java index 0a3a868..4c223f5 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/RemoteGossipGirlMessage.java @@ -10,6 +10,8 @@ public class RemoteGossipGirlMessage extends GossipGirlMessage {          super(messageId, timestamp, type);      } +    public RemoteGossipGirlMessage() {}; +      public void setSentTimestamp(ValueTime sentTimestamp) {          this.sentTimestamp = sentTimestamp;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java index 3751b3c..b955340 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/messages/UDUPMessage.java @@ -6,15 +6,15 @@ import pl.edu.mimuw.cloudatlas.model.ValueContact;  public class UDUPMessage extends AgentMessage {      private ValueContact contact; -    private AgentMessage content; +    private RemoteGossipGirlMessage content; -    public UDUPMessage(String messageId, long timestamp, ValueContact contact, AgentMessage content) { +    public UDUPMessage(String messageId, long timestamp, ValueContact contact, RemoteGossipGirlMessage content) {          super(messageId, ModuleType.UDP, timestamp);          this.contact = contact;          this.content = content;      } -    public UDUPMessage(String messageId, ValueContact contact, AgentMessage content) { +    public UDUPMessage(String messageId, ValueContact contact, RemoteGossipGirlMessage content) {          super(messageId, ModuleType.UDP);          this.contact = contact;          this.content = content; @@ -27,11 +27,11 @@ public class UDUPMessage extends AgentMessage {          module.handleTyped(this);      } -    public AgentMessage getContent() { +    public RemoteGossipGirlMessage getContent() {          return content;      } -    public void setContent(AgentMessage content) { +    public void setContent(RemoteGossipGirlMessage content) {          this.content = content;      } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java index 8c6db8f..501c76e 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUP.java @@ -23,30 +23,19 @@ import java.util.concurrent.atomic.AtomicBoolean;   *  due to ValueContact design   */ -// TODO set server port in global config - must be the same everywhere -// TODO same with buffer size - -// TODO separate server like newapiimpl -// TODO add timestamps as close to sending as possible - -// TODO wysylac tylko remotegossipgirl message -// TODO update timestampow odpowiedni w tym remotegossipgirlmessage - -public class UDUP extends Module implements Runnable { +public class UDUP extends Module {      private UDUPClient client;      private UDUPServer server; -    private final AtomicBoolean running; -    public UDUP(InetAddress serverAddr, -                int serverPort, +    public UDUP(int serverPort,                  int timeout, -                int bufferSize) { +                int bufferSize, +                UDUPServer server) {          super(ModuleType.UDP); -        this.running = new AtomicBoolean(false);          try {              this.client = new UDUPClient(this, serverPort, bufferSize); -            this.server = new UDUPServer(this, serverAddr, serverPort, bufferSize); -            this.running.getAndSet(true); +            this.server = server; +            this.server.setUDUP(this);          } catch (SocketException e) {              e.printStackTrace();              this.client.close(); @@ -54,17 +43,8 @@ public class UDUP extends Module implements Runnable {          }      } -    public void run() { -        System.out.println("UDP server running"); -        while(this.running.get()) { -            try { -                this.server.acceptMessage(); -            } catch (IOException | InterruptedException e) { -                e.printStackTrace(); -                this.running.getAndSet(false); -                this.server.close(); -            } -        } +    public UDUPServer getServer() { +        return this.server;      }      public void handleTyped(UDUPMessage event) throws InterruptedException { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 2e4f0b4..089cad2 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,6 +1,8 @@  package pl.edu.mimuw.cloudatlas.agent.modules;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueTime; +import pl.edu.mimuw.cloudatlas.model.ValueUtils;  import javax.xml.crypto.Data;  import java.io.IOException; @@ -67,9 +69,12 @@ public class UDUPClient {      public void sendMessage(UDUPMessage msg) throws IOException {          int packetNo = 1;          byte[] sendBuf; -        byte[] dataBuf = this.serializer.serialize(msg); +        byte[] dataBuf;          this.lastTransmission++; +        msg.getContent().setSentTimestamp(ValueUtils.currentTime()); +        dataBuf = this.serializer.serialize(msg); +          do {              sendBuf = packSendBuffer(this.lastTransmission, packetNo, dataBuf);              DatagramPacket packet = new DatagramPacket(sendBuf, 0, sendBuf.length, msg.getContact().getAddress(), this.serverPort); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java index 40c4d7c..0f7b99d 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java @@ -12,6 +12,7 @@ import java.io.ByteArrayOutputStream;  import java.net.Inet4Address;  import java.net.InetAddress;  import java.net.UnknownHostException; +import java.rmi.Remote;  import java.util.ArrayList;  import java.util.HashSet;  import java.util.LinkedHashMap; @@ -138,6 +139,9 @@ public class UDUPSerializer {          kryo.register(UDUPMessage.class);          kryo.register(UpdateAttributesMessage.class);          kryo.register(UpdateQueriesMessage.class); +        kryo.register(GossipGirlMessage.class); +        kryo.register(GossipGirlMessage.Type.class); +        kryo.register(RemoteGossipGirlMessage.class);          // modules          kryo.register(TimerScheduledTask.class); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index b71a475..94882e4 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -2,6 +2,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules;  import com.google.common.primitives.Bytes;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; +import pl.edu.mimuw.cloudatlas.model.ValueUtils;  import java.io.ByteArrayInputStream;  import java.io.IOException; @@ -10,22 +11,42 @@ import java.nio.ByteBuffer;  import java.util.ArrayList;  import java.util.HashMap;  import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; -public class UDUPServer { +public class UDUPServer implements Runnable {      private UDUP udp;      private UDUPSerializer serializer;      private DatagramSocket socket;      private InetAddress address;      private HashMap<String, ArrayList<byte[]>> partialPackets;      private int bufSize; +    private final AtomicBoolean running; -    public UDUPServer(UDUP udp, InetAddress addr, int port, int bufSize) throws SocketException { -        this.udp = udp; +    public UDUPServer(InetAddress addr, int port, int bufSize) throws SocketException {          this.socket = new DatagramSocket(port, addr);          this.address = addr;          this.bufSize = bufSize;          this.partialPackets = new HashMap<>();          this.serializer = new UDUPSerializer(); +        this.running = new AtomicBoolean(false); +    } + +    public void setUDUP(UDUP udup) { +        this.udp = udup; +    } + +    public void run() { +        System.out.println("UDP server running"); +        this.running.getAndSet(true); +        while(this.running.get()) { +            try { +                this.acceptMessage(); +            } catch (IOException | InterruptedException e) { +                e.printStackTrace(); +                this.running.getAndSet(false); +                this.close(); +            } +        }      }      public void acceptMessage() throws IOException, InterruptedException { @@ -34,14 +55,19 @@ public class UDUPServer {          String transmissionID = packTransmissionID(transmissionNo, packet.getAddress());          int packetNo = readPacketNo(packet.getData());          byte[] packetData = trimPacketBuffer(packet.getData()); +        UDUPMessage msg;          if (packetNo == 1 && packet.getLength() < this.bufSize) { -            UDUPMessage msg = this.serializer.deserialize(packetData); +            msg = this.serializer.deserialize(packetData); +            msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());              System.out.println("UDP received message " + msg.getContent().getMessageId()); -            sendMessageFurther(msg);          } else {              System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); -            this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); +            msg = this.addPartialMessageAndCheckSerialization(transmissionID, packetNo, packetData); +        } + +        if (msg != null) { +            sendMessageFurther(msg);          }      } @@ -104,14 +130,16 @@ public class UDUPServer {          return fullData;      } -    public void addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +    public UDUPMessage addPartialMessageAndCheckSerialization(String transmissionID, int newPacketNo, byte[] packetData) { +        UDUPMessage msg = null; +          if (this.partialPackets.containsKey(transmissionID)) {              try {                  byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); -                UDUPMessage msg = this.serializer.deserialize(allPacketData); +                msg = this.serializer.deserialize(allPacketData); +                msg.getContent().setReceivedTimestamp(ValueUtils.currentTime());                  this.partialPackets.remove(transmissionID);                  System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); -                this.udp.sendMessage(msg.getContent());              } catch (Error | Exception e) {                  System.out.println("Kryo didn't deserialize partial message, waiting to receive the rest");              } @@ -120,6 +148,8 @@ public class UDUPServer {              newTransmission.add(newPacketNo-1, packetData);              this.partialPackets.put(transmissionID, newTransmission);          } + +        return msg;      }      public void close() { diff --git a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java index 13a322b..ac2c587 100644 --- a/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java +++ b/src/test/java/pl/edu/mimuw/cloudatlas/agent/UDUPTest.java @@ -1,17 +1,21 @@  package pl.edu.mimuw.cloudatlas.agent;  import org.junit.*; +import pl.edu.mimuw.cloudatlas.agent.messages.GossipGirlMessage; +import pl.edu.mimuw.cloudatlas.agent.messages.RemoteGossipGirlMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;  import pl.edu.mimuw.cloudatlas.agent.messages.UpdateAttributesMessage;  import pl.edu.mimuw.cloudatlas.agent.modules.Module;  import pl.edu.mimuw.cloudatlas.agent.modules.ModuleType;  import pl.edu.mimuw.cloudatlas.agent.modules.UDUP; +import pl.edu.mimuw.cloudatlas.agent.modules.UDUPServer;  import pl.edu.mimuw.cloudatlas.model.AttributesMap;  import pl.edu.mimuw.cloudatlas.model.PathName;  import pl.edu.mimuw.cloudatlas.model.ValueContact;  import pl.edu.mimuw.cloudatlas.model.ValueInt;  import java.net.InetAddress; +import java.net.SocketException;  import java.net.UnknownHostException;  // TODO add serialization tests that target custom serializers (type collections!) @@ -22,29 +26,34 @@ public class UDUPTest {      public void messageBetweenUDUPs() {          UDUP udp1 = null;          UDUP udp2 = null; +        UDUPServer server1 = null; +        UDUPServer server2 = null;          UDUPMessage msg1 = null;          boolean testSuccess = true; +        int timeout = 5000;          try {              System.out.println("Starting udp1"); +            server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5996, 1000);              udp1 = new UDUP( -                    InetAddress.getByName("127.0.0.2"), -                    5999, -                    5000, -                    1000); +                    5997, +                    timeout, +                    1000, +                    server1);              System.out.println("Starting udp2"); +            server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5996, 1000);              udp2 = new UDUP( -                    InetAddress.getByName("127.0.0.3"), -                    5999, -                    5000, -                    1000); +                    5997, +                    timeout, +                    1000, +                    server2); -            UDUPMessage testContent = new UDUPMessage(); +            RemoteGossipGirlMessage testContent = +                    new RemoteGossipGirlMessage("singleMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM);              testContent.setDestinationModule(ModuleType.TEST); -            testContent.setMessageId("singleMsgTest");              msg1 = new UDUPMessage(                      "udup1", @@ -52,23 +61,23 @@ public class UDUPTest {                      testContent              ); -        } catch (UnknownHostException e) { +        } catch (UnknownHostException | SocketException e) {              e.printStackTrace(); +            testSuccess = false;          } -        Thread udpThread1 = new Thread(udp1); +        Thread udpThread1 = new Thread(server1);          udpThread1.start(); -        Thread udpThread2 = new Thread(udp2); +        Thread udpThread2 = new Thread(server2);          udpThread2.start();          try { -            Thread.sleep(5000); -            System.out.println("Sending message"); +            Thread.sleep(500);              if (udp1 == null | udp2 == null) { -                Assert.fail("UDPs not initialized"); +                testSuccess = false;              } else {                  udp1.handle(msg1); -                Thread.sleep(10000); +                Thread.sleep(timeout);              }          } catch (InterruptedException | Module.InvalidMessageType e) {              e.printStackTrace(); @@ -89,30 +98,34 @@ public class UDUPTest {      public void bigMessageBetweenUDUPs() {          UDUP udp1 = null;          UDUP udp2 = null; +        UDUPServer server1 = null; +        UDUPServer server2 = null;          UDUPMessage msg1 = null;          boolean testSuccess = true; -        int timeout = 5000; +        int timeout = 3000;          try {              System.out.println("Starting udp1"); +            server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5991, 1000);              udp1 = new UDUP( -                    InetAddress.getByName("127.0.0.2"), -                    5998, +                    5997,                      timeout, -                    30); +                    30, +                    server1);              System.out.println("Starting udp2"); +            server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5991, 1000);              udp2 = new UDUP( -                    InetAddress.getByName("127.0.0.3"), -                    5998, +                    5997,                      timeout, -                    30); +                    30, +                    server2); -            UDUPMessage testContent = new UDUPMessage(); +            RemoteGossipGirlMessage testContent = +                    new RemoteGossipGirlMessage("bigMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM);              testContent.setDestinationModule(ModuleType.TEST); -            testContent.setMessageId("bigMsgTest");              msg1 = new UDUPMessage(                      "udup1", @@ -120,18 +133,24 @@ public class UDUPTest {                      testContent              ); -        } catch (UnknownHostException e) { +        } catch (UnknownHostException | SocketException e) {              e.printStackTrace(); +            testSuccess = false;          } -        Thread udpThread1 = new Thread(udp1); +        Thread udpThread1 = new Thread(server1);          udpThread1.start(); -        Thread udpThread2 = new Thread(udp2); +        Thread udpThread2 = new Thread(server2);          udpThread2.start();          try { -            udp1.handle(msg1); -            Thread.sleep(timeout + 1000); +            Thread.sleep(500); +            if (udp1 == null | udp2 == null) { +                testSuccess = false; +            } else { +                udp1.handle(msg1); +                Thread.sleep(timeout); +            }          } catch (InterruptedException | Module.InvalidMessageType e) {              e.printStackTrace();              testSuccess = false; @@ -152,32 +171,36 @@ public class UDUPTest {      public void sendMultipleMessages() {          UDUP udp1 = null;          UDUP udp2 = null; +        UDUPServer server1 = null; +        UDUPServer server2 = null;          UDUPMessage msg1 = null;          UDUPMessage msg2 = null;          UDUPMessage msg3 = null;          boolean testSuccess = true; -        int timeout = 5000; +        int timeout = 3000;          try {              System.out.println("Starting udp1"); +            server1 = new UDUPServer(InetAddress.getByName("127.0.0.2"), 5997, 1000);              udp1 = new UDUP( -                    InetAddress.getByName("127.0.0.2"),                      5997,                      timeout, -                    1000); +                    1000, +                    server1);              System.out.println("Starting udp2"); +            server2 = new UDUPServer(InetAddress.getByName("127.0.0.3"), 5997, 1000);              udp2 = new UDUP( -                    InetAddress.getByName("127.0.0.3"),                      5997,                      timeout, -                    1000); +                    1000, +                    server2); -            UDUPMessage testContent = new UDUPMessage(); +            RemoteGossipGirlMessage testContent = +                    new RemoteGossipGirlMessage("multipleMsgTest", 0, GossipGirlMessage.Type.NO_CO_TAM);              testContent.setDestinationModule(ModuleType.TEST); -            testContent.setMessageId("multipleMsgTest");              msg1 = new UDUPMessage(                      "udup1", @@ -197,22 +220,38 @@ public class UDUPTest {                      testContent              ); -        } catch (UnknownHostException e) { +        } catch (UnknownHostException | SocketException e) {              e.printStackTrace(); +            testSuccess = false;          } -        Thread udpThread1 = new Thread(udp1); +        Thread udpThread1 = new Thread(server1);          udpThread1.start(); -        Thread udpThread2 = new Thread(udp2); +        Thread udpThread2 = new Thread(server2);          udpThread2.start();          try { -            udp1.handle(msg1); -            udp1.handle(msg2); -            udp1.handle(msg3); -            Thread.sleep(timeout + 2000); +            Thread.sleep(500); +            if (udp1 == null | udp2 == null) { +                testSuccess = false; +            } else { +                udp1.handle(msg1); +                udp1.handle(msg2); +                udp1.handle(msg3); +                Thread.sleep(timeout); +            }          } catch (InterruptedException | Module.InvalidMessageType e) {              e.printStackTrace(); +            testSuccess = false; +        } + +        udpThread1.interrupt(); +        udpThread2.interrupt(); + +        if (testSuccess) { +            Assert.assertTrue(true); +        } else { +            Assert.fail();          }      }  }  |