RocketMQ Source Code Explained | Producer: Message Structure and the Send Path
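Overview
In the previous installment, "RocketMQ Source Code Explained | Producer · Part 1: Start, then Send a Message", we walked through how the Producer sends a message. This time we look more closely at how a message is built and at the path it takes when it is sent.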
Message
When using RocketMQ, you always send messages through the Message class. Its body field holds the message payload. The other key parts are the message's flag and its properties.
```java
import java.util.Map;

public class Message {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}
```

Message flags (flag)
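| Name | Meaning |
|---|---|
| COMPRESSED_FLAG | Compressed message. The body is compressed when it exceeds the size threshold (compressMsgBodyOverHowmuch, 4 KB by default), using zip at level 5 by default. Batch messages are not compressed. |
| MULTI_TAGS_FLAG | The message has multiple tags. |
| TRANSACTION_NOT_TYPE | The transaction is in an unknown state. When the Broker checks back with the Producer, a Commit result means commit, a Rollback result means roll back, and Unknown means keep checking. |
| TRANSACTION_PREPARED_TYPE | The transaction is in progress (prepared). The current message is part of a transaction. |
| TRANSACTION_COMMIT_TYPE | Transaction commit message. Requests that the transaction be committed. |
| TRANSACTION_ROLLBACK_TYPE | Transaction rollback message. Requests that the transaction be rolled back. |
| BORNHOST_V6_FLAG | Whether the host that produced the message has an IPv6 address. |
| STOREHOSTADDRESS_V6_FLAG | Whether the host that persisted the message has an IPv6 address. |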
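The names in the table are bit fields that share a single int. In RocketMQ 4.x they are defined in MessageSysFlag and stored in the message's sysFlag. The minimal sketch below uses the bit positions as I recall them from that class, so treat the exact values as an assumption. It shows why the four transaction types take up two bits, and how one of them can be swapped without disturbing the other flags. The class name SysFlagSketch is hypothetical.

```java
// Hypothetical sketch: sysFlag bit layout. Values assumed to mirror RocketMQ 4.x MessageSysFlag.
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int MULTI_TAGS_FLAG = 0x1 << 1;
    // The transaction type occupies two bits (bit 2 and bit 3).
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    static final int BORNHOST_V6_FLAG = 0x1 << 4;
    static final int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;

    private SysFlagSketch() {}

    // ROLLBACK (0b11 << 2) has both transaction bits set, so it doubles as the mask.
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the two transaction bits, then set the new type; all other bits are kept.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    static boolean isCompressed(int flag) {
        return (flag & COMPRESSED_FLAG) != 0;
    }
}
```

Because the transaction type is an enumeration stored in two bits rather than independent flags, it has to be read and written through a mask. The single-bit flags can simply be tested with `&`.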
Message properties (properties)
Messages have many properties, so only a small selection is listed here.
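| Property | Meaning |
|---|---|
| KEYS | The message key. The server builds an index on the key, so applications can look up the message, and who consumed it, by Topic and Key. |
| TAGS | The message's sub-type. Consumers can consume selectively by tag. |
| DELAY | Delay level of a delayed message. The default configuration defines 18 levels, and consumer retries use 16 of them. |
| RETRY_TOPIC | The Topic to be retried. On the Broker, the message is stored in the SCHEDULE_TOPIC_XXXX Topic, which has 18 queues, one per delay level. |
| REAL_TOPIC | The real Topic. RocketMQ often uses the "trick" of swapping the target Topic, for example for transactional and delayed messages, and this field records the original Topic. |
| PGROUP | Producer group. |
| MAX_OFFSET / MIN_OFFSET | Maximum and minimum offsets in a pull. |
| TRANSFER_FLAG | Transaction-related flag. |
| MSG_TYPE | Message type, i.e. whether this is a reply message. |
| BUYER_ID | Hmm... a buyer ID? |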
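On the wire, the properties map is not sent as a Java Map. As far as I know, RocketMQ 4.x flattens it into a single string with MessageDecoder.messageProperties2String. Character 1 separates a name from its value, and character 2 follows each pair. That string then travels in the send request header. The minimal sketch below shows the encoding and its inverse. The class name is hypothetical, and the separator values are an assumption based on RocketMQ 4.x.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of flattening message properties into one string and back.
// Separators assumed to match RocketMQ 4.x: char 1 between name and value, char 2 after each pair.
final class PropertiesCodecSketch {
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;

    private PropertiesCodecSketch() {}

    static String toWire(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getValue() == null) {
                continue; // nothing to send for an empty value
            }
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
              .append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        return sb.toString();
    }

    static Map<String, String> fromWire(String s) {
        Map<String, String> map = new LinkedHashMap<>();
        if (s == null || s.isEmpty()) {
            return map;
        }
        // split() drops the trailing empty piece left by the final separator
        for (String item : s.split(String.valueOf(PROPERTY_SEPARATOR))) {
            int idx = item.indexOf(NAME_VALUE_SEPARATOR);
            if (idx > 0) {
                map.put(item.substring(0, idx), item.substring(idx + 1));
            }
        }
        return map;
    }
}
```

Control characters are a reasonable choice here because they almost never appear in keys, tags, or topic names, so no escaping is needed.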
Of course, this is only what the message looks like inside the producer. To the Broker and the consumer, it looks like this:
```java
import java.net.SocketAddress;

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    private String brokerName;

    private int queueId;

    // Size of the message as stored on disk
    private int storeSize;

    // Offset within the ConsumeQueue
    private long queueOffset;
    private int sysFlag;
    // Time the message was created
    private long bornTimestamp;
    // Address of the host that created the message
    private SocketAddress bornHost;

    // Time the message was stored
    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    // Offset within the commitLog
    private long commitLogOffset;
    // CRC checksum of the body
    private int bodyCRC;
    // Number of consumption retries
    private int reconsumeTimes;

    private long preparedTransactionOffset;
}
```
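Wrapping the message
So, once the producer has built such a message, does it send it out directly?
Let's continue tracing from where the previous article left off.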
MQClientAPIImpl#sendMessage
```java
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// Is this a reply message?
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
    // "Smart" messages switch to the compact V2 request header
    if (sendSmartMsg) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    }
} else {
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
}
request.setBody(msg.getBody());

/* -- pass -- */
```

Here we can see one more layer of nesting. The message body becomes the body of a RemotingCommand, and the remaining fields (topic, properties and so on) travel in the request header. Only then is the command sent.
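The branch above only decides which request code to use. The minimal sketch below restates it as a pure function so the four outcomes are easy to see. The numeric codes are the RequestCode values listed later in this article. The class RequestCodeChooser and its boolean parameters are hypothetical stand-ins for the reply check, sendSmartMsg, and `msg instanceof MessageBatch`.

```java
// Hypothetical restatement of the request-code selection in MQClientAPIImpl#sendMessage.
final class RequestCodeChooser {
    static final int SEND_MESSAGE = 10;
    static final int SEND_MESSAGE_V2 = 310;
    static final int SEND_BATCH_MESSAGE = 320;
    static final int SEND_REPLY_MESSAGE = 324;
    static final int SEND_REPLY_MESSAGE_V2 = 325;

    private RequestCodeChooser() {}

    static int choose(boolean isReply, boolean sendSmartMsg, boolean isBatch) {
        if (isReply) {
            return sendSmartMsg ? SEND_REPLY_MESSAGE_V2 : SEND_REPLY_MESSAGE;
        }
        if (sendSmartMsg || isBatch) {
            // Batches always go out with the V2 header, whatever sendSmartMsg says
            return isBatch ? SEND_BATCH_MESSAGE : SEND_MESSAGE_V2;
        }
        return SEND_MESSAGE;
    }
}
```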
The fields of RemotingCommand are as follows:
```java
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;

private transient byte[] body;
```

Among its methods there is also one called encode, which returns a ByteBuffer. The byte layout it produces is what actually goes over the wire.
```java
public ByteBuffer encode() {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }

    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    // body data;
    if (this.body != null) {
        result.put(this.body);
    }

    result.flip();

    return result;
}
```
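markProtocolType is not shown above. The standalone sketch below reproduces the framing of encode() with the header bytes passed in, instead of coming from headerEncode(). Its markProtocolType packs the serialization type into the first byte and the header length into the remaining three bytes (big-endian). That is my understanding of RocketMQ 4.x, so treat the byte layout and the SERIALIZE_JSON code (0) as assumptions. The class name is hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical standalone sketch of the frame layout produced by RemotingCommand#encode.
final class FrameEncoderSketch {
    // Assumed to equal SerializeType.JSON's code in RocketMQ 4.x.
    static final byte SERIALIZE_JSON = 0;

    private FrameEncoderSketch() {}

    // Byte 0: serialization type; bytes 1-3: header length as a 24-bit big-endian value.
    static byte[] markProtocolType(int headerLength, byte serializeType) {
        byte[] result = new byte[4];
        result[0] = serializeType;
        result[1] = (byte) ((headerLength >> 16) & 0xFF);
        result[2] = (byte) ((headerLength >> 8) & 0xFF);
        result[3] = (byte) (headerLength & 0xFF);
        return result;
    }

    static ByteBuffer encode(byte[] headerData, byte[] body, byte serializeType) {
        // 4 bytes for the type+header-length field, then header, then optional body
        int length = 4 + headerData.length + (body == null ? 0 : body.length);
        ByteBuffer result = ByteBuffer.allocate(4 + length);
        result.putInt(length); // the leading length does not count itself
        result.put(markProtocolType(headerData.length, serializeType));
        result.put(headerData);
        if (body != null) {
            result.put(body);
        }
        result.flip(); // switch from writing to reading
        return result;
    }
}
```

Sharing one int between the serialization type and the header length keeps the fixed part of the frame at 8 bytes. The trade-off is that a header can be at most 16 MB.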
Message structure
The detailed message structure is shown in the figure below:
https://p9.toutiaoimg.com/large/pgc-image/6b50238a77c949019427d609a75bffc7
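To check the layout in the figure against encode(), here is a minimal decoder sketch that reads the fields back in the same order: total length, the type/header-length int, the header, then the body. It assumes the buffer holds exactly one complete frame. Splitting a TCP stream into frames is a separate concern that this sketch does not model. The class and field names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: split one complete frame back into its parts.
final class FrameDecoderSketch {
    private FrameDecoderSketch() {}

    static final class Frame {
        final int serializeType;
        final byte[] header;
        final byte[] body;

        Frame(int serializeType, byte[] header, byte[] body) {
            this.serializeType = serializeType;
            this.header = header;
            this.body = body;
        }
    }

    static Frame decode(ByteBuffer buf) {
        int length = buf.getInt();            // bytes that follow this field
        int oriHeaderLen = buf.getInt();      // 1 byte type + 3 bytes header length
        int serializeType = (oriHeaderLen >> 24) & 0xFF;
        int headerLength = oriHeaderLen & 0xFFFFFF;

        byte[] header = new byte[headerLength];
        buf.get(header);

        // whatever is left after the 4-byte field and the header is the body
        byte[] body = new byte[length - 4 - headerLength];
        buf.get(body);
        return new Frame(serializeType, header, body);
    }
}
```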
Each field has a different meaning in a Request and in a Response.
code
In a Request, it is the request's operation code:
```java
public class RequestCode {
    // Send a message
    public static final int SEND_MESSAGE = 10;
    // Pull messages
    public static final int PULL_MESSAGE = 11;
    // Query messages (topic, required key, max count, begin offset, end offset)
    public static final int QUERY_MESSAGE = 12;
    // Query the Broker offset (unused)
    public static final int QUERY_BROKER_OFFSET = 13;
    /*
     * Query consumer offsets
     * Consumers keep offsets in memory. In a master/slave setup, the master Broker handles both reads and writes by default.
     * To prevent message backlog, once backlogged messages exceed a given threshold the slave takes over reads, which can cause consume-progress problems.
     * So master/slave consume-progress consistency is ensured by the slave actively reporting progress and by giving priority to the consumer's in-memory progress.
     */
    // Query the consumer's own offset
    public static final int QUERY_CONSUMER_OFFSET = 14;
    // Commit the consumer's own offset
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    // Create or update a Topic
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    // Get all Topic configurations
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    /* unused */
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    public static final int GET_TOPIC_NAME_LIST = 23;
    // Update the Broker configuration
    public static final int UPDATE_BROKER_CONFIG = 25;
    // Get the Broker configuration
    public static final int GET_BROKER_CONFIG = 26;
    public static final int TRIGGER_DELETE_FILES = 27;
    // Get Broker runtime information
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    // Look up an offset by timestamp
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    // Get the maximum offset
    public static final int GET_MAX_OFFSET = 30;
    // Get the minimum offset
    public static final int GET_MIN_OFFSET = 31;
    public static final int GET_EARLIEST_MSG_STORETIME = 32;

    /* Handled by the Broker */
    // Query a message by message ID
    public static final int VIEW_MESSAGE_BY_ID = 33;
    // Heartbeat
    public static final int HEART_BEAT = 34;
    // Unregister a client
    public static final int UNREGISTER_CLIENT = 35;
    // Report a consumption failure (retried after a while) (Deprecated)
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    // Transaction outcome (commit or rollback)
    public static final int END_TRANSACTION = 37;
    // Get the consumer list of a consumer group
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    // Check transaction state: the Broker's check-back for transactions in an unknown state
    public static final int CHECK_TRANSACTION_STATE = 39;
    // Notify consumers that the consumer IDs have changed
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    // Lock queues in batch (used by rebalance)
    public static final int LOCK_BATCH_MQ = 41;
    // Unlock queues
    public static final int UNLOCK_BATCH_MQ = 42;
    // Get all consumer offsets on this Broker
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    // Get offsets on the delay Topic
    public static final int GET_ALL_DELAY_OFFSET = 45;
    // Check the client configuration
    public static final int CHECK_CLIENT_CONFIG = 46;
    // Create or update the ACL configuration
    public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
    // Delete the ACL configuration
    public static final int DELETE_ACL_CONFIG = 51;
    // Get ACL info of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
    // Update the global whitelist
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
    // Get the ACL configuration of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;

    /* NameServer related */
    // Put a key-value config
    public static final int PUT_KV_CONFIG = 100;
    // Get a key-value config
    public static final int GET_KV_CONFIG = 101;
    // Delete a key-value config
    public static final int DELETE_KV_CONFIG = 102;
    // Register a Broker
    public static final int REGISTER_BROKER = 103;
    // Unregister a Broker
    public static final int UNREGISTER_BROKER = 104;
    // Get the route info of a given Topic
    public static final int GET_ROUTEINFO_BY_TOPIC = 105;
    // Get the Broker cluster info
    public static final int GET_BROKER_CLUSTER_INFO = 106;

    // Create or update a subscription group
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    // Get the configuration of all subscription groups
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    // Get Topic statistics
    public static final int GET_TOPIC_STATS_INFO = 202;
    // Get the list of online consumers (rpc)
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    // Get the list of online producers
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    // Get all Topics from the NameServer
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    // Delete a subscription group
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    // Get consumer statistics
    public static final int GET_CONSUME_STATS = 208;

    public static final int SUSPEND_CONSUMER = 209;
    public static final int RESUME_CONSUMER = 210;
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;

    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;

    public static final int WHO_CONSUME_THE_MESSAGE = 214;

    // Delete a Topic on the Broker
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    // Delete a Topic on the NameServer
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    // Get the key-value list of a namespace
    public static final int GET_KVLIST_BY_NAMESPACE = 219;

    // Reset a consumer's consume progress
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    // Get consumer statistics from the consumer client
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    // Ask the Broker to reset the consume progress
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    // Ask the Broker to update consumer statistics
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
    // Query who consumed the messages of a Topic
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    // Get the Topics of a cluster
    public static final int GET_TOPICS_BY_CLUSTER = 224;
    // Register a filter server
    public static final int REGISTER_FILTER_SERVER = 301;
    // Register a message filter class
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    // Query the consume time span
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    // Get system Topics from the NameServer
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    // Get system Topics from the Broker
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    // Clean expired consume queues
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    // Get the Consumer's runtime information
    public static final int GET_CONSUMER_RUNNING_INFO = 307;
    // Query the correction offset
    public static final int QUERY_CORRECTION_OFFSET = 308;
    // Consume a message directly
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    // Send a message (v2), with an optimized network packet
    public static final int SEND_MESSAGE_V2 = 310;

    // Unit-related topics
    public static final int GET_UNIT_TOPIC_LIST = 311;
    // Get Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    // Get non-unit Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    // Clone consume progress
    public static final int CLONE_GROUP_OFFSET = 314;
    // Query statistics on the Broker
    public static final int VIEW_BROKER_STATS_DATA = 315;

    // Clean unused Topics
    public static final int CLEAN_UNUSED_TOPIC = 316;
    // Get consumption-related statistics on the Broker
    public static final int GET_BROKER_CONSUME_STATS = 317;

    /* update the config of name server */
    public static final int UPDATE_NAMESRV_CONFIG = 318;
    /* get config from name server */
    public static final int GET_NAMESRV_CONFIG = 319;

    // Send a batch of messages
    public static final int SEND_BATCH_MESSAGE = 320;
    // Query the consume queue
    public static final int QUERY_CONSUME_QUEUE = 321;
    // Query the data version
    public static final int QUERY_DATA_VERSION = 322;

    /* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
    public static final int RESUME_CHECK_HALF_MESSAGE = 323;
    // Send a reply message
    public static final int SEND_REPLY_MESSAGE = 324;
    public static final int SEND_REPLY_MESSAGE_V2 = 325;
    // Push a reply message to the client
    public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
```
In a Response, it is the response code:
```java
public class ResponseCode extends RemotingSysResponseCode {
    // Flush to disk timed out
    public static final int FLUSH_DISK_TIMEOUT = 10;
    // Slave not available
    public static final int SLAVE_NOT_AVAILABLE = 11;
    // Slave flush timed out
    public static final int FLUSH_SLAVE_TIMEOUT = 12;
    // Illegal message structure
    public static final int MESSAGE_ILLEGAL = 13;
    // Service not available
    public static final int SERVICE_NOT_AVAILABLE = 14;
    // Version not supported
    public static final int VERSION_NOT_SUPPORTED = 15;
    // Not authorized
    public static final int NO_PERMISSION = 16;
    // Topic does not exist
    public static final int TOPIC_NOT_EXIST = 17;
    // Topic already exists
    public static final int TOPIC_EXIST_ALREADY = 18;
    // The offset to pull does not exist
    public static final int PULL_NOT_FOUND = 19;
    // Pull again immediately
    public static final int PULL_RETRY_IMMEDIATELY = 20;
    // Redirect the pull offset
    public static final int PULL_OFFSET_MOVED = 21;
    // Queue does not exist
    public static final int QUERY_NOT_FOUND = 22;
    // Failed to parse the subscription url
    public static final int SUBSCRIPTION_PARSE_FAILED = 23;
    // Target subscription does not exist
    public static final int SUBSCRIPTION_NOT_EXIST = 24;
    // Subscription is not the latest
    public static final int SUBSCRIPTION_NOT_LATEST = 25;
    // Subscription group does not exist
    public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
    // Subscription data does not exist (invalid tag expression)
    public static final int FILTER_DATA_NOT_EXIST = 27;
    // Subscription data on this Broker is not the latest
    public static final int FILTER_DATA_NOT_LATEST = 28;

    // Transaction should be committed
    public static final int TRANSACTION_SHOULD_COMMIT = 200;
    // Transaction should be rolled back
    public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
    // Transaction state unknown
    public static final int TRANSACTION_STATE_UNKNOW = 202;
    // Wrong transaction state group
    public static final int TRANSACTION_STATE_GROUP_WRONG = 203;

    // Buyer ID does not exist
    public static final int NO_BUYER_ID = 204;
    public static final int NOT_IN_CURRENT_UNIT = 205;
    // Consumer not online (rpc)
    public static final int CONSUMER_NOT_ONLINE = 206;
    // Consumption timed out
    public static final int CONSUME_MSG_TIMEOUT = 207;
    // Message does not exist
    public static final int NO_MESSAGE = 208;
    // Failed to create or update the ACL configuration
    public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
    // Failed to delete the ACL configuration
    public static final int DELETE_ACL_CONFIG_FAILED = 210;
    // Failed to update the global whitelist addresses
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
```
lang
The programming language of the sender. It defaults to Java here:
```java
private LanguageCode language = LanguageCode.JAVA;
```

version
The program version of the sender.
opaque
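This field identifies different requests on the same connection, so that when a response arrives the matching callback can be invoked. RocketMQ multiplexes requests over a single TCP connection.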
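The minimal sketch below shows why opaque is needed when responses share one connection and may arrive out of order. Each request gets a fresh opaque, and the waiting caller is parked in a map keyed by it. The response carries the same opaque back, so it can be routed to the right caller. RocketMQ keeps a comparable map (responseTable of ResponseFuture) in NettyRemotingAbstract. The class and method names below are hypothetical, and CompletableFuture stands in for ResponseFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of opaque-based request/response matching on a multiplexed connection.
final class OpaqueCorrelator {
    private final AtomicInteger requestId = new AtomicInteger(0);
    private final Map<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();

    // Allocate an opaque for a new request and remember who is waiting for it.
    int register(CompletableFuture<String> future) {
        int opaque = requestId.getAndIncrement();
        responseTable.put(opaque, future);
        return opaque;
    }

    // Called when a response arrives; the echoed opaque selects the waiting caller.
    boolean onResponse(int opaque, String payload) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future == null) {
            return false; // unknown, duplicate, or already timed-out request
        }
        future.complete(payload);
        return true;
    }

    int pending() {
        return responseTable.size();
    }
}
```

Removing the entry on the first response also means a late duplicate is ignored instead of completing someone else's request.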
remark
In a Request, it carries custom text.
In a Response, it carries the reason for an error.
ext
Carries custom message headers (the extFields map).
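Custom headers such as the send request header are typed objects in Java, but on the wire they become the plain String map extFields. The minimal sketch below does that conversion with reflection over the header's declared fields, roughly in the spirit of RemotingCommand#makeCustomHeaderToNet. It is not that method. Both ExtFieldsSketch and DemoSendHeader are hypothetical, and the rule that null fields are skipped is an assumption of the sketch.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;

// Hypothetical sketch: copy a header object's instance fields into a String map.
final class ExtFieldsSketch {
    private ExtFieldsSketch() {}

    static HashMap<String, String> toExtFields(Object header) {
        HashMap<String, String> ext = new HashMap<>();
        for (Field field : header.getClass().getDeclaredFields()) {
            // constants and compiler-generated fields are not part of the wire header
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true);
            try {
                Object value = field.get(header);
                if (value != null) {
                    ext.put(field.getName(), value.toString());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read header field " + field.getName(), e);
            }
        }
        return ext;
    }
}

// Hypothetical header type used only for illustration.
final class DemoSendHeader {
    private String topic = "TopicTest";
    private Integer queueId = 3;
    private String properties; // left null, so it is skipped
}
```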
Sending the message
Now that we know what a message looks like, we can go back to the sending code:
```java
switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

return null;
```
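In the ASYNC and SYNC branches, the caller's timeout is a budget for the whole send. Time already spent since beginStartTime (building the header, picking the request code) is subtracted before the network call, and the call fails immediately if the budget is already gone. The minimal sketch below isolates that check. The class TimeoutBudget is hypothetical, and IllegalStateException stands in for RemotingTooMuchRequestException.

```java
// Hypothetical sketch of the remaining-timeout check used before SYNC/ASYNC sends.
final class TimeoutBudget {
    private TimeoutBudget() {}

    // Returns the time left for the network call, or throws if the budget is already spent.
    static long remaining(long timeoutMillis, long beginStartTime, long now) {
        long cost = now - beginStartTime;
        if (timeoutMillis < cost) {
            throw new IllegalStateException("sendMessage call timeout");
        }
        return timeoutMillis - cost;
    }
}
```

Passing `now` in explicitly, instead of reading System.currentTimeMillis() inside, keeps the function deterministic and easy to test.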
NettyRemotingClient#invokeOneway
Let's start with the simplest case, Oneway:
```java
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Get (or create) the Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            // Send over the established connection
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
```

This boils down to two operations: getting or establishing a TCP connection, and sending data over that connection. If an exception occurs, the connection is closed.
NettyRemotingClient#getAndCreateChannel
First, the first operation:
```java
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // A null address means the caller wants a connection to the NameServer
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    // Try the cache first
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    // Missing or not ready: create a new connection
    return this.createChannel(addr);
}
```

So all connections are kept in channelTables, and channelTables in turn belongs to NettyRemotingClient. In effect, it is shared by everything in the JVM that uses the same client instance.
Looking closer at the creation method, we find that the Channel is ultimately created asynchronously by the client's Bootstrap:
```java
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
    return cw.getChannel();
}

// Connections are created one at a time
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
        // Double check that the connection really has not been created yet
        boolean createNewConnection;
        cw = this.channelTables.get(addr);
        if (cw != null) {
            if (cw.isOK()) {
                // Connection already established
                return cw.getChannel();
            } else if (!cw.getChannelFuture().isDone()) {
                // A connection attempt is still in progress
                createNewConnection = false;
            } else {
                // Attempted before, but it failed
                this.channelTables.remove(addr);
                createNewConnection = true;
            }
        } else {
            createNewConnection = true;
        }

        if (createNewConnection) {
            // The actual connection creation
            ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
            log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
            cw = new ChannelWrapper(channelFuture);
            this.channelTables.put(addr, cw);
        }
    } catch (Exception e) {
        log.error("createChannel: create channel exception", e);
    } finally {
        this.lockChannelTables.unlock();
    }
} else {
    log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
    ChannelFuture channelFuture = cw.getChannelFuture();
    // Block until the connection completes (or the connect timeout expires)
    if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
        if (cw.isOK()) {
            log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
            return cw.getChannel();
        } else {
            log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
        }
    } else {
        log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
            channelFuture.toString());
    }
}

return null;
```
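The connection cache above combines three ideas: a lock-free fast path, a tryLock-guarded slow path with a double check, and a bounded wait on an asynchronous connect. The minimal sketch below reproduces that pattern without Netty. CompletableFuture stands in for ChannelFuture, and the `connector` function stands in for `bootstrap.connect(...)`. The class ConnectionCache and its parameters are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of the channelTables get-or-create pattern.
final class ConnectionCache<C> {
    private final ConcurrentHashMap<String, CompletableFuture<C>> table = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<String, CompletableFuture<C>> connector;

    ConnectionCache(Function<String, CompletableFuture<C>> connector) {
        this.connector = connector;
    }

    C get(String addr, long lockTimeoutMillis, long connectTimeoutMillis) throws InterruptedException {
        CompletableFuture<C> f = table.get(addr);
        if (f != null && isOk(f)) {
            return f.join(); // fast path: already connected, no locking
        }
        if (lock.tryLock(lockTimeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                f = table.get(addr); // double check under the lock
                if (f == null || (f.isDone() && !isOk(f))) {
                    // never created, or created but failed: start a new connect
                    f = connector.apply(addr);
                    table.put(addr, f);
                }
                // otherwise: connected, or a connect is still in flight, so reuse it
            } finally {
                lock.unlock();
            }
        }
        if (f == null) {
            return null;
        }
        try {
            return f.get(connectTimeoutMillis, TimeUnit.MILLISECONDS); // bounded wait for the connect
        } catch (ExecutionException | TimeoutException e) {
            return null; // failed or too slow; the next call will retry
        }
    }

    private static boolean isOk(CompletableFuture<?> f) {
        return f.isDone() && !f.isCompletedExceptionally();
    }
}
```

Caching the future rather than the finished connection lets concurrent callers share one in-flight connect instead of each opening their own socket.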
NettyRemotingAbstract#invokeOnewayImpl
Next, the second operation: sending data over the connection.
```java
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Mark the flag in the request header as a oneway request
    request.markOnewayRPC();
    // Acquire a permit so the system is not overwhelmed by too many requests
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // Release the permit once the write has actually completed
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // Too many outstanding requests
        if (timeoutMillis
```
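The back-pressure in invokeOnewayImpl works like this. A permit is taken before writing and handed back exactly once, whether the write completes, fails in the listener, or throws synchronously. The minimal sketch below reproduces that with java.util.concurrent.Semaphore. The AtomicBoolean guard in ReleaseOnce reflects my understanding of what SemaphoreReleaseOnlyOnce does, so treat it as an assumption. Both class names here are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: return a semaphore permit at most once, even if release() is called twice.
final class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        // compareAndSet lets only the first caller hand the permit back
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

// Hypothetical gate limiting the number of oneway writes in flight.
final class OnewayGate {
    private final Semaphore semaphoreOneway;

    OnewayGate(int permits) {
        this.semaphoreOneway = new Semaphore(permits, true); // fair: waiting threads are served FIFO
    }

    // Returns a handle to release when the write finishes, or null if no permit arrived in time.
    ReleaseOnce tryEnter(long timeoutMillis) throws InterruptedException {
        if (semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return new ReleaseOnce(semaphoreOneway);
        }
        return null; // the caller would report "too many requests" or a timeout here
    }

    int available() {
        return semaphoreOneway.availablePermits();
    }
}
```

Without the once-only guard, a path that both throws and triggers the listener could release two permits for one acquire, and the limit would silently grow.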