m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-15 01:24:27 +0100
committerMagdalena Grodzińska <mag.grodzinska@gmail.com>2020-01-15 01:24:27 +0100
commitb5fa21f2b22a97fb5e43bfd02b77827c158d02f7 (patch)
treece2079b2a93a527a793cfa7b305effd810a6763a
parent1986aaadaf6dd4316dce0186616a515e6721628c (diff)
Fix value query serialization
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java75
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java6
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java236
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java9
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java44
6 files changed, 98 insertions, 274 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java
index 94c8d65..74aaf31 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java
@@ -10,6 +10,7 @@ import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask;
import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask;
import pl.edu.mimuw.cloudatlas.model.*;
import pl.edu.mimuw.cloudatlas.querysigner.QueryData;
+import pl.edu.mimuw.cloudatlas.querysigner.QueryUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -17,10 +18,8 @@ import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.rmi.Remote;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.text.DateFormat;
+import java.util.*;
/**
* Serializes classes to and from byte arrays
@@ -79,12 +78,16 @@ public class ByteSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueList vl = (ValueList) object;
kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
- kryo.writeObject(output, vl.getValue());
+ ArrayList<Value> al = new ArrayList<>();
+ for (Value v : vl.getValue()) {
+ al.add(v);
+ }
+ kryo.writeObject(output, al);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
ArrayList list = kryo.readObject(input, ArrayList.class);
return new ValueList(list, t);
}
@@ -95,25 +98,75 @@ public class ByteSerializer {
public void write(Kryo kryo, Output output, Object object) {
ValueSet vs = (ValueSet) object;
kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
- kryo.writeObject(output, vs.getValue());
+ HashSet<Value> hs = new HashSet();
+ for (Value v : vs.getValue()) {
+ hs.add(v);
+ }
+ kryo.writeObject(output, hs);
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
- Type t = kryo.readObject(input, Type.class);
+ TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
HashSet set = kryo.readObject(input, HashSet.class);
return new ValueSet(set, t);
}
});
+ kryo.register(AttributesMap.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ AttributesMap attribMap = (AttributesMap) object;
+ HashMap<Attribute, Value> hashMap = new HashMap<>();
+
+ for (Map.Entry<Attribute, Value> e : attribMap) {
+ hashMap.put(e.getKey(), e.getValue());
+ }
+
+ kryo.writeObject(output, hashMap);
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class);
+ AttributesMap attribMap = new AttributesMap();
+ for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) {
+ attribMap.add(e.getKey(), e.getValue());
+ }
+ return attribMap;
+ }
+ });
+
+ kryo.register(ValueQuery.class, new Serializer() {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ValueQuery vq = (ValueQuery) object;
+ kryo.writeObject(output, QueryUtils.constructQueryData(vq));
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ QueryData qd = kryo.readObject(input, QueryData.class);
+ ValueQuery vq = null;
+ try {
+ vq = new ValueQuery(qd);
+ } catch (Exception e) {
+ System.out.println("Value query deserialization failed");
+ e.printStackTrace();
+ }
+ return vq;
+ }
+ });
+
// model
kryo.register(Value.class);
kryo.register(ValueBoolean.class);
kryo.register(ValueContact.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueDuration.class);
+ kryo.register(ValueDouble.class);
kryo.register(ValueInt.class);
kryo.register(ValueNull.class);
- kryo.register(ValueQuery.class);
kryo.register(ValueSet.class);
kryo.register(ValueString.class);
kryo.register(ValueTime.class);
@@ -125,6 +178,7 @@ public class ByteSerializer {
kryo.register(AttributesUtil.class);
kryo.register(Type.class);
+ kryo.register(Type.PrimaryType.class);
kryo.register(TypeCollection.class);
kryo.register(TypePrimitive.class);
@@ -160,7 +214,10 @@ public class ByteSerializer {
kryo.register(byte[].class);
kryo.register(LinkedHashMap.class);
kryo.register(HashMap.class);
+ kryo.register(HashSet.class);
kryo.register(ModuleType.class);
+ kryo.register(DateFormat.class);
+ kryo.register(ArrayList.class);
kryo.register(QueryData.class);
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
index 382160e..c9c6cc7 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java
@@ -56,7 +56,7 @@ public class Stanik extends Module {
private void setDefaultQuery(String name, String query) {
try {
- ValueQuery queryValue = new ValueQuery(query);
+ ValueQuery queryValue = new ValueQuery(query, 0);
queries.put(new Attribute(name), queryValue);
} catch (Exception e) {
System.out.println("ERROR: failed to compile default query");
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
index 089cad2..97b68d1 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java
@@ -1,17 +1,19 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
+import pl.edu.mimuw.cloudatlas.ByteSerializer;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import pl.edu.mimuw.cloudatlas.model.ValueTime;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
import javax.xml.crypto.Data;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
public class UDUPClient {
private UDUP udp;
- private UDUPSerializer serializer;
+ private ByteSerializer serializer;
private int serverPort;
private DatagramSocket socket;
private int bufsize;
@@ -22,7 +24,7 @@ public class UDUPClient {
this.serverPort = serverPort;
this.socket = new DatagramSocket();
this.bufsize = bufferSize;
- this.serializer = new UDUPSerializer();
+ this.serializer = new ByteSerializer();
this.lastTransmission = 0;
}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
deleted file mode 100644
index cfaa37e..0000000
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java
+++ /dev/null
@@ -1,236 +0,0 @@
-package pl.edu.mimuw.cloudatlas.agent.modules;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.assertj.core.data.MapEntry;
-import pl.edu.mimuw.cloudatlas.agent.messages.*;
-import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program;
-import pl.edu.mimuw.cloudatlas.model.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.rmi.Remote;
-import java.text.DateFormat;
-import java.util.*;
-
-/**
- * Serializes classes to and from byte arrays for UDP use
- */
-public class UDUPSerializer {
- private Kryo kryo;
-
- UDUPSerializer() {
- kryo = new Kryo();
- kryo.setReferences(true);
- kryo.setRegistrationRequired(true);
- registerClasses();
- }
-
- private void registerClasses() {
-
- kryo.register(Inet4Address.class, new Serializer() {
-
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- InetAddress ia = (InetAddress) object;
- kryo.writeObject(output, ia.getAddress());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- try {
- byte[] buf = kryo.readObject(input, byte[].class);
- InetAddress addr = Inet4Address.getByAddress(buf);
- return addr;
- } catch (UnknownHostException e) {
- System.out.println("Custom InetAddress read failed");
- e.printStackTrace();
- return null;
- }
- }
- });
-
- kryo.register(PathName.class, new Serializer() {
-
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- PathName pn = (PathName) object;
- kryo.writeObject(output, pn.getName());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- String addr = input.readString();
- return new PathName(addr);
- }
- });
-
- kryo.register(ValueList.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueList vl = (ValueList) object;
- kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType());
- ArrayList<Value> al = new ArrayList<>();
- for (Value v : vl.getValue()) {
- al.add(v);
- }
- kryo.writeObject(output, al);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
- ArrayList list = kryo.readObject(input, ArrayList.class);
- return new ValueList(list, t);
- }
- });
-
- kryo.register(ValueSet.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueSet vs = (ValueSet) object;
- kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType());
- HashSet<Value> hs = new HashSet();
- for (Value v : vs.getValue()) {
- hs.add(v);
- }
- kryo.writeObject(output, hs);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- TypePrimitive t = kryo.readObject(input, TypePrimitive.class);
- HashSet set = kryo.readObject(input, HashSet.class);
- return new ValueSet(set, t);
- }
- });
-
- kryo.register(AttributesMap.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- AttributesMap attribMap = (AttributesMap) object;
- HashMap<Attribute, Value> hashMap = new HashMap<>();
-
- for (Map.Entry<Attribute, Value> e : attribMap) {
- hashMap.put(e.getKey(), e.getValue());
- }
-
- kryo.writeObject(output, hashMap);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class);
- AttributesMap attribMap = new AttributesMap();
- for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) {
- attribMap.add(e.getKey(), e.getValue());
- }
- return attribMap;
- }
- });
-
- kryo.register(ValueQuery.class, new Serializer() {
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ValueQuery vq = (ValueQuery) object;
- kryo.writeObject(output, vq.getCode());
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class type) {
- String code = kryo.readObject(input, String.class);
- ValueQuery vq = null;
- try {
- vq = new ValueQuery(code);
- } catch (Exception e) {
- System.out.println("Value query deserialization failed");
- e.printStackTrace();
- }
- return vq;
- }
- });
-
- // model
- kryo.register(Value.class);
- kryo.register(ValueBoolean.class);
- kryo.register(ValueContact.class);
- kryo.register(ValueDouble.class);
- kryo.register(ValueDuration.class);
- kryo.register(ValueDouble.class);
- kryo.register(ValueInt.class);
- kryo.register(ValueNull.class);
- kryo.register(ValueSet.class);
- kryo.register(ValueString.class);
- kryo.register(ValueTime.class);
- kryo.register(ValueUtils.class);
- kryo.register(ZMI.class);
-
- kryo.register(Attribute.class);
- kryo.register(AttributesMap.class);
- kryo.register(AttributesUtil.class);
-
- kryo.register(Type.class);
- kryo.register(Type.PrimaryType.class);
- kryo.register(TypeCollection.class);
- kryo.register(TypePrimitive.class);
-
- // messages in chronological order so it's easier to keep track
- kryo.register(AgentMessage.class);
- kryo.register(AttributesMessage.class);
- kryo.register(GetStateMessage.class);
- kryo.register(HejkaMessage.class);
- kryo.register(NoCoTamMessage.class);
- kryo.register(QueryMessage.class);
- kryo.register(QurnikMessage.class);
- kryo.register(RemikMessage.class);
- kryo.register(RemoveZMIMessage.class);
- kryo.register(RequestStateMessage.class);
- kryo.register(ResponseMessage.class);
- kryo.register(RunQueriesMessage.class);
- kryo.register(SetAttributeMessage.class);
- kryo.register(StanikMessage.Type.class);
- kryo.register(StanikMessage.class);
- kryo.register(TimerSchedulerMessage.class);
- kryo.register(UDUPMessage.class);
- kryo.register(UpdateAttributesMessage.class);
- kryo.register(UpdateQueriesMessage.class);
- kryo.register(GossipGirlMessage.class);
- kryo.register(GossipGirlMessage.Type.class);
- kryo.register(RemoteGossipGirlMessage.class);
-
- // modules
- kryo.register(TimerScheduledTask.class);
- kryo.register(RecursiveScheduledTask.class);
-
- // other
- kryo.register(byte[].class);
- kryo.register(LinkedHashMap.class);
- kryo.register(HashMap.class);
- kryo.register(HashSet.class);
- kryo.register(ModuleType.class);
- kryo.register(DateFormat.class);
- kryo.register(ArrayList.class);
- }
-
- public UDUPMessage deserialize(byte[] packetData) {
- ByteArrayInputStream in = new ByteArrayInputStream(packetData);
- Input kryoInput = new Input(in);
- UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class);
- return msg;
- }
-
- public byte[] serialize(UDUPMessage msg) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Output kryoOut = new Output(out);
- System.out.println("SERIALIZING " + msg.getContent());
- kryo.writeObject(kryoOut, msg);
- kryoOut.flush();
- kryoOut.close();
- return out.toByteArray();
- }
-}
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
index 3da380c..dae0420 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java
@@ -1,6 +1,7 @@
package pl.edu.mimuw.cloudatlas.agent.modules;
import com.google.common.primitives.Bytes;
+import pl.edu.mimuw.cloudatlas.ByteSerializer;
import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage;
import pl.edu.mimuw.cloudatlas.model.ValueUtils;
@@ -15,7 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class UDUPServer implements Runnable {
private UDUP udp;
- private UDUPSerializer serializer;
+ private ByteSerializer serializer;
private DatagramSocket socket;
private InetAddress address;
private HashMap<String, ArrayList<byte[]>> partialPackets;
@@ -27,7 +28,7 @@ public class UDUPServer implements Runnable {
this.address = addr;
this.bufSize = bufSize;
this.partialPackets = new HashMap<>();
- this.serializer = new UDUPSerializer();
+ this.serializer = new ByteSerializer();
this.running = new AtomicBoolean(false);
}
@@ -58,7 +59,7 @@ public class UDUPServer implements Runnable {
UDUPMessage msg;
if (packetNo == 1 && packet.getLength() < this.bufSize) {
- msg = this.serializer.deserialize(packetData);
+ msg = (UDUPMessage) this.serializer.deserialize(packetData, UDUPMessage.class);
System.out.println("UDP received message " + msg.getContent().getMessageId());
} else {
System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo);
@@ -137,7 +138,7 @@ public class UDUPServer implements Runnable {
if (this.partialPackets.containsKey(transmissionID)) {
try {
byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData);
- msg = this.serializer.deserialize(allPacketData);
+ msg = (UDUPMessage) this.serializer.deserialize(allPacketData, UDUPMessage.class);
this.partialPackets.remove(transmissionID);
System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId());
} catch (Error | Exception e) {
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
index 95f826a..e577c13 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java
@@ -28,27 +28,27 @@ public class ValueQuery extends Value {
*
* @param query the code of the query
*/
- public ValueQuery(String query) throws Exception {
- this.code = query;
- if (!query.isEmpty()) {
- Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes()));
- this.query = (new parser(lex)).pProgram();
- }
- this.signature = null;
- this.timestamp = System.currentTimeMillis();
- this.installed = true;
- }
-
- public ValueQuery(String query, byte[] querySignature) throws Exception {
- this.code = query;
- if (!query.isEmpty()) {
- Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes()));
- this.query = (new parser(lex)).pProgram();
- }
- this.signature = querySignature;
- this.timestamp = System.currentTimeMillis();
- this.installed = true;
- }
+// public ValueQuery(String query) throws Exception {
+// this.code = query;
+// if (!query.isEmpty()) {
+// Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes()));
+// this.query = (new parser(lex)).pProgram();
+// }
+// this.signature = null;
+// this.timestamp = System.currentTimeMillis();
+// this.installed = true;
+// }
+
+// public ValueQuery(String query, byte[] querySignature) throws Exception {
+// this.code = query;
+// if (!query.isEmpty()) {
+// Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes()));
+// this.query = (new parser(lex)).pProgram();
+// }
+// this.signature = querySignature;
+// this.timestamp = System.currentTimeMillis();
+// this.installed = true;
+// }
public ValueQuery(QueryData queryData) throws Exception {
this.code = queryData.getCode();
@@ -57,7 +57,7 @@ public class ValueQuery extends Value {
this.query = (new parser(lex)).pProgram();
}
this.signature = queryData.getSignature();
- this.timestamp = System.currentTimeMillis();
+ this.timestamp = queryData.getTimestamp();
this.installed = queryData.isInstalled();
}