diff --git a/build.gradle b/build.gradle index 3f61d54..7ea3eb1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,12 +1,12 @@ group 'ng.abdlquadri' -version '1.1' +version '1.0-SNAPSHOT' apply plugin: 'java' -project.sourceCompatibility = 1.5 +project.sourceCompatibility = 1.7 compileTestJava { - project.sourceCompatibility = 1.8 + sourceCompatibility = 1.8 } repositories { @@ -23,6 +23,7 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' - testCompile group: 'io.vertx', name: 'vertx-core', version: '3.3.3' - testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.3.3' + testCompile group: 'io.vertx', name: 'vertx-core', version: '3.4.2' + testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.4.2' } + diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBus.java b/src/main/java/ng/abdlquadri/eventbus/EventBus.java index 8b49b71..a97440c 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBus.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBus.java @@ -33,7 +33,8 @@ */ public class EventBus { private static Logger log = Logger.getLogger(EventBus.class.getName()); - + private static NioEventLoopGroup group; + private static Bootstrap bootstrap; public static Channel channel; public static final ConcurrentMap> handlers = new ConcurrentHashMap>(); public static final ConcurrentMap replyHandlers = new ConcurrentHashMap(); @@ -61,6 +62,7 @@ public void written(boolean isWritten) { }); } + public static void send(String address, String jsonMessage, Handler handler) { String replyAddress = UUID.randomUUID().toString(); Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") @@ -76,6 +78,24 @@ public void written(boolean isWritten) { } }); } + public static void send(String address, String jsonMessage,String reply_address,Boolean nouse,Handler handler) { + String replyAddress = reply_address; + Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") + .set(EventBusMessageAttributes.ADDRESS, address) + .set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress) + .set(EventBusMessageAttributes.HEADERS, Json.object()) + .set(EventBusMessageAttributes.BODY, Json.read(jsonMessage)); + addReplyHandler(replyAddress, handler); + writeToWire(channel, json.toString(), new WriteHandler() { + @Override + public void written(boolean isWritten) { + + } + }); + } + + + public static void send(String address, String jsonMessage, String jsonHeaders) { log.log(Level.INFO, "Making a SEND to EventBus Server"); @@ -120,6 +140,20 @@ public void written(boolean isWritten) { } }); } + public static void send(String address, String jsonMessage,String reply_address,Boolean nouse) { + String replyAddress = reply_address; + Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") + .set(EventBusMessageAttributes.ADDRESS, address) + .set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress) + .set(EventBusMessageAttributes.HEADERS, Json.object()) + .set(EventBusMessageAttributes.BODY, Json.read(jsonMessage)); + writeToWire(channel, json.toString(), new WriteHandler() { + @Override + public void written(boolean isWritten) { + + } + }); + } public static void publish(String address, String jsonMessage) { Json json = Json.object().set(EventBusMessageAttributes.TYPE, "publish") @@ -196,15 +230,15 @@ public void written(boolean isWritten) { public static void connect(String host, int port, final ConnectHandler connectHandler) { log.log(Level.INFO, "Connecting to EventBus Server"); - NioEventLoopGroup group = new NioEventLoopGroup(); - - Bootstrap bootstrap = new Bootstrap(); - - bootstrap.group(group) - .channel(NioSocketChannel.class) - .remoteAddress(new InetSocketAddress(host, port)) - .handler(new EventBusInitializer()); - +// final NioEventLoopGroup group = new NioEventLoopGroup(); + final NioEventLoopGroup group =groupCache(); +// final Bootstrap bootstrap = new Bootstrap(); +// +// bootstrap.group(group) +// .channel(NioSocketChannel.class) +// .remoteAddress(new InetSocketAddress(host, port)) +// .handler(new EventBusInitializer()); +final Bootstrap bootstrap =bootstrapCache(host,port); final ChannelFuture channelFuture = bootstrap.connect(); channelFuture.addListener(new ChannelFutureListener() { @Override @@ -217,6 +251,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } else { connectHandler.onConnect(false); +// group.shutdownGracefully(); log.log(Level.SEVERE, "Failed Connecting to EventBus Server"); } @@ -226,9 +261,28 @@ public void operationComplete(ChannelFuture future) throws Exception { } + private static NioEventLoopGroup groupCache(){ + if (group==null){ + group = new NioEventLoopGroup(); + } + return group; + } + + private static Bootstrap bootstrapCache(String host, int port){ + if (bootstrap==null){ + bootstrap = new Bootstrap(); + + bootstrap.group(group) + .channel(NioSocketChannel.class) + .remoteAddress(new InetSocketAddress(host, port)) + .handler(new EventBusInitializer()); + } + return bootstrap; + } public static void close() { if (channel != null) { final ChannelFuture closeFuture = channel.close(); + closeFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess() && future.isDone()){ diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java b/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java index 3b37853..32019d1 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java @@ -1,26 +1,21 @@ package ng.abdlquadri.eventbus; -import static ng.abdlquadri.eventbus.EventBus.channel; -import static ng.abdlquadri.eventbus.EventBus.globalConnectHandler; -import static ng.abdlquadri.eventbus.EventBus.handlers; -import static ng.abdlquadri.eventbus.EventBus.replyHandlers; -import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender; -import static ng.abdlquadri.eventbus.EventBusUtil.sendPing; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; - import mjson.Json; - import ng.abdlquadri.eventbus.handlers.Handler; import ng.abdlquadri.eventbus.senders.ReplySender; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static ng.abdlquadri.eventbus.EventBus.*; +import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender; +import static ng.abdlquadri.eventbus.EventBusUtil.sendPing; + /** * Created by abdlquadri on 12/9/15. */ @@ -110,12 +105,10 @@ public void send(String replyMessage, String headers, Handler handler) { private String messageBufferToString(ByteBuf inMsg) { int messageLength = inMsg.readInt(); - StringBuilder message = new StringBuilder(); - for (int i = 0; i < messageLength; i++) { - char c = (char) inMsg.readByte(); - message.append(c); - } - return message.toString(); + + byte[] req = new byte[inMsg.readableBytes()]; + inMsg.readBytes(req); + return new String(req); } @Override diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java b/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java index 3cf3033..b9722ee 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java @@ -1,22 +1,21 @@ package ng.abdlquadri.eventbus; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; - import mjson.Json; - import ng.abdlquadri.eventbus.handlers.Handler; import ng.abdlquadri.eventbus.handlers.WriteHandler; import ng.abdlquadri.eventbus.senders.ReplySender; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + /** * Created by abdlquadri on 12/19/15. */ @@ -40,10 +39,15 @@ public void written(boolean isWritten) { } public static void writeToWire(final Channel channel, String jsonObject, final WriteHandler writeHandler) { - int length = jsonObject.length(); - ByteBuf buffer = Unpooled.buffer() - .writeInt(length) - .writeBytes(jsonObject.getBytes()); + int length = jsonObject.toString().getBytes().length; + ByteBuf buffer = null; + try { + buffer = Unpooled.buffer() + .writeInt(length) + .writeBytes(jsonObject.getBytes("utf-8")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } log.log(Level.INFO, "Writing to wire."); if (channel != null) {