diff options
author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2020-01-15 02:57:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-15 02:57:12 +0100 |
commit | 8678d8c922b439ff4a58e4b139c4085515a890f0 (patch) | |
tree | 8c1cffd6adadaf17b09880e8fb9487b0fe553f33 /src/main/java/pl/edu/mimuw/cloudatlas/agent | |
parent | 1986aaadaf6dd4316dce0186616a515e6721628c (diff) | |
parent | 5096d5fdc1a550a53511ca4478394f778cef10be (diff) |
Merge pull request #122 from m-chrzan/value_query_serialization_fix
Value query serialization fix
Diffstat (limited to 'src/main/java/pl/edu/mimuw/cloudatlas/agent')
4 files changed, 10 insertions, 243 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java index 382160e..c9c6cc7 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/Stanik.java @@ -56,7 +56,7 @@ public class Stanik extends Module { private void setDefaultQuery(String name, String query) { try { - ValueQuery queryValue = new ValueQuery(query); + ValueQuery queryValue = new ValueQuery(query, 0); queries.put(new Attribute(name), queryValue); } catch (Exception e) { System.out.println("ERROR: failed to compile default query"); diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java index 089cad2..97b68d1 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPClient.java @@ -1,17 +1,19 @@ package pl.edu.mimuw.cloudatlas.agent.modules; +import pl.edu.mimuw.cloudatlas.ByteSerializer; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.model.ValueTime; import pl.edu.mimuw.cloudatlas.model.ValueUtils; import javax.xml.crypto.Data; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; public class UDUPClient { private UDUP udp; - private UDUPSerializer serializer; + private ByteSerializer serializer; private int serverPort; private DatagramSocket socket; private int bufsize; @@ -22,7 +24,7 @@ public class UDUPClient { this.serverPort = serverPort; this.socket = new DatagramSocket(); this.bufsize = bufferSize; - this.serializer = new UDUPSerializer(); + this.serializer = new ByteSerializer(); this.lastTransmission = 0; } diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java deleted file mode 100644 index cfaa37e..0000000 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPSerializer.java +++ /dev/null @@ -1,236 +0,0 @@ -package pl.edu.mimuw.cloudatlas.agent.modules; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.assertj.core.data.MapEntry; -import pl.edu.mimuw.cloudatlas.agent.messages.*; -import pl.edu.mimuw.cloudatlas.interpreter.query.Absyn.Program; -import pl.edu.mimuw.cloudatlas.model.*; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.rmi.Remote; -import java.text.DateFormat; -import java.util.*; - -/** - * Serializes classes to and from byte arrays for UDP use - */ -public class UDUPSerializer { - private Kryo kryo; - - UDUPSerializer() { - kryo = new Kryo(); - kryo.setReferences(true); - kryo.setRegistrationRequired(true); - registerClasses(); - } - - private void registerClasses() { - - kryo.register(Inet4Address.class, new Serializer() { - - @Override - public void write(Kryo kryo, Output output, Object object) { - InetAddress ia = (InetAddress) object; - kryo.writeObject(output, ia.getAddress()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - try { - byte[] buf = kryo.readObject(input, byte[].class); - InetAddress addr = Inet4Address.getByAddress(buf); - return addr; - } catch (UnknownHostException e) { - System.out.println("Custom InetAddress read failed"); - e.printStackTrace(); - return null; - } - } - }); - - kryo.register(PathName.class, new Serializer() { - - @Override - public void write(Kryo kryo, Output output, Object object) { - PathName pn = (PathName) object; - kryo.writeObject(output, pn.getName()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - String addr = input.readString(); - return new PathName(addr); - } - }); - - kryo.register(ValueList.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueList vl = (ValueList) object; - kryo.writeObject(output, ((TypeCollection) vl.getType()).getElementType()); - ArrayList<Value> al = new ArrayList<>(); - for (Value v : vl.getValue()) { - al.add(v); - } - kryo.writeObject(output, al); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - TypePrimitive t = kryo.readObject(input, TypePrimitive.class); - ArrayList list = kryo.readObject(input, ArrayList.class); - return new ValueList(list, t); - } - }); - - kryo.register(ValueSet.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueSet vs = (ValueSet) object; - kryo.writeObject(output, ((TypeCollection) vs.getType()).getElementType()); - HashSet<Value> hs = new HashSet(); - for (Value v : vs.getValue()) { - hs.add(v); - } - kryo.writeObject(output, hs); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - TypePrimitive t = kryo.readObject(input, TypePrimitive.class); - HashSet set = kryo.readObject(input, HashSet.class); - return new ValueSet(set, t); - } - }); - - kryo.register(AttributesMap.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - AttributesMap attribMap = (AttributesMap) object; - HashMap<Attribute, Value> hashMap = new HashMap<>(); - - for (Map.Entry<Attribute, Value> e : attribMap) { - hashMap.put(e.getKey(), e.getValue()); - } - - kryo.writeObject(output, hashMap); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - HashMap<Attribute, Value> hashMap = kryo.readObject(input, HashMap.class); - AttributesMap attribMap = new AttributesMap(); - for (Map.Entry<Attribute, Value> e : hashMap.entrySet()) { - attribMap.add(e.getKey(), e.getValue()); - } - return attribMap; - } - }); - - kryo.register(ValueQuery.class, new Serializer() { - @Override - public void write(Kryo kryo, Output output, Object object) { - ValueQuery vq = (ValueQuery) object; - kryo.writeObject(output, vq.getCode()); - } - - @Override - public Object read(Kryo kryo, Input input, Class type) { - String code = kryo.readObject(input, String.class); - ValueQuery vq = null; - try { - vq = new ValueQuery(code); - } catch (Exception e) { - System.out.println("Value query deserialization failed"); - e.printStackTrace(); - } - return vq; - } - }); - - // model - kryo.register(Value.class); - kryo.register(ValueBoolean.class); - kryo.register(ValueContact.class); - kryo.register(ValueDouble.class); - kryo.register(ValueDuration.class); - kryo.register(ValueDouble.class); - kryo.register(ValueInt.class); - kryo.register(ValueNull.class); - kryo.register(ValueSet.class); - kryo.register(ValueString.class); - kryo.register(ValueTime.class); - kryo.register(ValueUtils.class); - kryo.register(ZMI.class); - - kryo.register(Attribute.class); - kryo.register(AttributesMap.class); - kryo.register(AttributesUtil.class); - - kryo.register(Type.class); - kryo.register(Type.PrimaryType.class); - kryo.register(TypeCollection.class); - kryo.register(TypePrimitive.class); - - // messages in chronological order so it's easier to keep track - kryo.register(AgentMessage.class); - kryo.register(AttributesMessage.class); - kryo.register(GetStateMessage.class); - kryo.register(HejkaMessage.class); - kryo.register(NoCoTamMessage.class); - kryo.register(QueryMessage.class); - kryo.register(QurnikMessage.class); - kryo.register(RemikMessage.class); - kryo.register(RemoveZMIMessage.class); - kryo.register(RequestStateMessage.class); - kryo.register(ResponseMessage.class); - kryo.register(RunQueriesMessage.class); - kryo.register(SetAttributeMessage.class); - kryo.register(StanikMessage.Type.class); - kryo.register(StanikMessage.class); - kryo.register(TimerSchedulerMessage.class); - kryo.register(UDUPMessage.class); - kryo.register(UpdateAttributesMessage.class); - kryo.register(UpdateQueriesMessage.class); - kryo.register(GossipGirlMessage.class); - kryo.register(GossipGirlMessage.Type.class); - kryo.register(RemoteGossipGirlMessage.class); - - // modules - kryo.register(TimerScheduledTask.class); - kryo.register(RecursiveScheduledTask.class); - - // other - kryo.register(byte[].class); - kryo.register(LinkedHashMap.class); - kryo.register(HashMap.class); - kryo.register(HashSet.class); - kryo.register(ModuleType.class); - kryo.register(DateFormat.class); - kryo.register(ArrayList.class); - } - - public UDUPMessage deserialize(byte[] packetData) { - ByteArrayInputStream in = new ByteArrayInputStream(packetData); - Input kryoInput = new Input(in); - UDUPMessage msg = kryo.readObject(kryoInput, UDUPMessage.class); - return msg; - } - - public byte[] serialize(UDUPMessage msg) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Output kryoOut = new Output(out); - System.out.println("SERIALIZING " + msg.getContent()); - kryo.writeObject(kryoOut, msg); - kryoOut.flush(); - kryoOut.close(); - return out.toByteArray(); - } -} diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java index 3da380c..dae0420 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/agent/modules/UDUPServer.java @@ -1,6 +1,7 @@ package pl.edu.mimuw.cloudatlas.agent.modules; import com.google.common.primitives.Bytes; +import pl.edu.mimuw.cloudatlas.ByteSerializer; import pl.edu.mimuw.cloudatlas.agent.messages.UDUPMessage; import pl.edu.mimuw.cloudatlas.model.ValueUtils; @@ -15,7 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class UDUPServer implements Runnable { private UDUP udp; - private UDUPSerializer serializer; + private ByteSerializer serializer; private DatagramSocket socket; private InetAddress address; private HashMap<String, ArrayList<byte[]>> partialPackets; @@ -27,7 +28,7 @@ public class UDUPServer implements Runnable { this.address = addr; this.bufSize = bufSize; this.partialPackets = new HashMap<>(); - this.serializer = new UDUPSerializer(); + this.serializer = new ByteSerializer(); this.running = new AtomicBoolean(false); } @@ -58,7 +59,7 @@ public class UDUPServer implements Runnable { UDUPMessage msg; if (packetNo == 1 && packet.getLength() < this.bufSize) { - msg = this.serializer.deserialize(packetData); + msg = (UDUPMessage) this.serializer.deserialize(packetData, UDUPMessage.class); System.out.println("UDP received message " + msg.getContent().getMessageId()); } else { System.out.println("UDP received partial message with transmission id " + transmissionID + " packet no " + packetNo); @@ -137,7 +138,7 @@ public class UDUPServer implements Runnable { if (this.partialPackets.containsKey(transmissionID)) { try { byte[] allPacketData = concatPacketData(transmissionID, newPacketNo, packetData); - msg = this.serializer.deserialize(allPacketData); + msg = (UDUPMessage) this.serializer.deserialize(allPacketData, UDUPMessage.class); this.partialPackets.remove(transmissionID); System.out.println("Kryo put together whole transmission for msg " + msg.getContent().getMessageId()); } catch (Error | Exception e) { |