From ed3f1277cc1015232a723a90fd7fb07c492c93da Mon Sep 17 00:00:00 2001 From: DataSecs Date: Sun, 15 Apr 2018 05:13:58 +0200 Subject: [PATCH] Fixed a little bug that wouldn't allow one to create a client or server without creating a session listener. Fixed a problem with the packet class. The byte buffer was being passed incorrectly, which would cause encoding and decoding problems. Also added an option for the server to distribute packets via a selection of algorithms. Algorithms are going to come by time. In the next commit the examples are going to be adjusted to the changes. Bumped to version 1.6.0. --- README.md | 2 +- all/pom.xml | 6 +- client/pom.xml | 4 +- example/pom.xml | 4 +- pom.xml | 2 +- server/pom.xml | 4 +- .../de/datasecs/hydra/server/HydraServer.java | 19 +++ shared/pom.xml | 2 +- .../shared/distribution/Distribution.java | 6 + .../hydra/shared/handler/HydraSession.java | 9 +- .../initializer/HydraChannelInitializer.java | 6 +- .../hydra/shared/protocol/HydraProtocol.java | 8 ++ .../hydra/shared/protocol/packets/Packet.java | 117 ++++++------------ .../protocol/packets/StandardPacket.java | 14 ++- .../packets/serialization/PacketDecoder.java | 16 +-- .../packets/serialization/PacketEncoder.java | 9 +- 16 files changed, 115 insertions(+), 113 deletions(-) create mode 100644 shared/src/main/java/de/datasecs/hydra/shared/distribution/Distribution.java diff --git a/README.md b/README.md index f1f5401..6fd385d 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ like a [simple chat application](https://github.com/DataSecs/Hydra/wiki/Building de.datasecs hydra-all - 1.5.5 + 1.6.0 ``` diff --git a/all/pom.xml b/all/pom.xml index c950d63..4cb0425 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -5,7 +5,7 @@ hydra de.datasecs - 1.5.5 + 1.6.0 4.0.0 @@ -16,14 +16,14 @@ de.datasecs hydra-client - 1.5.5 + 1.6.0 de.datasecs hydra-server - 1.5.5 + 1.6.0 diff --git a/client/pom.xml b/client/pom.xml index c5eff82..2acb3e3 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ hydra de.datasecs - 1.5.5 + 1.6.0 4.0.0 @@ -16,7 +16,7 @@ de.datasecs hydra-shared - 1.5.5 + 1.6.0 diff --git a/example/pom.xml b/example/pom.xml index be70989..2c829da 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ hydra de.datasecs - 1.5.5 + 1.6.0 4.0.0 @@ -15,7 +15,7 @@ de.datasecs hydra-all - 1.5.5 + 1.6.0 diff --git a/pom.xml b/pom.xml index 27470e0..6f0abc6 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ de.datasecs hydra pom - 1.5.5 + 1.6.0 all diff --git a/server/pom.xml b/server/pom.xml index d09e7d3..9bc5ba2 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -5,7 +5,7 @@ hydra de.datasecs - 1.5.5 + 1.6.0 4.0.0 @@ -16,7 +16,7 @@ de.datasecs hydra-shared - 1.5.5 + 1.6.0 diff --git a/server/src/main/java/de/datasecs/hydra/server/HydraServer.java b/server/src/main/java/de/datasecs/hydra/server/HydraServer.java index cbd338a..fe49e78 100644 --- a/server/src/main/java/de/datasecs/hydra/server/HydraServer.java +++ b/server/src/main/java/de/datasecs/hydra/server/HydraServer.java @@ -1,11 +1,14 @@ package de.datasecs.hydra.server; +import de.datasecs.hydra.shared.distribution.Distribution; import de.datasecs.hydra.shared.handler.HydraSession; import de.datasecs.hydra.shared.handler.Session; import de.datasecs.hydra.shared.protocol.HydraProtocol; +import de.datasecs.hydra.shared.protocol.packets.Packet; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.internal.ConcurrentSet; import java.net.SocketAddress; import java.util.Arrays; @@ -105,4 +108,20 @@ public SocketAddress getLocalAdress() { public Set getSessions() { return protocol.getSessions(); } + + /** + * Sends a packet to all clients that are connected to the server with the specified distribution type. + * + * @param packet the packet that is supposed to be send to all connected clients. + * @param distributionType the type of distribution that is supposed to be used. + */ + public void send(Packet packet, Distribution distributionType) { + switch (distributionType) { + case SIMPLE_BROADCAST: + ConcurrentSet sessions = new ConcurrentSet<>(); + sessions.addAll(protocol.getSessions()); + sessions.forEach(session -> session.send(packet)); + break; + } + } } \ No newline at end of file diff --git a/shared/pom.xml b/shared/pom.xml index 87fce04..6caddbb 100644 --- a/shared/pom.xml +++ b/shared/pom.xml @@ -5,7 +5,7 @@ hydra de.datasecs - 1.5.5 + 1.6.0 4.0.0 diff --git a/shared/src/main/java/de/datasecs/hydra/shared/distribution/Distribution.java b/shared/src/main/java/de/datasecs/hydra/shared/distribution/Distribution.java new file mode 100644 index 0000000..7e91a15 --- /dev/null +++ b/shared/src/main/java/de/datasecs/hydra/shared/distribution/Distribution.java @@ -0,0 +1,6 @@ +package de.datasecs.hydra.shared.distribution; + +//TODO: Add explanation +public enum Distribution { + SIMPLE_BROADCAST +} diff --git a/shared/src/main/java/de/datasecs/hydra/shared/handler/HydraSession.java b/shared/src/main/java/de/datasecs/hydra/shared/handler/HydraSession.java index a29eef7..0634662 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/handler/HydraSession.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/handler/HydraSession.java @@ -26,12 +26,17 @@ public HydraSession(Channel channel, HydraProtocol protocol) { @Override protected void channelRead0(ChannelHandlerContext context, Packet packet) { - protocol.callPacketListener(packet, this); + if (protocol.getPacketListener() != null) { + protocol.callPacketListener(packet, this); + } } @Override public void handlerRemoved(ChannelHandlerContext context) { - protocol.callSessionListener(false, this); + if (protocol.getSessionListener() != null) { + protocol.callSessionListener(false, this); + } + protocol.removeSession(this); } diff --git a/shared/src/main/java/de/datasecs/hydra/shared/initializer/HydraChannelInitializer.java b/shared/src/main/java/de/datasecs/hydra/shared/initializer/HydraChannelInitializer.java index 54b22f1..090ee76 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/initializer/HydraChannelInitializer.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/initializer/HydraChannelInitializer.java @@ -46,7 +46,9 @@ protected void initChannel(SocketChannel channel) { protocol.setClientSession(session); } - // Inform SessionListener about new session - protocol.callSessionListener(true, session); + if (protocol.getSessionListener() != null) { + // Inform SessionListener about new session + protocol.callSessionListener(true, session); + } } } \ No newline at end of file diff --git a/shared/src/main/java/de/datasecs/hydra/shared/protocol/HydraProtocol.java b/shared/src/main/java/de/datasecs/hydra/shared/protocol/HydraProtocol.java index 7802692..cd09153 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/protocol/HydraProtocol.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/protocol/HydraProtocol.java @@ -150,4 +150,12 @@ public void removeSession(Session session) { public Set getSessions() { return sessions; } + + public HydraSessionListener getSessionListener() { + return sessionListener; + } + + public HydraPacketListener getPacketListener() { + return packetListener; + } } \ No newline at end of file diff --git a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/Packet.java b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/Packet.java index 343df71..45ebb20 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/Packet.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/Packet.java @@ -13,59 +13,21 @@ */ public abstract class Packet { - private ByteBuf byteBuf; - private Object objectToSerialize; - public void setByteBuf(ByteBuf byteBuf) { - this.byteBuf = byteBuf; - } - - public abstract void read(); - - public abstract void write(); - - protected void writeByte(byte byt) { - byteBuf.writeByte(byt); - } + public abstract void read(ByteBuf byteBuf); - protected byte readByte() { - return byteBuf.readByte(); - } + public abstract void write(ByteBuf byteBuf); - protected void writeInt(int integer) { + protected void writeInt(ByteBuf byteBuf, int integer) { byteBuf.writeInt(integer); } - protected int readInt() { + protected int readInt(ByteBuf byteBuf) { return byteBuf.readInt(); } - protected void writeFloat(float floatNumber) { - byteBuf.writeFloat(floatNumber); - } - - protected float readFloat() { - return byteBuf.readFloat(); - } - - protected void writeDouble(double doubleNumber) { - byteBuf.writeDouble(doubleNumber); - } - - protected double readDouble() { - return byteBuf.readDouble(); - } - - protected void writeLong(long longNumber) { - byteBuf.writeLong(longNumber); - } - - protected long readLong() { - return byteBuf.readLong(); - } - - protected void writeString(String string) { + protected void writeString(ByteBuf byteBuf, String string) { byteBuf.writeInt(string.length()); try { @@ -75,7 +37,7 @@ protected void writeString(String string) { } } - protected String readString() { + protected String readString(ByteBuf byteBuf) { byte[] bytes = new byte[byteBuf.readInt()]; byteBuf.readBytes(bytes); @@ -88,9 +50,9 @@ protected String readString() { return null; } - protected void writeObject(T object) { + protected void writeObject(ByteBuf byteBuf, Object object) { if (object == null) { - throw new IllegalArgumentException("object can't be null"); + throw new IllegalArgumentException("object cannot be null"); } try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { @@ -103,33 +65,34 @@ protected void writeObject(T object) { } } - protected Object readObject() { + protected Object readObject(ByteBuf byteBuf) { + Object object = null; + int length = byteBuf.readInt(); if (length > byteBuf.readableBytes()) { - throw new IllegalStateException("length can't be larger than the readable bytes"); + throw new IllegalStateException("length cannot be larger than the readable bytes"); } byte[] bytes = new byte[length]; byteBuf.readBytes(bytes); - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) { - return objectInputStream.readObject(); + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) { + object = objectInputStream.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } - return null; + return object; } - protected void writeCustomObject(T customObject, String pathOfCustomClassAtReceiver) { - writeCustomObject(customObject, "", pathOfCustomClassAtReceiver); + protected void writeCustomObject(ByteBuf byteBuf, T customObject, String pathOfCustomClassAtReceiver) { + writeCustomObject(byteBuf, customObject, "", pathOfCustomClassAtReceiver); } - private void writeCustomObject(T customObject, String prefix, String pathOfCustomClassAtReceiver) { + private void writeCustomObject(ByteBuf byteBuf, T customObject, String prefix, String pathOfCustomClassAtReceiver) { // Send path for rebuild of custom class if (!prefix.startsWith("#")) { - writeString(String.format("%s.%s", pathOfCustomClassAtReceiver, customObject.getClass().getSimpleName())); + writeString(byteBuf, String.format("%s.%s", pathOfCustomClassAtReceiver, customObject.getClass().getSimpleName())); } boolean isObject = true; @@ -143,7 +106,7 @@ private void writeCustomObject(T customObject, String prefix, String pathOfC } if (!(objectToSerialize instanceof Serializable)) { - writeCustomObject(objectToSerialize, String.format("%s#", prefix), pathOfCustomClassAtReceiver); + writeCustomObject(byteBuf, objectToSerialize, String.format("%s#", prefix), pathOfCustomClassAtReceiver); } else { String fieldName = field.getName(); if (prefix.startsWith("#")) { @@ -155,20 +118,20 @@ private void writeCustomObject(T customObject, String prefix, String pathOfC } } - writeString(fieldName); - writeObject(objectToSerialize); + writeString(byteBuf, fieldName); + writeObject(byteBuf, objectToSerialize); } } } if (!prefix.startsWith("#")) { // Signalizes that transmission is over - writeString("~"); + writeString(byteBuf, "~"); } } - protected T readCustomObject() { - String pathOfCustomClass = readString(); + protected T readCustomObject(ByteBuf byteBuf) { + String pathOfCustomClass = readString(byteBuf); T customObject = null; try { customObject = (T) Class.forName(pathOfCustomClass).newInstance(); @@ -181,19 +144,19 @@ protected T readCustomObject() { Map subFields = new ConcurrentHashMap<>(); String input; - while (!(input = readString()).startsWith("~")) { + while (!(input = readString(byteBuf)).startsWith("~")) { if (input.contains(";")) { subFields.clear(); String[] fieldNames = input.replace("*", pathOfCustomClass).split(";"); - subFields.put(fieldNames[0].replaceAll("#", ""), String.valueOf(readObject())); + subFields.put(fieldNames[0].replaceAll("#", ""), String.valueOf(readObject(byteBuf))); fields.put(fieldNames[1], subFields); } else { if (input.startsWith("#")) { - subFields.put(input.replaceAll("#", ""), readObject()); + subFields.put(input.replaceAll("#", ""), readObject(byteBuf)); } else { - fields.put(input, readObject()); + fields.put(input, readObject(byteBuf)); } } } @@ -235,11 +198,11 @@ private T readCustomObject(T customObject, Map fields) { return customObject; } - protected void writeArray(Object[] array) { - writeObject(array); + protected void writeArray(ByteBuf byteBuf, Object[] array) { + writeObject(byteBuf, array); } - protected T[] readArray() { + protected T[] readArray(ByteBuf byteBuf) { int length = byteBuf.readInt(); if (length > byteBuf.readableBytes()) { throw new IllegalStateException("length can't be larger than the readable bytes"); @@ -258,17 +221,17 @@ protected T[] readArray() { return null; } - protected void writeCustomClassArray(T[] array, String pathOfCustomClassAtReceiver) { - writeInt(array.length); - writeString(String.format("%s.%s", pathOfCustomClassAtReceiver, array.getClass().getSimpleName().replace("[]", ""))); + protected void writeCustomClassArray(ByteBuf byteBuf, T[] array, String pathOfCustomClassAtReceiver) { + writeInt(byteBuf, array.length); + writeString(byteBuf, String.format("%s.%s", pathOfCustomClassAtReceiver, array.getClass().getSimpleName().replace("[]", ""))); for (T t : array) { - writeCustomObject(t, "", pathOfCustomClassAtReceiver); + writeCustomObject(byteBuf, t, "", pathOfCustomClassAtReceiver); } } - protected T[] readCustomClassArray() { - int length = readInt(); - String path = readString(); + protected T[] readCustomClassArray(ByteBuf byteBuf) { + int length = readInt(byteBuf); + String path = readString(byteBuf); T[] array = null; try { array = (T[]) Array.newInstance(Class.forName(path), length); @@ -277,7 +240,7 @@ protected T[] readCustomClassArray() { } for (int i = 0; i < length; i++) { - array[i] = readCustomObject(); + array[i] = readCustomObject(byteBuf); } return array; diff --git a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/StandardPacket.java b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/StandardPacket.java index 7ecef2d..b5dc570 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/StandardPacket.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/StandardPacket.java @@ -1,5 +1,7 @@ package de.datasecs.hydra.shared.protocol.packets; +import io.netty.buffer.ByteBuf; + /** * Created with love by DataSecs on 09.04.18 */ @@ -15,17 +17,17 @@ public StandardPacket(Object object) { } @Override - public void read() { - object = readObject(); + public String toString() { + return object.toString(); } @Override - public void write() { - writeObject(object); + public void read(ByteBuf byteBuf) { + readObject(byteBuf); } @Override - public String toString() { - return object.toString(); + public void write(ByteBuf byteBuf) { + writeObject(byteBuf, object); } } \ No newline at end of file diff --git a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketDecoder.java b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketDecoder.java index a5e71e2..8dd2d9a 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketDecoder.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketDecoder.java @@ -20,13 +20,13 @@ public PacketDecoder(HydraProtocol protocol) { } @Override - protected void decode(ChannelHandlerContext context, ByteBuf in, List out) throws Exception { - in.readInt(); - - Packet packet = protocol.createPacket(in.readByte()); - packet.setByteBuf(in); - packet.read(); - - out.add(packet); + protected void decode(ChannelHandlerContext context, ByteBuf in, List out) { + int length = in.readInt(); + + if(length > 0) { + Packet packet = protocol.createPacket(in.readByte()); + packet.read(in); + out.add(packet); + } } } \ No newline at end of file diff --git a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketEncoder.java b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketEncoder.java index b378856..fede128 100644 --- a/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketEncoder.java +++ b/shared/src/main/java/de/datasecs/hydra/shared/protocol/packets/serialization/PacketEncoder.java @@ -20,13 +20,10 @@ public PacketEncoder(HydraProtocol protocol) { } @Override - protected void encode(ChannelHandlerContext context, Packet packet, List out) throws Exception { + protected void encode(ChannelHandlerContext context, Packet packet, List out) { ByteBuf byteBuf = context.alloc().buffer(); byteBuf.writeByte(protocol.getPacketId(packet)); - - packet.setByteBuf(byteBuf); - packet.write(); - + packet.write(byteBuf); out.add(byteBuf); } -} +} \ No newline at end of file