Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
group 'ng.abdlquadri'
version '1.1'
version '1.0-SNAPSHOT'

apply plugin: 'java'

project.sourceCompatibility = 1.5
project.sourceCompatibility = 1.7

compileTestJava {
project.sourceCompatibility = 1.8
sourceCompatibility = 1.8
}

repositories {
Expand All @@ -23,6 +23,7 @@ dependencies {


testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'io.vertx', name: 'vertx-core', version: '3.3.3'
testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.3.3'
testCompile group: 'io.vertx', name: 'vertx-core', version: '3.4.2'
testCompile group: 'io.vertx', name: 'vertx-tcp-eventbus-bridge', version: '3.4.2'
}

74 changes: 64 additions & 10 deletions src/main/java/ng/abdlquadri/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
*/
public class EventBus {
private static Logger log = Logger.getLogger(EventBus.class.getName());

private static NioEventLoopGroup group;
private static Bootstrap bootstrap;
public static Channel channel;
public static final ConcurrentMap<String, List<Handler>> handlers = new ConcurrentHashMap<String, List<Handler>>();
public static final ConcurrentMap<String, Handler> replyHandlers = new ConcurrentHashMap<String, Handler>();
Expand Down Expand Up @@ -61,6 +62,7 @@ public void written(boolean isWritten) {
});
}


public static void send(String address, String jsonMessage, Handler handler) {
String replyAddress = UUID.randomUUID().toString();
Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send")
Expand All @@ -76,6 +78,24 @@ public void written(boolean isWritten) {
}
});
}
public static void send(String address, String jsonMessage,String reply_address,Boolean nouse,Handler handler) {
String replyAddress = reply_address;
Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send")
.set(EventBusMessageAttributes.ADDRESS, address)
.set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress)
.set(EventBusMessageAttributes.HEADERS, Json.object())
.set(EventBusMessageAttributes.BODY, Json.read(jsonMessage));
addReplyHandler(replyAddress, handler);
writeToWire(channel, json.toString(), new WriteHandler() {
@Override
public void written(boolean isWritten) {

}
});
}




public static void send(String address, String jsonMessage, String jsonHeaders) {
log.log(Level.INFO, "Making a SEND to EventBus Server");
Expand Down Expand Up @@ -120,6 +140,20 @@ public void written(boolean isWritten) {
}
});
}
public static void send(String address, String jsonMessage,String reply_address,Boolean nouse) {
String replyAddress = reply_address;
Json json = Json.object().set(EventBusMessageAttributes.TYPE, "send")
.set(EventBusMessageAttributes.ADDRESS, address)
.set(EventBusMessageAttributes.REPLY_ADDRESS, replyAddress)
.set(EventBusMessageAttributes.HEADERS, Json.object())
.set(EventBusMessageAttributes.BODY, Json.read(jsonMessage));
writeToWire(channel, json.toString(), new WriteHandler() {
@Override
public void written(boolean isWritten) {

}
});
}

public static void publish(String address, String jsonMessage) {
Json json = Json.object().set(EventBusMessageAttributes.TYPE, "publish")
Expand Down Expand Up @@ -196,15 +230,15 @@ public void written(boolean isWritten) {

public static void connect(String host, int port, final ConnectHandler connectHandler) {
log.log(Level.INFO, "Connecting to EventBus Server");
NioEventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new EventBusInitializer());

// final NioEventLoopGroup group = new NioEventLoopGroup();
final NioEventLoopGroup group =groupCache();
// final Bootstrap bootstrap = new Bootstrap();
//
// bootstrap.group(group)
// .channel(NioSocketChannel.class)
// .remoteAddress(new InetSocketAddress(host, port))
// .handler(new EventBusInitializer());
final Bootstrap bootstrap =bootstrapCache(host,port);
final ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
Expand All @@ -217,6 +251,7 @@ public void operationComplete(ChannelFuture future) throws Exception {

} else {
connectHandler.onConnect(false);
// group.shutdownGracefully();
log.log(Level.SEVERE, "Failed Connecting to EventBus Server");

}
Expand All @@ -226,9 +261,28 @@ public void operationComplete(ChannelFuture future) throws Exception {

}

private static NioEventLoopGroup groupCache(){
if (group==null){
group = new NioEventLoopGroup();
}
return group;
}

private static Bootstrap bootstrapCache(String host, int port){
if (bootstrap==null){
bootstrap = new Bootstrap();

bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new EventBusInitializer());
}
return bootstrap;
}
public static void close() {
if (channel != null) {
final ChannelFuture closeFuture = channel.close();

closeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess() && future.isDone()){
Expand Down
33 changes: 13 additions & 20 deletions src/main/java/ng/abdlquadri/eventbus/EventBusFrameHandler.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package ng.abdlquadri.eventbus;

import static ng.abdlquadri.eventbus.EventBus.channel;
import static ng.abdlquadri.eventbus.EventBus.globalConnectHandler;
import static ng.abdlquadri.eventbus.EventBus.handlers;
import static ng.abdlquadri.eventbus.EventBus.replyHandlers;
import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender;
import static ng.abdlquadri.eventbus.EventBusUtil.sendPing;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import mjson.Json;

import ng.abdlquadri.eventbus.handlers.Handler;
import ng.abdlquadri.eventbus.senders.ReplySender;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import static ng.abdlquadri.eventbus.EventBus.*;
import static ng.abdlquadri.eventbus.EventBusUtil.addReplySender;
import static ng.abdlquadri.eventbus.EventBusUtil.sendPing;

/**
* Created by abdlquadri on 12/9/15.
*/
Expand Down Expand Up @@ -110,12 +105,10 @@ public void send(String replyMessage, String headers, Handler handler) {

private String messageBufferToString(ByteBuf inMsg) {
int messageLength = inMsg.readInt();
StringBuilder message = new StringBuilder();
for (int i = 0; i < messageLength; i++) {
char c = (char) inMsg.readByte();
message.append(c);
}
return message.toString();

byte[] req = new byte[inMsg.readableBytes()];
inMsg.readBytes(req);
return new String(req);
}

@Override
Expand Down
26 changes: 15 additions & 11 deletions src/main/java/ng/abdlquadri/eventbus/EventBusUtil.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package ng.abdlquadri.eventbus;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

import mjson.Json;

import ng.abdlquadri.eventbus.handlers.Handler;
import ng.abdlquadri.eventbus.handlers.WriteHandler;
import ng.abdlquadri.eventbus.senders.ReplySender;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Created by abdlquadri on 12/19/15.
*/
Expand All @@ -40,10 +39,15 @@ public void written(boolean isWritten) {
}

public static void writeToWire(final Channel channel, String jsonObject, final WriteHandler writeHandler) {
int length = jsonObject.length();
ByteBuf buffer = Unpooled.buffer()
.writeInt(length)
.writeBytes(jsonObject.getBytes());
int length = jsonObject.toString().getBytes().length;
ByteBuf buffer = null;
try {
buffer = Unpooled.buffer()
.writeInt(length)
.writeBytes(jsonObject.getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
log.log(Level.INFO, "Writing to wire.");

if (channel != null) {
Expand Down