From a92d2be76cf2941510bc44d82b8353d56f13bc08 Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Mon, 21 Aug 2017 10:26:11 +0800 Subject: [PATCH 1/6] update vertx version to 3.4.2 --- build.gradle | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 3f61d54..e9c6a5b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,13 +1,13 @@ group 'ng.abdlquadri' -version '1.1' +version '1.0-SNAPSHOT' apply plugin: 'java' -project.sourceCompatibility = 1.5 +project.sourceCompatibility = 1.7 -compileTestJava { - project.sourceCompatibility = 1.8 -} +//compileTestJava { +// sourceCompatibility = 1.8 +//} repositories { mavenCentral() @@ -23,6 +23,7 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' - testCompile group: 'io.vertx', name: 'vertx-core', version: '3.3.3' - testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.3.3' + testCompile group: 'io.vertx', name: 'vertx-core', version: '3.4.2' + testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.4.2' } + From 3c0875b8bf42825b218b2f4b95ef52823cbaeaee Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Mon, 21 Aug 2017 10:26:48 +0800 Subject: [PATCH 2/6] add 2 method with replyAddress --- .../java/ng/abdlquadri/eventbus/EventBus.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBus.java b/src/main/java/ng/abdlquadri/eventbus/EventBus.java index 8b49b71..9b539af 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBus.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBus.java @@ -61,6 +61,7 @@ public void written(boolean isWritten) { }); } + public static void send(String address, String jsonMessage, Handler handler) { String replyAddress = UUID.randomUUID().toString(); Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") @@ -76,6 +77,24 @@ public void written(boolean isWritten) { } }); } + public static void send(String address, String jsonMessage,String reply_address,Boolean nouse,Handler handler) { + String replyAddress = reply_address; + Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") + .set(EventBusMessageAttributes.ADDRESS, address) + .set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress) + .set(EventBusMessageAttributes.HEADERS, Json.object()) + .set(EventBusMessageAttributes.BODY, Json.read(jsonMessage)); + addReplyHandler(replyAddress, handler); + writeToWire(channel, json.toString(), new WriteHandler() { + @Override + public void written(boolean isWritten) { + + } + }); + } + + + public static void send(String address, String jsonMessage, String jsonHeaders) { log.log(Level.INFO, "Making a SEND to EventBus Server"); @@ -120,6 +139,20 @@ public void written(boolean isWritten) { } }); } + public static void send(String address, String jsonMessage,String reply_address,Boolean nouse) { + String replyAddress = reply_address; + Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send") + .set(EventBusMessageAttributes.ADDRESS, address) + .set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress) + .set(EventBusMessageAttributes.HEADERS, Json.object()) + .set(EventBusMessageAttributes.BODY, Json.read(jsonMessage)); + writeToWire(channel, json.toString(), new WriteHandler() { + @Override + public void written(boolean isWritten) { + + } + }); + } public static void publish(String address, String jsonMessage) { Json json = Json.object().set(EventBusMessageAttributes.TYPE, "publish") From fee94e412a38d072933ce52a3e6abde83aec16e6 Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Mon, 21 Aug 2017 10:41:33 +0800 Subject: [PATCH 3/6] support chinese ,set bytes chartset name "utf-8", jsonobject tostring first and then getBytes --- .../ng/abdlquadri/eventbus/EventBusUtil.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java b/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java index 3cf3033..b9722ee 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java @@ -1,22 +1,21 @@ package ng.abdlquadri.eventbus; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; - import mjson.Json; - import ng.abdlquadri.eventbus.handlers.Handler; import ng.abdlquadri.eventbus.handlers.WriteHandler; import ng.abdlquadri.eventbus.senders.ReplySender; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + /** * Created by abdlquadri on 12/19/15. */ @@ -40,10 +39,15 @@ public void written(boolean isWritten) { } public static void writeToWire(final Channel channel, String jsonObject, final WriteHandler writeHandler) { - int length = jsonObject.length(); - ByteBuf buffer = Unpooled.buffer() - .writeInt(length) - .writeBytes(jsonObject.getBytes()); + int length = jsonObject.toString().getBytes().length; + ByteBuf buffer = null; + try { + buffer = Unpooled.buffer() + .writeInt(length) + .writeBytes(jsonObject.getBytes("utf-8")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } log.log(Level.INFO, "Writing to wire."); if (channel != null) { From 74484046824f042cd950059911c8fcd760ccf547 Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Mon, 21 Aug 2017 10:43:34 +0800 Subject: [PATCH 4/6] support chinese ,mprovementi method [messageBufferToString] --- .../eventbus/EventBusFrameHandler.java | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java b/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java index 3b37853..32019d1 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java @@ -1,26 +1,21 @@ package ng.abdlquadri.eventbus; -import static ng.abdlquadri.eventbus.EventBus.channel; -import static ng.abdlquadri.eventbus.EventBus.globalConnectHandler; -import static ng.abdlquadri.eventbus.EventBus.handlers; -import static ng.abdlquadri.eventbus.EventBus.replyHandlers; -import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender; -import static ng.abdlquadri.eventbus.EventBusUtil.sendPing; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; - import mjson.Json; - import ng.abdlquadri.eventbus.handlers.Handler; import ng.abdlquadri.eventbus.senders.ReplySender; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static ng.abdlquadri.eventbus.EventBus.*; +import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender; +import static ng.abdlquadri.eventbus.EventBusUtil.sendPing; + /** * Created by abdlquadri on 12/9/15. */ @@ -110,12 +105,10 @@ public void send(String replyMessage, String headers, Handler handler) { private String messageBufferToString(ByteBuf inMsg) { int messageLength = inMsg.readInt(); - StringBuilder message = new StringBuilder(); - for (int i = 0; i < messageLength; i++) { - char c = (char) inMsg.readByte(); - message.append(c); - } - return message.toString(); + + byte[] req = new byte[inMsg.readableBytes()]; + inMsg.readBytes(req); + return new String(req); } @Override From 44461c5d538bdb7603cd67ed264fa8c9734e010f Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Mon, 21 Aug 2017 10:43:56 +0800 Subject: [PATCH 5/6] support chinese ,mprovementi method [messageBufferToString] --- build.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index e9c6a5b..7ea3eb1 100644 --- a/build.gradle +++ b/build.gradle @@ -5,9 +5,9 @@ apply plugin: 'java' project.sourceCompatibility = 1.7 -//compileTestJava { -// sourceCompatibility = 1.8 -//} +compileTestJava { + sourceCompatibility = 1.8 +} repositories { mavenCentral() From 76823865dea9baf996fe061afc9b4882dd6102cb Mon Sep 17 00:00:00 2001 From: QingMings <1821063757@qq.com> Date: Tue, 29 Aug 2017 17:03:48 +0800 Subject: [PATCH 6/6] cache NioEventLoopGroup ,cache bootstrap --- .../java/ng/abdlquadri/eventbus/EventBus.java | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/main/java/ng/abdlquadri/eventbus/EventBus.java b/src/main/java/ng/abdlquadri/eventbus/EventBus.java index 9b539af..a97440c 100644 --- a/src/main/java/ng/abdlquadri/eventbus/EventBus.java +++ b/src/main/java/ng/abdlquadri/eventbus/EventBus.java @@ -33,7 +33,8 @@ */ public class EventBus { private static Logger log = Logger.getLogger(EventBus.class.getName()); - + private static NioEventLoopGroup group; + private static Bootstrap bootstrap; public static Channel channel; public static final ConcurrentMap> handlers = new ConcurrentHashMap>(); public static final ConcurrentMap replyHandlers = new ConcurrentHashMap(); @@ -229,15 +230,15 @@ public void written(boolean isWritten) { public static void connect(String host, int port, final ConnectHandler connectHandler) { log.log(Level.INFO, "Connecting to EventBus Server"); - NioEventLoopGroup group = new NioEventLoopGroup(); - - Bootstrap bootstrap = new Bootstrap(); - - bootstrap.group(group) - .channel(NioSocketChannel.class) - .remoteAddress(new InetSocketAddress(host, port)) - .handler(new EventBusInitializer()); - +// final NioEventLoopGroup group = new NioEventLoopGroup(); + final NioEventLoopGroup group =groupCache(); +// final Bootstrap bootstrap = new Bootstrap(); +// +// bootstrap.group(group) +// .channel(NioSocketChannel.class) +// .remoteAddress(new InetSocketAddress(host, port)) +// .handler(new EventBusInitializer()); +final Bootstrap bootstrap =bootstrapCache(host,port); final ChannelFuture channelFuture = bootstrap.connect(); channelFuture.addListener(new ChannelFutureListener() { @Override @@ -250,6 +251,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } else { connectHandler.onConnect(false); +// group.shutdownGracefully(); log.log(Level.SEVERE, "Failed Connecting to EventBus Server"); } @@ -259,9 +261,28 @@ public void operationComplete(ChannelFuture future) throws Exception { } + private static NioEventLoopGroup groupCache(){ + if (group==null){ + group = new NioEventLoopGroup(); + } + return group; + } + + private static Bootstrap bootstrapCache(String host, int port){ + if (bootstrap==null){ + bootstrap = new Bootstrap(); + + bootstrap.group(group) + .channel(NioSocketChannel.class) + .remoteAddress(new InetSocketAddress(host, port)) + .handler(new EventBusInitializer()); + } + return bootstrap; + } public static void close() { if (channel != null) { final ChannelFuture closeFuture = channel.close(); + closeFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess() && future.isDone()){