diff options
6 files changed, 98 insertions, 274 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java index 94c8d65..74aaf31 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/ByteSerializer.java @@ -10,6 +10,7 @@ import pl.edu.mimuw.cloudatlas.agent.modules.RecursiveScheduledTask; import pl.edu.mimuw.cloudatlas.agent.modules.TimerScheduledTask; import pl.edu.mimuw.cloudatlas.model.*; import pl.edu.mimuw.cloudatlas.querysigner.QueryData; +import pl.edu.mimuw.cloudatlas.querysigner.QueryUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -17,10 +18,8 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.Remote; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.HashMap; -import java.util.LinkedHashMap; +import java.text.DateFormat; +import java.util.*; /** * Serializes classes to and from byte arrays @@ -79,12 +78,16 @@ public class ByteSerializer { public void write(Kryo kryo, Output output, Object object) { ValueList vl = (ValueList) object; kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); - kryo.writeObject(output, vl.getValue()); + ArrayList<Value> al = new ArrayList<>(); + for (Value v : vl.getValue()) { + al.add(v); + } + kryo.writeObject(output, al); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); ArrayList list = kryo.readObject(input, ArrayList.class); return new ValueList(list, t); } @@ -95,25 +98,75 @@ public class ByteSerializer { public void write(Kryo kryo, Output output, Object object) { ValueSet vs = (ValueSet) object; kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); - kryo.writeObject(output, vs.getValue()); + HashSet<Value> hs = new HashSet(); + for (Value v : vs.getValue()) { + hs.add(v); + } + kryo.writeObject(output, hs); } @Override public Object read(Kryo kryo, Input input, Class type) { - Type t = kryo.readObject(input, Type.class); + TypePrimitive t = kryo.readObject(input, TypePrimitive.class); HashSet set = kryo.readObject(input, HashSet.class); return new ValueSet(set, t); } }); + kryo.register(AttributesMap.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + AttributesMap attribMap = (AttributesMap) object; + HashMap<Attribute, Value> hashMap = new HashMap<>(); + + for (Map.Entry<Attribute, Value> e : attribMap) { + hashMap.put(e.getKey(), e.getValue()); + } + + kryo.writeObject(output, hashMap); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class); + AttributesMap attribMap = new AttributesMap(); + for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) { + attribMap.add(e.getKey(), e.getValue()); + } + return attribMap; + } + }); + + kryo.register(ValueQuery.class, new Serializer() { + @Override + public void write(Kryo kryo, Output output, Object object) { + ValueQuery vq = (ValueQuery) object; + kryo.writeObject(output, QueryUtils.constructQueryData(vq)); + } + + @Override + public Object read(Kryo kryo, Input input, Class type) { + QueryData qd = kryo.readObject(input, QueryData.class); + ValueQuery vq = null; + try { + vq = new ValueQuery(qd); + } catch (Exception e) { + System.out.println("Value query deserialization failed"); + e.printStackTrace(); + } + return vq; + } + }); + // model kryo.register(Value.class); kryo.register(ValueBoolean.class); kryo.register(ValueContact.class); + kryo.register(ValueDouble.class); kryo.register(ValueDuration.class); + kryo.register(ValueDouble.class); kryo.register(ValueInt.class); kryo.register(ValueNull.class); - kryo.register(ValueQuery.class); kryo.register(ValueSet.class); kryo.register(ValueString.class); kryo.register(ValueTime.class); @@ -125,6 +178,7 @@ public class ByteSerializer { kryo.register(AttributesUtil.class); kryo.register(Type.class); + kryo.register(Type.PrimaryType.class); kryo.register(TypeCollection.class); kryo.register(TypePrimitive.class); @@ -160,7 +214,10 @@ public class ByteSerializer { kryo.register(byte[].class); kryo.register(LinkedHashMap.class); kryo.register(HashMap.class); + kryo.register(HashSet.class); kryo.register(ModuleType.class); + kryo.register(DateFormat.class); + kryo.register(ArrayList.class); kryo.register(QueryData.class); } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index 382160e..c9c6cc7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -56,7 +56,7 @@ public class Stanik extends Module { private void setDefaultQuery(String name, String query) { try { - ValueQuery queryValue = new ValueQuery(query); + ValueQuery queryValue = new ValueQuery(query, 0); queries.put(new Attribute(name), queryValue); } catch (Exception e) { System.out.println("ERROR: failed to compile default query"); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 089cad2..97b68d1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,17 +1,19 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import pl.edu.mimuw.cloudatlas.ByteSerializer; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; import javax.xml.crypto.Data; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; public class UDUPClient { private UDUP udp; - private UDUPSerializer serializer; + private ByteSerializer serializer; private int serverPort; private DatagramSocket socket; private int bufsize; @@ -22,7 +24,7 @@ public class UDUPClient { this.serverPort = serverPort; this.socket = new DatagramSocket(); this.bufsize = bufferSize; - this.serializer = new UDUPSerializer(); + this.serializer = new ByteSerializer(); this.lastTransmission = 0; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java deleted file mode 100644 index cfaa37e..0000000 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ /dev/null @@ -1,236 +0,0 @@ -package pl.edu.mimuw.cloudatlas.agent.modules; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.assertj.core.data.MapEntry; -import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program; -import pl.edu.mimuw.cloudatlas.model.*; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.rmi.Remote; -import java.text.DateFormat; -import java.util.*; - -/** - * Serializes classes to and from byte arrays for UDP use - */ -public class UDUPSerializer { - private Kryo kryo; - - UDUPSerializer() { - kryo = new Kryo(); - kryo.setReferences(true); - kryo.setRegistrationRequired(true); - registerClasses(); - } - - private void registerClasses() { - - kryo.register(Inet4Address.class, new Serializer() { - - @Override - public void write(Kryo kryo, Output output, Object object) { - InetAddress ia = (InetAddress) object; - kryo.writeObject(output, ia.getAddress()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - try { - byte[] buf = kryo.readObject(input, byte[].class); - InetAddress addr = Inet4Address.getByAddress(buf); - return addr; - } catch (UnknownHostException e) { - System.out.println("Custom InetAddress read failed"); - e.printStackTrace(); - return null; - } - } - }); - - kryo.register(PathName.class, new Serializer() { - - @Override - public void write(Kryo kryo, Output output, Object object) { - PathName pn = (PathName) object; - kryo.writeObject(output, pn.getName()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - String addr = input.readString(); - return new PathName(addr); - } - }); - - kryo.register(ValueList.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueList vl = (ValueList) object; - kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); - ArrayList<Value> al = new ArrayList<>(); - for (Value v : vl.getValue()) { - al.add(v); - } - kryo.writeObject(output, al); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - TypePrimitive t = kryo.readObject(input, TypePrimitive.class); - ArrayList list = kryo.readObject(input, ArrayList.class); - return new ValueList(list, t); - } - }); - - kryo.register(ValueSet.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueSet vs = (ValueSet) object; - kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); - HashSet<Value> hs = new HashSet(); - for (Value v : vs.getValue()) { - hs.add(v); - } - kryo.writeObject(output, hs); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - TypePrimitive t = kryo.readObject(input, TypePrimitive.class); - HashSet set = kryo.readObject(input, HashSet.class); - return new ValueSet(set, t); - } - }); - - kryo.register(AttributesMap.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - AttributesMap attribMap = (AttributesMap) object; - HashMap<Attribute, Value> hashMap = new HashMap<>(); - - for (Map.Entry<Attribute, Value> e : attribMap) { - hashMap.put(e.getKey(), e.getValue()); - } - - kryo.writeObject(output, hashMap); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class); - AttributesMap attribMap = new AttributesMap(); - for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) { - attribMap.add(e.getKey(), e.getValue()); - } - return attribMap; - } - }); - - kryo.register(ValueQuery.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueQuery vq = (ValueQuery) object; - kryo.writeObject(output, vq.getCode()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - String code = kryo.readObject(input, String.class); - ValueQuery vq = null; - try { - vq = new ValueQuery(code); - } catch (Exception e) { - System.out.println("Value query deserialization failed"); - e.printStackTrace(); - } - return vq; - } - }); - - // model - kryo.register(Value.class); - kryo.register(ValueBoolean.class); - kryo.register(ValueContact.class); - kryo.register(ValueDouble.class); - kryo.register(ValueDuration.class); - kryo.register(ValueDouble.class); - kryo.register(ValueInt.class); - kryo.register(ValueNull.class); - kryo.register(ValueSet.class); - kryo.register(ValueString.class); - kryo.register(ValueTime.class); - kryo.register(ValueUtils.class); - kryo.register(ZMI.class); - - kryo.register(Attribute.class); - kryo.register(AttributesMap.class); - kryo.register(AttributesUtil.class); - - kryo.register(Type.class); - kryo.register(Type.PrimaryType.class); - kryo.register(TypeCollection.class); - kryo.register(TypePrimitive.class); - - // messages in chronological order so it's easier to keep track - kryo.register(AgentMessage.class); - kryo.register(AttributesMessage.class); - kryo.register(GetStateMessage.class); - kryo.register(HejkaMessage.class); - kryo.register(NoCoTamMessage.class); - kryo.register(QueryMessage.class); - kryo.register(QurnikMessage.class); - kryo.register(RemikMessage.class); - kryo.register(RemoveZMIMessage.class); - kryo.register(RequestStateMessage.class); - kryo.register(ResponseMessage.class); - kryo.register(RunQueriesMessage.class); - kryo.register(SetAttributeMessage.class); - kryo.register(StanikMessage.Type.class); - kryo.register(StanikMessage.class); - kryo.register(TimerSchedulerMessage.class); - kryo.register(UDUPMessage.class); - kryo.register(UpdateAttributesMessage.class); - kryo.register(UpdateQueriesMessage.class); - kryo.register(GossipGirlMessage.class); - kryo.register(GossipGirlMessage.Type.class); - kryo.register(RemoteGossipGirlMessage.class); - - // modules - kryo.register(TimerScheduledTask.class); - kryo.register(RecursiveScheduledTask.class); - - // other - kryo.register(byte[].class); - kryo.register(LinkedHashMap.class); - kryo.register(HashMap.class); - kryo.register(HashSet.class); - kryo.register(ModuleType.class); - kryo.register(DateFormat.class); - kryo.register(ArrayList.class); - } - - public UDUPMessage deserialize(byte[] packetData) { - ByteArrayInputStream in = new ByteArrayInputStream(packetData); - Input kryoInput = new Input(in); - UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); - return msg; - } - - public byte[] serialize(UDUPMessage msg) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Output kryoOut = new Output(out); - System.out.println("SERIALIZING " + msg.getContent()); - kryo.writeObject(kryoOut, msg); - kryoOut.flush(); - kryoOut.close(); - return out.toByteArray(); - } -} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 3da380c..dae0420 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -1,6 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.ByteSerializer; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -15,7 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { private UDUP udp; - private UDUPSerializer serializer; + private ByteSerializer serializer; private DatagramSocket socket; private InetAddress address; private HashMap<String, ArrayList<byte[]>> partialPackets; @@ -27,7 +28,7 @@ public class UDUPServer implements Runnable { this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); - this.serializer = new UDUPSerializer(); + this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); } @@ -58,7 +59,7 @@ public class UDUPServer implements Runnable { UDUPMessage msg; if (packetNo == 1 && packet.getLength() < this.bufSize) { - msg = this.serializer.deserialize(packetData); + msg = (UDUPMessage) this.serializer.deserialize(packetData, UDUPMessage.class); System.out.println("UDP received message " + msg.getContent().getMessageId()); } else { System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); @@ -137,7 +138,7 @@ public class UDUPServer implements Runnable { if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); - msg = this.serializer.deserialize(allPacketData); + msg = (UDUPMessage) this.serializer.deserialize(allPacketData, UDUPMessage.class); this.partialPackets.remove(transmissionID); System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); } catch (Error | Exception e) { diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java index 95f826a..e577c13 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/model/ValueQuery.java @@ -28,27 +28,27 @@ public class ValueQuery extends Value { * * @param query the code of the query */ - public ValueQuery(String query) throws Exception { - this.code = query; - if (!query.isEmpty()) { - Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes())); - this.query = (new parser(lex)).pProgram(); - } - this.signature = null; - this.timestamp = System.currentTimeMillis(); - this.installed = true; - } - - public ValueQuery(String query, byte[] querySignature) throws Exception { - this.code = query; - if (!query.isEmpty()) { - Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes())); - this.query = (new parser(lex)).pProgram(); - } - this.signature = querySignature; - this.timestamp = System.currentTimeMillis(); - this.installed = true; - } +// public ValueQuery(String query) throws Exception { +// this.code = query; +// if (!query.isEmpty()) { +// Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes())); +// this.query = (new parser(lex)).pProgram(); +// } +// this.signature = null; +// this.timestamp = System.currentTimeMillis(); +// this.installed = true; +// } + +// public ValueQuery(String query, byte[] querySignature) throws Exception { +// this.code = query; +// if (!query.isEmpty()) { +// Yylex lex = new Yylex(new ByteArrayInputStream(query.getBytes())); +// this.query = (new parser(lex)).pProgram(); +// } +// this.signature = querySignature; +// this.timestamp = System.currentTimeMillis(); +// this.installed = true; +// } public ValueQuery(QueryData queryData) throws Exception { this.code = queryData.getCode(); @@ -57,7 +57,7 @@ public class ValueQuery extends Value { this.query = (new parser(lex)).pProgram(); } this.signature = queryData.getSignature(); - this.timestamp = System.currentTimeMillis(); + this.timestamp = queryData.getTimestamp(); this.installed = queryData.isInstalled(); } |