本文将带您了解关于rocketmqnetty底层设计的新内容,同时我们还将为您解释rocketmq底层数据结构的相关知识,另外,我们还将为您提供关于ApacheRocketMQRocketMQ是什么?
本文将带您了解关于rocketmq netty底层设计的新内容,同时我们还将为您解释rocketmq底层数据结构的相关知识,另外,我们还将为您提供关于Apache RocketMQ RocketMQ是什么? 消息中间件、netty学习之五 netty在rocketmq中的使用、netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能、RocketMQ Committer 蒋晓峰与 RocketMQ-Flink 的成长故事的实用信息。
本文目录一览:- rocketmq netty底层设计(rocketmq底层数据结构)
- Apache RocketMQ RocketMQ是什么? 消息中间件
- netty学习之五 netty在rocketmq中的使用
- netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能
- RocketMQ Committer 蒋晓峰与 RocketMQ-Flink 的成长故事
rocketmq netty底层设计(rocketmq底层数据结构)
rocketmq底层网络使用的netty框架,类图如下
RecketMQ通信模块的顶层结构是RemotingServer和RemotingClient,分别对应通信的服务端和客户端
首先看看RemotingServer


1 public interface RemotingServer extends RemotingService {
2
3 void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
4 final ExecutorService executor);
5
6 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
7
8 int localListenPort();
9
10 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
11
12 RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
13 final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
14 RemotingTimeoutException;
15
16 void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
17 final InvokeCallback invokeCallback) throws InterruptedException,
18 RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
19
20 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
21 throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
22 RemotingSendRequestException;
23
24 }
RemotingServer类中比较重要的是:localListenPort、registerProcessor和registerDefaultProcessor,
registerDefaultProcesor用来设置接收到消息后的处理方法。
RemotingClient类和RemotingServer类相对应,比较重要的方法是updateNameServerAddressList、
invokeSync和invokeOneway,updateNameServerAddresList用来获取有效的NameServer地址,invoke-
Sync与invokeOneway用来向Server端发送请求,如下。


1 public interface RemotingClient extends RemotingService {
2
3 void updateNameServerAddressList(final List<String> addrs);
4
5 List<String> getNameServerAddressList();
6
7 RemotingCommand invokeSync(final String addr, final RemotingCommand request,
8 final long timeoutMillis) throws InterruptedException, RemotingConnectException,
9 RemotingSendRequestException, RemotingTimeoutException;
10
11 void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
12 final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
13 RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
14
15 void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
16 throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
17 RemotingTimeoutException, RemotingSendRequestException;
18
19 void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
20 final ExecutorService executor);
21
22 void setCallbackExecutor(final ExecutorService callbackExecutor);
23
24 ExecutorService getCallbackExecutor();
25
26 boolean isChannelWritable(final String addr);
27 }
二、自定义协议
NettyRemotingServer和NettyRemotingClient分别实现了RemotingServer和RemotingClient这两个接
口,但它们有很多共有的内容,比如invokeSync、invokeOneway等,所以这些共有函数被提取到NettyRe-
motingAbstract共同继承的父类中。首先来分析一下在NettyRemotingAbstract中是如何处理接收到的内容
的,如下。
1 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
2 final RemotingCommand cmd = msg;
3 if (cmd != null) {
4 switch (cmd.getType()) {
5 case REQUEST_COMMAND:
6 processRequestCommand(ctx, cmd);
7 break;
8 case RESPONSE_COMMAND:
9 processResponseCommand(ctx, cmd);
10 break;
11 default:
12 break;
13 }
14 }
15 }


1 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
2 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
3 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
4 final int opaque = cmd.getOpaque();
5
6 if (pair != null) {
7 Runnable run = new Runnable() {
8 @Override
9 public void run() {
10 try {
11 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
12 if (rpcHook != null) {
13 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
14 }
15
16 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
17 if (rpcHook != null) {
18 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
19 }
20
21 if (!cmd.isOnewayRPC()) {
22 if (response != null) {
23 response.setOpaque(opaque);
24 response.markResponseType();
25 try {
26 ctx.writeAndFlush(response);
27 } catch (Throwable e) {
28 log.error("process request over, but response failed", e);
29 log.error(cmd.toString());
30 log.error(response.toString());
31 }
32 } else {
33
34 }
35 }
36 } catch (Throwable e) {
37 log.error("process request exception", e);
38 log.error(cmd.toString());
39
40 if (!cmd.isOnewayRPC()) {
41 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
42 RemotingHelper.exceptionSimpleDesc(e));
43 response.setOpaque(opaque);
44 ctx.writeAndFlush(response);
45 }
46 }
47 }
48 };
49
50 if (pair.getObject1().rejectRequest()) {
51 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
52 "[REJECTREQUEST]system busy, start flow control for a while");
53 response.setOpaque(opaque);
54 ctx.writeAndFlush(response);
55 return;
56 }
57
58 try {
59 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
60 pair.getObject2().submit(requestTask);
61 } catch (RejectedExecutionException e) {
62 if ((System.currentTimeMillis() % 10000) == 0) {
63 log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
64 + ", too many requests and system thread pool busy, RejectedExecutionException "
65 + pair.getObject2().toString()
66 + " request code: " + cmd.getCode());
67 }
68
69 if (!cmd.isOnewayRPC()) {
70 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
71 "[OVERLOAD]system busy, start flow control for a while");
72 response.setOpaque(opaque);
73 ctx.writeAndFlush(response);
74 }
75 }
76 } else {
77 String error = " request type " + cmd.getCode() + " not supported";
78 final RemotingCommand response =
79 RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
80 response.setOpaque(opaque);
81 ctx.writeAndFlush(response);
82 log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
83 }
84 }


1 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
2 final int opaque = cmd.getOpaque();
3 final ResponseFuture responseFuture = responseTable.get(opaque);
4 if (responseFuture != null) {
5 responseFuture.setResponseCommand(cmd);
6
7 responseTable.remove(opaque);
8
9 if (responseFuture.getInvokeCallback() != null) {
10 executeInvokeCallback(responseFuture);
11 } else {
12 responseFuture.putResponse(cmd);
13 responseFuture.release();
14 }
15 } else {
16 log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
17 log.warn(cmd.toString());
18 }
19 }
无论是服务端还是客户端都需要处理接收到的请求,处理方法由processMessageReceived定义,
注意这里接收到的消息已经被转换成RemotingCommand了,而不是原始的字节流。
RemotingCommand是RocketMQ自定义的协议,具体格式如下
这个协议只有四部分,但是覆盖了RocketMQ各个角色间几乎所有的通信过程,RemotingCommand有
实际的数据类型和各部分对应,如下所示。
1 private int code;
2 private LanguageCode language = LanguageCode.JAVA;
3 private int version = 0;
4 private int opaque = requestId.getAndIncrement();
5 private int flag = 0;
6 private String remark;
7 private HashMap<String, String> extFields;
8 private transient CommandCustomHeader customHeader;
9
10 private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
11
12 private transient byte[] body;
RocketMQ各个组件间的通信需要频繁地在字节码和RemotingCommand间相互转换,也就是编码、
解码过程,好在Netty提供了codec支持,这个频繁地操作只需要一行设置即可:pipeline().addLoast(new
NettyEncoder(), now NettyDecoder() )
RocketMQ对通信过程的另一个抽象是Processor和Executor,当接收到一个消息后,直接根据消息的类
型调用对应的Processor和Executor,把通信过程和业务逻辑分离开来。通过一个Broker中的代码段来看看
注册Processor的过程


1 public void registerProcessor() {
2 /**
3 * SendMessageProcessor
4 */
5 SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
6 sendProcessor.registerSendMessageHook(sendMessageHookList);
7 sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
8
9 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
10 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
11 this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
12 this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
13 this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
14 this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
15 this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
16 this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
17 /**
18 * PullMessageProcessor
19 */
20 this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
21 this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
22
23 /**
24 * QueryMessageProcessor
25 */
26 NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
27 this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
28 this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
29
30 this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
31 this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
32
33 /**
34 * ClientManageProcessor
35 */
36 ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
37 this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
38 this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
39 this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
40
41 this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
42 this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
43 this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
44
45 /**
46 * ConsumerManageProcessor
47 */
48 ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
49 this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
50 this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
51 this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
52
53 this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
54 this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
55 this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
56
57 /**
58 * EndTransactionProcessor
59 */
60 this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
61 this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
62
63 /**
64 * Default
65 */
66 AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
67 this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
68 this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
69 }
Apache RocketMQ RocketMQ是什么? 消息中间件
Apache RocketMQ RocketMQ是什么? 介绍
RocketMQ是什么?
RocketMQ
是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
-
能够保证严格的消息顺序
-
提供丰富的消息拉取模式
-
高效的订阅者水平扩展能力
-
实时的消息订阅机制
-
亿级消息堆积能力
-
Metaq3.0 版本改名,产品名称改为RocketMQ
Apache RocketMQ RocketMQ是什么? 官网
https://rocketmq.apache.org/
netty学习之五 netty在rocketmq中的使用
Rocketmq 的通信层是基于通信框架 netty,下面来看看rocketmq底层继承图。
NettyRemotingAbstract是NettyRemotingClient和NettyRemotingServer的抽象父类,对发送和接收的公共部分进行了处理
一 . 首先在数据结构方面使用了responseFuture模式
1.保存了RPC处理器 ,Broker 接收请求将 opaque 直接把这个值设置回响应对象,客户端接收到这个响应,通过 opaque 从缓存查找对应的 ResponseFuture 对象
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
当服务端接受客户端响应的时候,会调用NettyServerHandler处理客户端的请求。
ServerBootstrap childHandler = //
this.serverBootstrap.group(this.eventLoopGroupBoss,
this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
//
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup, //
new NettyEncoder(), //
new NettyDecoder(), //
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //
new NettyConnetManageHandler(), //
new NettyServerHandler());
}
});
NettyServerHandler处理channelRead的时候调用processMessageRecevived
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
processMesssageReceived方法就会判断当前是作为Server端,接收的消息是请求,那么调用processTable对应的事件进行处理,如果作为Client端,接收的消息是回复,即接收到Server端的回复,那么从responseTable中,首先获取opaque对应的ResponseFuture,如果这个response是异步回调,则有InvokeCallback,那么调用invokeBack函数,然后将Response塞入ResponseFuture后返回;
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
2.其次还保存了对外请求
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
在服务端启动 start() 方法的时候调用线程启动扫描将超时的responseFuture直接删除掉
public void start() {
.............................
this.timer.scheduleAtFixedRate(new TimerTask() {
[@Override](https://my.oschina.net/u/1162528)
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
扫描方法如下,判断哪些future超时:
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
plog.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
rf.executeInvokeCallback();
} catch (Throwable e) {
plog.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
二. 上面看了基本的数据结构和数据处理方式,下面看下发送方信息的发送逻辑。
- invokeSyncImpl: 同步发送,发送时,生成ResponseFuture,放入responseTable中;然后发送后等待设置的timeout(3s)时间,如果对应的ResponseFuture为空,则报错;否则返回RemoteCommand进行业务逻辑处理;发送失败设置 ResponseFuture 发送失败,并且从缓存中移除 ResponseFuture(没有响 应过来,就用不到缓存中的 ResponseFuturel)
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
plog.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
2 invokeAsyncImpl:异步发送,发送时,生成ResponseFuture,设置 opaque, callback, once,超时时间等值,并放入缓存集合,放入responseTable中;如果超过scanResponseTable的timeout (30s),则报错;否则调用注册的invokeCallback进行回调处理;异步一般链路耗时比较长, 为了防止本地缓存的 netty 请求过多, 使用信号量控制上,限默认 2048 个,发送成功 responseFuture.setSendRequestOK(true); 发送失败 responseFuture.setSendRequestOK(false), 信号量通过 once 释放, 删除缓存 Netty 接收 server 端响应,根据 opaque 从缓存获取 responseFuture,调用回调方法即接 口 InvokeCallback 实现
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
plog.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
rocketmq大概的通信流程基本如此,具体还是要去github上下载源码研读,学下源代码对自身的提高确实蛮大的。
netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能
netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(defLoopGroup,
//编码解码器
new HttpServerCodec(),
//将多个消息转换成单一的消息对象
new HttpObjectAggregator(65536),
//支持异步发送大的码流,一般用于发送文件流
new ChunkedWriteHandler(),
//检测链路是否读空闲,配合心跳handler检测channel是否正常
new IdleStateHandler(60, 0, 0),
//处理握手和认证
new UserAuthHandler(),
//处理消息的发送
new MessageHandler()
);
}
对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel
public static void addChannel(Channel channel) {
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
System.out.println("addChannel:" + remoteAddr);
if (!channel.isActive()) {
logger.error("channel is not active, address: {}", remoteAddr);
}
UserInfo userInfo = new UserInfo();
userInfo.setAddr(remoteAddr);
userInfo.setChannel(channel);
userInfo.setTime(System.currentTimeMillis());
userInfos.put(channel, userInfo);
}
登录后,channel就变成有效的channel,无效的channel之后将会丢弃
public static boolean saveUser(Channel channel, String nick, String password) {
UserInfo userInfo = userInfos.get(channel);
if (userInfo == null) {
return false;
}
if (!channel.isActive()) {
logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
return false;
}
// 验证用户名和密码
if (nick == null || password == null) {
return false;
}
LambdaQueryWrapper<account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
if (account == null) {
return false;
}
// 增加一个认证用户
userCount.incrementAndGet();
userInfo.setNick(nick);
userInfo.setAuth(true);
userInfo.setId(account.getId());
userInfo.setUsername(account.getUsername());
userInfo.setGroupNumber(account.getGroupNumber());
userInfo.setTime(System.currentTimeMillis());
// 注册该用户推送消息的通道
offlineInfoTransmitStatic.registerPull(channel);
return true;
}
当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。
public static void removeChannel(Channel channel) {
try {
logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
//加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误
rwLock.writeLock().lock();
channel.close();
UserInfo userInfo = userInfos.get(channel);
if (userInfo != null) {
if (userInfo.isAuth()) {
offlineInfoTransmitStatic.unregisterPull(channel);
// 减去一个认证用户
userCount.decrementAndGet();
}
userInfos.remove(channel);
}
} finally {
rwLock.writeLock().unlock();
}
}
为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。
public interface OfflineInfoTransmit {
void pushP2P(Integer userId, String message);
void pushGroup(String groupNumber, String message);
void registerPull(Channel channel);
void unregisterPull(Channel channel);
}
其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:
- 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能
- 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线
- 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。
- 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。
项目地址:https://github.com/shuangyueliao/netty-chat
更多技术文章,欢迎关注公众号一枚Java码农
RocketMQ Committer 蒋晓峰与 RocketMQ-Flink 的成长故事
自我介绍
我叫蒋晓峰,来自于阿里云开源大数据平台生态技术组,负责 Apache Flink 的 Connector 生态,同时负责集团和开源使用 RocketMQ Connector 的用户答疑。
最初接触到 RocketMQ 的时候,却是在来阿里以前,作为开源和大数据爱好者,慢慢发现上下游 RocketMQ 出现的次数也越来越多,其作为业务消息领域的首选,大量高价值的业务数据通过 RocketMQ 流转,自然而然的就产生了数据处理的需求,而 Flink 在实时计算中已经近乎成为事实标准,两相结合,于是就有了 rocketmq-flink[1] 的诞生。
当然,不仅仅是在一次次流量洪峰挑战下与阿里云 RocketMQ 团队结成的深厚战斗友谊,也包括逐渐喜欢上了 RocketMQ 稳定至上、重剑无锋的风格,所有的原因都促使我产生了对 rocketmq-flink 进行深度优化的念头。因此,在随后的日子里,我不仅与社区一起优化了原有的 RocketMQ DataStream Source/sink 的实现,而且在消息领域率先实现了 FLIP-27 的全新接口,并开始对 FLIP-143 进行支持;为了给用户提供基于 SQL 的开发体验,在 Datastream 实现的基础上,提供了 ScanTableSource/DynamicTableSink Table 接口的实现,目前 RocketMQ Connector 这三套公共接口的实现,均已经过了 “双十一” 海量 RocketMQ 消息数据考验洗礼表现稳定无任何故障,而这一切均已贡献给了社区。
经过 Apache RocketMQ 社区投票,有幸成为一名 Committer,Apache RocketMQ社区的开放、友好、积极的氛围,激励着我一路成长,在此也感谢我的社区领路人杜恒。成为 Committer,既是社区对自己的肯定,也是一份沉甸甸的责任,我也将努力与 RocketMQ 社区开发者一起,将 RocketMQ 打造成为最符合实时计算引擎Flink的消息系统,全面融入 Flink 生态;除此之外,也将与社区一起,持续优化符合流计算语义的轻量级 rocketmq-flink connect,共同将 Apache RocketMQ 打造成新一代“消息、事件、流”融合处理平台。
RocketMQ-Flink 介绍
Apache Flink 的 RocketMQ 集成 RocketMQ-Flink 模块包括 RocketMQSource/RocketMQSink 和 RocketTableSource/RocketMQSink,允许用户使用其 Source 从 RocketMQ 的主题中实例订阅消息和 Sink 发布消息到 RocketMQ 的主题。
RocketMQ SQL Connector
如何创建 Apache Flink 的 RocketMQ 表?
下面的例子展示了如何创建 RocketMQ 表:
CREATE TABLE rocketmq_source (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
''connector'' = ''rocketmq'',
''topic'' = ''user_behavior'',
''consumeGroup'' = ''behavior_consume_group'',
''nameServerAddress'' = ''127.0.0.1:9876''
);
CREATE TABLE rocketmq_sink (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
''connector'' = ''rocketmq'',
''topic'' = ''user_behavior'',
''produceGroup'' = ''behavior_produce_group'',
''nameServerAddress'' = ''127.0.0.1:9876''
);
可用元数据
以下的连接器元数据可以在表定义中通过元数据列的形式获取。
R/W
列定义了一个元数据是可读的(R
)还是可写的(W
)。
只读列必须声明为 VIRTUAL
以在 INSERT INTO
操作中排除它们。
KEY | DATA TYPE | DESCRIPTION | DEFAULT |
---|---|---|---|
topic | STRING NOT NULL | Topic name of the RocketMQ record. | R |
扩展的 CREATE TABLE
示例演示公开这些元数据字段的语法:
CREATE TABLE rocketmq_source (
`topic` STRING METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
''connector'' = ''rocketmq'',
''topic'' = ''user_behavior'',
''consumeGroup'' = ''behavior_consume_group'',
''nameServerAddress'' = ''127.0.0.1:9876''
);
欢迎社区用户试用 rocketmq-flink 并进行积极反馈,也欢迎Flink 与 RocketMQ 社区开发者积极参与进来,共同完善 rocketmq flink connector。
[1] https://github.com/apache/rocketmq-flink
本文分享自微信公众号 - RocketMQ官微(ApacheRocketMQ)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
今天关于rocketmq netty底层设计和rocketmq底层数据结构的讲解已经结束,谢谢您的阅读,如果想了解更多关于Apache RocketMQ RocketMQ是什么? 消息中间件、netty学习之五 netty在rocketmq中的使用、netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能、RocketMQ Committer 蒋晓峰与 RocketMQ-Flink 的成长故事的相关知识,请在本站搜索。
本文标签: