Netty协议-Rocket MQ之NettyRemotingClient/Server
小编:管理员 272阅读 2022.07.29
RocketMQ性能强劲依赖于Netty通讯协议和特定的通讯协议。
通信流程
NettyRemotingAbstract类的内部有一个NettyServerHandler内部类,该类中封装了客户端与服务端交互的基础代码。
processMessageReceived根据cmd类型处理msg信息
@ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler复制{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
processResponseCommand
/** * Process incoming request command issued by remote peer. * * @param ctx channel handler context. * @param cmd request command. */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair复制matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { //解析远程(服务端)channel String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); //RPC调用前执行钩子函数 doBeforeRpcHooks(remoteAddr, cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks(remoteAddr, cmd, response); //判断是否为单向通信 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { //消息追加到内存后刷入硬盘中 ctx.writeAndFlush(response); } catch (Throwable e) { } } else { } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); //异步处理请求 processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { //非oneWay方式消息发送,构造发送消息响应信息 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); ... }
processResponseCommand
/** * Process response from remote peer to the previous issued requests. * * @param ctx channel handler context. * @param cmd response command instance. */ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { //opaque被赋值为requestId final int opaque = cmd.getOpaque(); //从map缓存获取正在进行的其中一个请求 final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); //移除本次请求 responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { //执行回调(排序、执行完成) executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }复制
protected final HashMap复制> processorTable = new HashMap >(64); protected final ConcurrentMap responseTable = new ConcurrentHashMap (256);
processorTable :记录全部请求、以及处理器。 responseTable记录所有响应。
更清晰的文件请下载: 点我下载
通讯协议
RocketMQ自定义的私有协议栈都是基于TCP/IP协议,使用Netty的NIO TCP协议栈进行私有协议栈的定制和开发。
RocketMQ协议共分为四个部分:
- Header data:协议头,数据是序列化【fastjosn】后的json,json的每个key字段都是固定的,
- body data:请求的二进制实际数据,例如发送消息的网络请求中,Body传输实际的消息内容。
- Length:消息总长度
- Header length:序列化类型&消息头长度,第一个字节表示序列化类型,后面三个自己表示消息头长度。
RemotingCommand类封装了通讯消息、编码、解码方式,这些组成了RocketMQ的通讯协议。
//根据自定义协议解析消息头、消息体 public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); //获取头部报文长度 int headerLength = getHeaderLength(oriHeaderLen); //获取头部报文数据 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); //反序列化解析Header data和RemotingCommand类 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); //获取body长度 int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { //获取Body数据 bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; } //根据自定义协议编码消息头、消息体 public ByteBuffer encode() { // 1> header length size int length = 4; // 2> header data length byte[] headerData = this.headerEncode(); length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } ByteBuffer result = ByteBuffer.allocate(4 + length); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); // body data; if (this.body != null) { result.put(this.body); } result.flip(); return result; }复制
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…