m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/pl/edu/mimuw/cloudatlas/agent
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2020-01-15 02:57:12 +0100
committerGitHub <noreply@github.com>2020-01-15 02:57:12 +0100
commit8678d8c922b439ff4a58e4b139c4085515a890f0 (patch)
tree8c1cffd6adadaf17b09880e8fb9487b0fe553f33 /src/main/java/pl/edu/mimuw/cloudatlas/agent
parent1986aaadaf6dd4316dce0186616a515e6721628c (diff)
parent5096d5fdc1a550a53511ca4478394f778cef10be (diff)
Merge pull request #122 from m-chrzan/value_query_serialization_fix
Value query serialization fix
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java236
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java9
4 files changed, 10 insertions, 243 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index 382160e..c9c6cc7 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -56,7 +56,7 @@ public class Stanik extends Module {
private void setDefaultQuery(String name, String query) {
try {
- ValueQuery queryValue = new ValueQuery(query);
+ ValueQuery queryValue = new ValueQuery(query, 0);
queries.put(new Attribute(name), queryValue);
} catch (Exception e) {
System.out.println("ERROR: failed to compile default query");
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 089cad2..97b68d1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,17 +1,19 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
+import pl.edu.mimuw.cloudatlas.ByteSerializer;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
import javax.xml.crypto.Data;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
public class UDUPClient {
private UDUP udp;
- private UDUPSerializer serializer;
+ private ByteSerializer serializer;
private int serverPort;
private DatagramSocket socket;
private int bufsize;
@@ -22,7 +24,7 @@ public class UDUPClient {
this.serverPort = serverPort;
this.socket = new DatagramSocket();
this.bufsize = bufferSize;
- this.serializer = new UDUPSerializer();
+ this.serializer = new ByteSerializer();
this.lastTransmission = 0;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
deleted file mode 100644
index cfaa37e..0000000
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ /dev/null
@@ -1,236 +0,0 @@
-package pl.edu.mimuw.cloudatlas.agent.modules;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.assertj.core.data.MapEntry;
-import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program;
-import pl.edu.mimuw.cloudatlas.model.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.rmi.Remote;
-import java.text.DateFormat;
-import java.util.*;
-
-/**
- * Serializes classes to and from byte arrays for UDP use
- */
-public class UDUPSerializer {
- private Kryo kryo;
-
- UDUPSerializer() {
- kryo = new Kryo();
- kryo.setReferences(true);
- kryo.setRegistrationRequired(true);
- registerClasses();
- }
-
- private void registerClasses() {
-
- kryo.register(Inet4Address.class, new Serializer() {
-
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- InetAddress ia = (InetAddress) object;
- kryo.writeObject(output, ia.getAddress());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- try {
- byte[] buf = kryo.readObject(input, byte[].class);
- InetAddress addr = Inet4Address.getByAddress(buf);
- return addr;
- } catch (UnknownHostException e) {
- System.out.println("Custom InetAddress read failed");
- e.printStackTrace();
- return null;
- }
- }
- });
-
- kryo.register(PathName.class, new Serializer() {
-
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- PathName pn = (PathName) object;
- kryo.writeObject(output, pn.getName());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- String addr = input.readString();
- return new PathName(addr);
- }
- });
-
- kryo.register(ValueList.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueList vl = (ValueList) object;
- kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
- ArrayList<Value> al = new ArrayList<>();
- for (Value v : vl.getValue()) {
- al.add(v);
- }
- kryo.writeObject(output, al);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
- ArrayList list = kryo.readObject(input, ArrayList.class);
- return new ValueList(list, t);
- }
- });
-
- kryo.register(ValueSet.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueSet vs = (ValueSet) object;
- kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
- HashSet<Value> hs = new HashSet();
- for (Value v : vs.getValue()) {
- hs.add(v);
- }
- kryo.writeObject(output, hs);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
- HashSet set = kryo.readObject(input, HashSet.class);
- return new ValueSet(set, t);
- }
- });
-
- kryo.register(AttributesMap.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- AttributesMap attribMap = (AttributesMap) object;
- HashMap<Attribute, Value> hashMap = new HashMap<>();
-
- for (Map.Entry<Attribute, Value> e : attribMap) {
- hashMap.put(e.getKey(), e.getValue());
- }
-
- kryo.writeObject(output, hashMap);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class);
- AttributesMap attribMap = new AttributesMap();
- for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) {
- attribMap.add(e.getKey(), e.getValue());
- }
- return attribMap;
- }
- });
-
- kryo.register(ValueQuery.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueQuery vq = (ValueQuery) object;
- kryo.writeObject(output, vq.getCode());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- String code = kryo.readObject(input, String.class);
- ValueQuery vq = null;
- try {
- vq = new ValueQuery(code);
- } catch (Exception e) {
- System.out.println("Value query deserialization failed");
- e.printStackTrace();
- }
- return vq;
- }
- });
-
- // model
- kryo.register(Value.class);
- kryo.register(ValueBoolean.class);
- kryo.register(ValueContact.class);
- kryo.register(ValueDouble.class);
- kryo.register(ValueDuration.class);
- kryo.register(ValueDouble.class);
- kryo.register(ValueInt.class);
- kryo.register(ValueNull.class);
- kryo.register(ValueSet.class);
- kryo.register(ValueString.class);
- kryo.register(ValueTime.class);
- kryo.register(ValueUtils.class);
- kryo.register(ZMI.class);
-
- kryo.register(Attribute.class);
- kryo.register(AttributesMap.class);
- kryo.register(AttributesUtil.class);
-
- kryo.register(Type.class);
- kryo.register(Type.PrimaryType.class);
- kryo.register(TypeCollection.class);
- kryo.register(TypePrimitive.class);
-
- // messages in chronological order so it's easier to keep track
- kryo.register(AgentMessage.class);
- kryo.register(AttributesMessage.class);
- kryo.register(GetStateMessage.class);
- kryo.register(HejkaMessage.class);
- kryo.register(NoCoTamMessage.class);
- kryo.register(QueryMessage.class);
- kryo.register(QurnikMessage.class);
- kryo.register(RemikMessage.class);
- kryo.register(RemoveZMIMessage.class);
- kryo.register(RequestStateMessage.class);
- kryo.register(ResponseMessage.class);
- kryo.register(RunQueriesMessage.class);
- kryo.register(SetAttributeMessage.class);
- kryo.register(StanikMessage.Type.class);
- kryo.register(StanikMessage.class);
- kryo.register(TimerSchedulerMessage.class);
- kryo.register(UDUPMessage.class);
- kryo.register(UpdateAttributesMessage.class);
- kryo.register(UpdateQueriesMessage.class);
- kryo.register(GossipGirlMessage.class);
- kryo.register(GossipGirlMessage.Type.class);
- kryo.register(RemoteGossipGirlMessage.class);
-
- // modules
- kryo.register(TimerScheduledTask.class);
- kryo.register(RecursiveScheduledTask.class);
-
- // other
- kryo.register(byte[].class);
- kryo.register(LinkedHashMap.class);
- kryo.register(HashMap.class);
- kryo.register(HashSet.class);
- kryo.register(ModuleType.class);
- kryo.register(DateFormat.class);
- kryo.register(ArrayList.class);
- }
-
- public UDUPMessage deserialize(byte[] packetData) {
- ByteArrayInputStream in = new ByteArrayInputStream(packetData);
- Input kryoInput = new Input(in);
- UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
- return msg;
- }
-
- public byte[] serialize(UDUPMessage msg) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Output kryoOut = new Output(out);
- System.out.println("SERIALIZING " + msg.getContent());
- kryo.writeObject(kryoOut, msg);
- kryoOut.flush();
- kryoOut.close();
- return out.toByteArray();
- }
-}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 3da380c..dae0420 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -1,6 +1,7 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.ByteSerializer;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -15,7 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class UDUPServer implements Runnable {
private UDUP udp;
- private UDUPSerializer serializer;
+ private ByteSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
private HashMap<String, ArrayList<byte[]>> partialPackets;
@@ -27,7 +28,7 @@ public class UDUPServer implements Runnable {
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
- this.serializer = new UDUPSerializer();
+ this.serializer = new ByteSerializer();
this.running = new AtomicBoolean(false);
}
@@ -58,7 +59,7 @@ public class UDUPServer implements Runnable {
UDUPMessage msg;
if (packetNo == 1 && packet.getLength() < this.bufSize) {
- msg = this.serializer.deserialize(packetData);
+ msg = (UDUPMessage) this.serializer.deserialize(packetData, UDUPMessage.class);
System.out.println("UDP received message " + msg.getContent().getMessageId());
} else {
System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
@@ -137,7 +138,7 @@ public class UDUPServer implements Runnable {
if (this.partialPackets.containsKey(transmissionID)) {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
- msg = this.serializer.deserialize(allPacketData);
+ msg = (UDUPMessage) this.serializer.deserialize(allPacketData, UDUPMessage.class);
this.partialPackets.remove(transmissionID);
System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
} catch (Error | Exception e) {