
RocketMQ's Netty-based low-level design (RocketMQ's underlying data structures)



This article walks you through the Netty-based low-level design of RocketMQ and explains the data structures it is built on. It also collects several related pieces: Apache RocketMQ: what is RocketMQ? Message middleware; Netty study, part 5: how Netty is used in RocketMQ; seamlessly switching between RabbitMQ, ActiveMQ and RocketMQ with Netty to implement one-to-one and group chat; and RocketMQ Committer Jiang Xiaofeng and the growth story of RocketMQ-Flink.


RocketMQ's Netty-based low-level design (RocketMQ's underlying data structures)

RocketMQ's remoting layer is built on top of the Netty framework; the class diagram below outlines the module.

[Class diagram of the RocketMQ remoting module]

At the top of RocketMQ's remoting module sit RemotingServer and RemotingClient, which correspond to the server side and the client side of communication respectively.

Let's look at RemotingServer first.

public interface RemotingServer extends RemotingService {

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}
RemotingServer

 

The most important members of RemotingServer are localListenPort, registerProcessor and registerDefaultProcessor; registerDefaultProcessor sets the handler that processes received messages when no dedicated processor is registered.

RemotingClient mirrors RemotingServer. Its most important methods are updateNameServerAddressList, invokeSync and invokeOneway: updateNameServerAddressList maintains the list of valid NameServer addresses, while invokeSync and invokeOneway send requests to the server side, as shown below.

public interface RemotingClient extends RemotingService {

    void updateNameServerAddressList(final List<String> addrs);

    List<String> getNameServerAddressList();

    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void setCallbackExecutor(final ExecutorService callbackExecutor);

    ExecutorService getCallbackExecutor();

    boolean isChannelWritable(final String addr);
}
RemotingClient

 

2. The custom protocol

NettyRemotingServer and NettyRemotingClient implement the RemotingServer and RemotingClient interfaces respectively, but they share a lot of logic, such as invokeSync and invokeOneway, so the common methods are pulled up into NettyRemotingAbstract, the parent class both of them extend. Let's first look at how NettyRemotingAbstract handles received data, as shown below.

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }

                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
processRequestCommand
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
processResponseCommand

 

Both the server side and the client side have to handle what they receive, and that handling is defined by processMessageReceived. Note that by this point the received message has already been decoded into a RemotingCommand rather than a raw byte stream.

 

RemotingCommand is RocketMQ's own protocol; its on-the-wire format is described below.

[Diagram of the RemotingCommand wire format]

Although the protocol has only four parts, it covers almost every communication between RocketMQ's roles. RemotingCommand has concrete fields corresponding to those parts, as shown below.

private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;

private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

private transient byte[] body;
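Since the diagram of the wire format is not reproduced above, here is a rough sketch of the four parts of an encoded frame and of how it is decoded. The byte layout follows the open-source NettyEncoder/NettyDecoder and RemotingCommand.decode logic (NettyDecoder is a length-field-based frame decoder), but treat the exact details as an illustration rather than a specification.

import java.nio.ByteBuffer;

// Four parts of one RemotingCommand frame on the wire (sketch):
//
//   | total length | serialize type + header length | header data          | body data |
//   |   4 bytes    |  1 byte      +   3 bytes       | JSON or ROCKETMQ fmt | raw bytes |
//
// The 4-byte total-length field is used to split the byte stream into frames and is
// stripped before decoding; the rest is handled roughly like this:
public final class RemotingFrameSketch {

    public static void decode(ByteBuffer frame) {
        int length = frame.limit();
        int oriHeaderLen = frame.getInt();          // high byte: serialize type, low 3 bytes: header length
        int headerLength = oriHeaderLen & 0xFFFFFF;

        byte[] headerData = new byte[headerLength];
        frame.get(headerData);                      // would be deserialized into the fields listed above

        int bodyLength = length - 4 - headerLength;
        if (bodyLength > 0) {
            byte[] body = new byte[bodyLength];
            frame.get(body);                        // the application payload stays as raw bytes
        }
    }
}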

Communication between RocketMQ's components constantly converts between raw bytes and RemotingCommand objects, i.e. encoding and decoding. Fortunately Netty provides codec support, so this frequent operation only needs one line of setup: pipeline().addLast(new NettyEncoder(), new NettyDecoder()).
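As a minimal sketch of that one-liner in context (the full broker bootstrap appears later on this page; the event-loop group setup here is illustrative):

EventLoopGroup bossGroup = new NioEventLoopGroup(1);      // accepts connections
EventLoopGroup workerGroup = new NioEventLoopGroup();     // handles I/O

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(
                new NettyEncoder(),          // RemotingCommand -> bytes
                new NettyDecoder(),          // bytes -> RemotingCommand
                new NettyServerHandler());   // hands each decoded command to processMessageReceived
        }
    });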

 

Another abstraction RocketMQ puts on top of the communication flow is the Processor/Executor pair: once a message arrives, the Processor and Executor registered for that message's request code are invoked directly, which separates the transport layer from the business logic. The following snippet from the Broker shows how processors are registered.

public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);

    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
BrokerController
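For reference, this is the shape of the NettyRequestProcessor contract that all of the processors above implement (its two methods match the pair.getObject1().processRequest(...) and rejectRequest() calls in processRequestCommand), followed by a toy processor. EchoProcessor and request code 9999 are made up for illustration and are not part of RocketMQ.

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;

    boolean rejectRequest();
}

// A toy processor that simply echoes the request body back to the caller.
public class EchoProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, null);
        response.setBody(request.getBody());
        return response;                 // processRequestCommand sets the opaque and writes it back
    }

    @Override
    public boolean rejectRequest() {
        return false;                    // returning true triggers the SYSTEM_BUSY flow-control branch above
    }
}

// Registered in the same way as the broker processors above (request code 9999 is hypothetical):
// remotingServer.registerProcessor(9999, new EchoProcessor(), Executors.newFixedThreadPool(4));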

 


Apache RocketMQ: what is RocketMQ? Message middleware

Introduction

What is RocketMQ?

RocketMQ is an open-source distributed messaging system. Built on highly available distributed cluster technology, it provides low-latency, highly reliable message publish and subscribe services. It is widely used across many domains, including asynchronous decoupling, enterprise solutions, financial payments, telecommunications, e-commerce, express logistics, advertising and marketing, social networking, instant messaging, mobile applications, mobile games, video, IoT and connected vehicles.

It has the following characteristics:

  • Strict message ordering guarantees

  • A rich set of message pull modes

  • Efficient horizontal scaling of subscribers

  • Real-time message subscription

  • The ability to accumulate messages at the scale of hundreds of millions

  • Renamed from MetaQ: starting with version 3.0 the product is called RocketMQ

Official website

https://rocketmq.apache.org/


Netty study, part 5: how Netty is used in RocketMQ

RocketMQ's communication layer is built on top of the Netty framework. Let's look at the inheritance hierarchy of the remoting classes.

[Inheritance diagram of the RocketMQ remoting classes]

NettyRemotingAbstract is the abstract parent class of NettyRemotingClient and NettyRemotingServer; it implements the sending and receiving logic that the two share.

1. Data structures: the ResponseFuture pattern

1. It holds the table of registered RPC processors. When the Broker receives a request it copies the request's opaque value straight into the response object, and when the client receives that response it uses the opaque to look up the matching ResponseFuture in its cache.

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
          new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

When the server receives a request from a client, NettyServerHandler is invoked to handle it.

ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyServerHandler());
            }
        });

When NettyServerHandler handles channelRead, it calls processMessageReceived:


   class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

processMessageReceived branches on the command type. On the server side the incoming message is a request, so the handler registered in processorTable for its request code is invoked. On the client side the incoming message is a response from the server: the client looks up the ResponseFuture for the opaque in responseTable; if the response carries an asynchronous callback (an InvokeCallback), that callback is executed, otherwise the response is put into the ResponseFuture and the waiting caller returns.

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

2. It also keeps track of the outbound requests that are still in flight:

protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
            new ConcurrentHashMap<Integer, ResponseFuture>(256);
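To make the responseTable concrete, here is a condensed sketch of what a ResponseFuture holds and how the synchronous path waits on it. The real class additionally tracks the InvokeCallback, timestamps and a semaphore-release handle, so take this as an outline rather than the actual source.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResponseFutureSketch {
    private final int opaque;                       // matches the request's opaque
    private final long timeoutMillis;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    private volatile RemotingCommand responseCommand;
    private volatile boolean sendRequestOK = true;
    private volatile Throwable cause;

    public ResponseFutureSketch(int opaque, long timeoutMillis) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
    }

    // invokeSyncImpl blocks here until the I/O thread delivers a response or the timeout fires
    public RemotingCommand waitResponse(long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    // processResponseCommand calls this from the Netty I/O thread to wake up the waiting caller
    public void putResponse(RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }

    public void setSendRequestOK(boolean sendRequestOK) { this.sendRequestOK = sendRequestOK; }
    public boolean isSendRequestOK() { return this.sendRequestOK; }
    public void setCause(Throwable cause) { this.cause = cause; }
    public Throwable getCause() { return this.cause; }
}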

When the server starts, the start() method schedules a timer task that scans the responseTable and removes ResponseFutures that have timed out:

public void start() {
    .............................
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

The scan method below decides which futures have timed out:

 public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();
            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                plog.warn("remove timeout request, " + rep);
            }
        }
        for (ResponseFuture rf : rfList) {
            try {
                rf.executeInvokeCallback();
            } catch (Throwable e) {
                plog.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

2. Having looked at the basic data structures and how incoming data is processed, let's now look at the sending logic.

1. invokeSyncImpl: synchronous send. A ResponseFuture is created and put into responseTable, the request is written to the channel, and the caller then waits up to the configured timeout (3 s in this description) for a response. If the ResponseFuture still holds no response when the timeout expires, an exception is thrown; otherwise the returned RemotingCommand is handed to the business logic. If the send itself fails, the ResponseFuture is marked as failed and removed from the cache, since no response will ever arrive for it and there is no point keeping it.
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    plog.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                            responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }


2. invokeAsyncImpl: asynchronous send. A ResponseFuture is created with the opaque, the callback, a "once" semaphore-release wrapper and the timeout, and is put into responseTable. If it sits there longer than the scanResponseTable timeout (about 30 s) it is treated as failed; otherwise, when the response arrives, the registered InvokeCallback is executed. Because asynchronous calls usually have longer round-trip times, a semaphore (2048 permits by default) bounds the number of in-flight requests cached locally. On a successful write the listener calls responseFuture.setSendRequestOK(true); on failure it calls setSendRequestOK(false), releases the semaphore through "once" and removes the cache entry. When Netty receives the server's response, the ResponseFuture is looked up by opaque and its InvokeCallback implementation is invoked.
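A sketch of invokeAsyncImpl that follows the description above; names such as semaphoreAsync, SemaphoreReleaseOnlyOnce and executeInvokeCallback mirror the open-source NettyRemotingAbstract, but the details (especially the failure branch) are simplified, so treat this as an outline rather than the verbatim source.

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException {
    final int opaque = request.getOpaque();
    // bound the number of in-flight async requests (2048 permits by default)
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        // "once" guarantees the permit is released exactly one time
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    // the write failed: mark the future, drop it from the table, run the callback, release the permit
                    responseFuture.setSendRequestOK(false);
                    responseFuture.putResponse(null);
                    responseTable.remove(opaque);
                    try {
                        executeInvokeCallback(responseFuture);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // too many pending async requests: fail fast instead of queueing up indefinitely
        throw new RemotingTooMuchRequestException("invokeAsyncImpl tryAcquire semaphore timeout, " + timeoutMillis + "ms");
    }
}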



That is roughly how RocketMQ's communication flow works. For the details it is still worth downloading the source code from GitHub and reading it; studying source code like this really does help you improve.


Seamlessly switching between RabbitMQ, ActiveMQ and RocketMQ with Netty to implement one-to-one and group chat


The handlers on Netty's pipeline: an IdleStateHandler that uses heartbeats to detect whether a channel is still alive, a UserAuthHandler that handles login authentication, and a MessageHandler that handles messages.

protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(defLoopGroup,
        // HTTP codec
        new HttpServerCodec(),
        // aggregate multiple HTTP message parts into a single full message
        new HttpObjectAggregator(65536),
        // support asynchronous transfer of large streams, typically file streams
        new ChunkedWriteHandler(),
        // detect read-idle links; works with the heartbeat handler to check whether the channel is alive
        new IdleStateHandler(60, 0, 0),
        // handshake and authentication
        new UserAuthHandler(),
        // message handling and dispatch
        new MessageHandler()
    );
}

Every channel that connects is stored; later group broadcasts depend on these channels.

public static void addChannel(Channel channel) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        System.out.println("addChannel:" + remoteAddr);
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}", remoteAddr);
        }
        UserInfo userInfo = new UserInfo();
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfo.setTime(System.currentTimeMillis());
        userInfos.put(channel, userInfo);
    }

After a successful login the channel becomes a valid channel; channels that never authenticate are discarded later.

public static boolean saveUser(Channel channel, String nick, String password) {
        UserInfo userInfo = userInfos.get(channel);
        if (userInfo == null) {
            return false;
        }
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
            return false;
        }
        // validate the username and password
        if (nick == null || password == null) {
            return false;
        }
        LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
        if (account == null) {
            return false;
        }
        // one more authenticated user
        userCount.incrementAndGet();
        userInfo.setNick(nick);
        userInfo.setAuth(true);
        userInfo.setId(account.getId());
        userInfo.setUsername(account.getUsername());
        userInfo.setGroupNumber(account.getGroupNumber());
        userInfo.setTime(System.currentTimeMillis());

        // register the channel used to push messages to this user
        offlineInfoTransmitStatic.registerPull(channel);
        return true;
    }

Once a channel is closed, it no longer receives messages; unregisterPull deregisters the message consumer so the client stops pulling chat messages. In addition, a write lock is taken below so that the channel cannot be closed while another thread is still sending through it, which would otherwise cause errors.

public static void removeChannel(Channel channel) {
        try {
            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
            // take the write lock while removing the channel, so that no other thread can still
            // be operating on it while it is being closed, which would cause errors
            rwLock.writeLock().lock();
            channel.close();
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo != null) {
                if (userInfo.isAuth()) {
                    offlineInfoTransmitStatic.unregisterPull(channel);
                    // one less authenticated user
                    userCount.decrementAndGet();
                }
                userInfos.remove(channel);
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

To be able to switch seamlessly among four modes for storing and forwarding chat messages (RabbitMQ, RocketMQ, ActiveMQ, or no middleware at all), the following four-method interface is defined: send a one-to-one message, send a group message, start receiving messages when a client comes online, and stop receiving when it goes offline.

public interface OfflineInfoTransmit {
    void pushP2P(Integer userId, String message);

    void pushGroup(String groupNumber, String message);

    void registerPull(Channel channel);

    void unregisterPull(Channel channel);
}

Here is how the flow works when one of the three middlewares (RabbitMQ, RocketMQ or ActiveMQ) is used to store and forward chat messages:

  1. The one-to-one model borrows from the thread-pool model: if the target user is online, the message is written straight to that user's channel; if the user is offline, the message is sent to the middleware and pulled from it the next time the user comes online. Compared with routing every message through the middleware, this improves performance (a sketch of this logic follows the list).
  2. Group chat, by contrast, is forwarded entirely through the middleware: messages are sent to the middleware and every client consumes them from there. Handling it like one-to-one chat, writing directly to the channels of whoever is online, would be far too cumbersome, since you would have to check which members of the group are online.
  3. When a user is online, a consumer is registered to pull messages from the middleware; when the user goes offline, the consumer is disconnected and the messages stay in the middleware until the client's next login. This is how offline messages are delivered.
  4. Whichever middleware you use, or none at all, the implementation follows the three rules above, so the four storage-and-forwarding strategies can be switched seamlessly; enabling the corresponding annotation selects the one you want.
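As a sketch of point 1, and of how the interface above might be backed by RabbitMQ: the onlineUsers map, the queue names and the use of Spring AMQP's RabbitTemplate are assumptions made for illustration; the actual project wires this differently.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Hypothetical RabbitMQ-backed implementation of OfflineInfoTransmit.
public class RabbitOfflineInfoTransmit implements OfflineInfoTransmit {

    private final RabbitTemplate rabbitTemplate;
    // userId -> channel for users that are currently online (assumed to be maintained elsewhere)
    private final ConcurrentMap<Integer, Channel> onlineUsers = new ConcurrentHashMap<>();

    public RabbitOfflineInfoTransmit(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void pushP2P(Integer userId, String message) {
        Channel channel = onlineUsers.get(userId);
        if (channel != null && channel.isActive()) {
            // point 1: the user is online, so bypass the middleware and write straight to the channel
            channel.writeAndFlush(new TextWebSocketFrame(message));
        } else {
            // the user is offline: park the message in a per-user queue until the next registerPull
            rabbitTemplate.convertAndSend("chat.p2p." + userId, message);    // assumed queue naming
        }
    }

    @Override
    public void pushGroup(String groupNumber, String message) {
        // point 2: group chat always goes through the middleware; each member's consumer picks it up
        rabbitTemplate.convertAndSend("chat.group." + groupNumber, message); // assumed routing
    }

    @Override
    public void registerPull(Channel channel) {
        // point 3: when a user logs in, start a consumer bound to that user's queues and forward
        // every consumed message to this channel (consumer wiring omitted in this sketch)
    }

    @Override
    public void unregisterPull(Channel channel) {
        // when the channel closes, stop the consumer; unread messages stay queued for the next login
    }
}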


Project repository: https://github.com/shuangyueliao/netty-chat


RocketMQ Committer Jiang Xiaofeng and the growth story of RocketMQ-Flink


Self-introduction

My name is Jiang Xiaofeng. I work in the open-source big data platform ecosystem team at Alibaba Cloud, where I am responsible for the connector ecosystem of Apache Flink and for supporting users of the RocketMQ connector both inside the group and in the open-source community.

I first came across RocketMQ before joining Alibaba. As an open-source and big data enthusiast, I gradually noticed RocketMQ appearing more and more often upstream and downstream of my work. As the first choice for business messaging, it carries large volumes of high-value business data, which naturally creates a demand to process that data; and since Flink has become the de facto standard for real-time computing, combining the two led to the birth of rocketmq-flink[1].

Of course, it was not only the close working relationship forged with the Alibaba Cloud RocketMQ team through one traffic peak after another; I also came to appreciate RocketMQ's stability-first, understated style. All of this led me to pursue deep optimization of rocketmq-flink. In the time since, I have worked with the community to improve the original RocketMQ DataStream source/sink implementations, was among the first in the messaging space to implement the new FLIP-27 interfaces, and started work on FLIP-143 support. To give users a SQL-based development experience, I built ScanTableSource/DynamicTableSink table implementations on top of the DataStream implementation. All three sets of public RocketMQ connector interfaces have now been tested against the massive RocketMQ message volumes of Double Eleven and performed stably without any failures, and all of this has been contributed back to the community.

Following a vote by the Apache RocketMQ community, I was fortunate to become a Committer. The open, friendly and energetic atmosphere of the Apache RocketMQ community has encouraged me to keep growing, and I would like to thank my community mentor Du Heng. Becoming a Committer is both the community's recognition and a weighty responsibility. I will keep working with RocketMQ community developers to make RocketMQ the messaging system that best fits the real-time compute engine Flink and to integrate it fully into the Flink ecosystem. Beyond that, I will work with the community to keep improving a lightweight rocketmq-flink connector with proper stream-processing semantics, and to help build Apache RocketMQ into a new-generation platform that unifies messaging, events and streams.

An introduction to RocketMQ-Flink

RocketMQ-Flink, the RocketMQ integration module for Apache Flink, provides RocketMQSource/RocketMQSink together with their table-layer counterparts (the ScanTableSource/DynamicTableSink implementations). With it, users can subscribe to messages from RocketMQ topics through the source and publish messages to RocketMQ topics through the sink.

RocketMQ SQL Connector

  • How do I create a RocketMQ table for Apache Flink?

The following example shows how to create RocketMQ tables:

CREATE TABLE rocketmq_source (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'user_behavior',
  'consumeGroup' = 'behavior_consume_group',
  'nameServerAddress' = '127.0.0.1:9876'
);

CREATE TABLE rocketmq_sink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'user_behavior',
  'produceGroup' = 'behavior_produce_group',
  'nameServerAddress' = '127.0.0.1:9876'
);
  • Available metadata

The following connector metadata can be accessed as metadata columns in a table definition.

The R/W column indicates whether a piece of metadata is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL so that they are excluded from INSERT INTO operations.

Key      Data Type          Description                           R/W
topic    STRING NOT NULL    Topic name of the RocketMQ record.    R

The extended CREATE TABLE example below demonstrates the syntax for exposing these metadata fields:

CREATE TABLE rocketmq_source (
  `topic` STRING METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'user_behavior',
  'consumeGroup' = 'behavior_consume_group',
  'nameServerAddress' = '127.0.0.1:9876'
);



Community users are welcome to try rocketmq-flink and share their feedback, and developers from both the Flink and RocketMQ communities are welcome to get involved and improve the rocketmq-flink connector together.


[1] https://github.com/apache/rocketmq-flink

This article was originally shared by the WeChat official account RocketMQ官微 (ApacheRocketMQ).

That concludes today's walkthrough of RocketMQ's Netty-based low-level design and its underlying data structures; thank you for reading. If you want to learn more about the related pieces above (Apache RocketMQ: what is RocketMQ? Message middleware; Netty study, part 5: how Netty is used in RocketMQ; seamlessly switching between RabbitMQ, ActiveMQ and RocketMQ with Netty to implement one-to-one and group chat; and RocketMQ Committer Jiang Xiaofeng and the growth story of RocketMQ-Flink), you can search this site.

