Overview
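In the previous installment, "RocketMQ Source Code Explained | Producer, Part 1: Start, then Send a Message", we walked through how the Producer sends a message. This time we look more closely at how a message is structured and at the path it takes when it is sent.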
Message
When using RocketMQ, you always need the Message class to send anything. Its body field holds the message payload; the other notable parts are the message flag and the message properties.
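public class Message {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}

Message flags (flag)
Strictly speaking, the constants below are defined in MessageSysFlag and are packed into the sysFlag field (which MessageExt below also carries); the flag field of Message itself is not interpreted by RocketMQ and is left to the application.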
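Variable | Meaning
COMPRESSED_FLAG | Compressed message. The body is compressed once it exceeds compressMsgBodyOverHowmuch (4 KB by default), using zip at level 5 by default; in the 4.x client, batch messages are not compressed.
MULTI_TAGS_FLAG | The message has multiple tags.
TRANSACTION_NOT_TYPE | Transaction state unknown (value 0, which is also what a non-transactional message carries). When the Broker checks back with the Producer, Commit means the transaction should be committed, Rollback means it should be rolled back, and Unknown means the check should be repeated later.
TRANSACTION_PREPARED_TYPE | Prepared (in-progress) transaction: the message is part of a transaction that has not finished yet.
TRANSACTION_COMMIT_TYPE | Transaction commit message: requests that the transaction be committed.
TRANSACTION_ROLLBACK_TYPE | Transaction rollback message: requests that the transaction be rolled back.
BORNHOST_V6_FLAG | Whether the host that produced the message has an IPv6 address.
STOREHOSTADDRESS_V6_FLAG | Whether the host that persisted the message has an IPv6 address.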
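Because each flag occupies its own bit (with the transaction state as a 2-bit field), a sysFlag is built with bitwise OR and read back with AND. The sketch below is illustrative only: the constant values are copied from MessageSysFlag as I understand the RocketMQ 4.x source, while the class SysFlagDemo and its method names are hypothetical (they mirror getTransactionValue and resetTransactionValue).

```java
// Hypothetical helper; constant values mirror MessageSysFlag in RocketMQ 4.x (verify for other versions).
final class SysFlagDemo {
    static final int COMPRESSED_FLAG = 0x1;
    static final int MULTI_TAGS_FLAG = 0x1 << 1;
    // The transaction state occupies bits 2-3 as one 2-bit field, not as independent bits.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    static final int BORNHOST_V6_FLAG = 0x1 << 4;
    static final int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;

    private SysFlagDemo() {}

    // Extract the 2-bit transaction field (ROLLBACK_TYPE doubles as the mask 0b1100).
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Replace the transaction field while leaving every other bit untouched.
    static int withTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    // Test a single independent bit.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }
}
```

For example, a compressed message that is still a prepared (half) transaction carries 1 | 4 = 5; committing it rewrites only the transaction field, giving 1 | 8 = 9.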
Message properties (properties)
A message has quite a few properties, so only a selection is listed here.
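Property | Meaning
KEYS | The message key(s). The server builds an index on keys, so an application can look up the message, and who consumed it, by Topic and Key.
TAGS | The message's sub-type; consumers can filter messages by tag.
DELAY | The delay level of a delayed message (the default configuration defines 18 levels; consumer retries use 16 of them).
RETRY_TOPIC | The topic of a message that needs to be retried (on the Broker it is stored in the SCHEDULE_TOPIC_XXXX topic, which has 18 queues, one per delay level).
REAL_TOPIC | The real topic. RocketMQ often uses the "trick" of swapping out the destination topic, for example for transactional and delayed messages; this field records the real one.
PGROUP | Producer group.
MAX_OFFSET / MIN_OFFSET | The maximum and minimum offsets in a pull.
TRANSFER_FLAG | Transaction-related flag.
MSG_TYPE | Message type, i.e. whether it is a reply message.
BUYER_ID | Hmm... a buyer ID?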
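The REAL_TOPIC trick for delayed messages can be modelled in a few lines. This is a simplified sketch, assuming the Broker-side behaviour of the 4.x CommitLog: the original topic and queue are backed up in the REAL_TOPIC and REAL_QID properties, and the message is redirected to SCHEDULE_TOPIC_XXXX with queueId = level - 1. DelaySwapSketch is a hypothetical class, the cap of 18 assumes the default delay-level configuration, and the real code also leaves half and rollback transactional messages alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how the Broker redirects a delayed message.
final class DelaySwapSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final int MAX_DELAY_LEVEL = 18; // assumes the default 18 delay levels

    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    DelaySwapSketch(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }

    // Redirect the message to the schedule topic if it carries a positive DELAY level.
    void applyDelay() {
        String delay = properties.get("DELAY");
        if (delay == null) {
            return;
        }
        int level = Math.min(Integer.parseInt(delay), MAX_DELAY_LEVEL);
        if (level <= 0) {
            return;
        }
        // Back up the real destination so it can be restored when the delay expires.
        properties.put("REAL_TOPIC", topic);
        properties.put("REAL_QID", String.valueOf(queueId));
        topic = SCHEDULE_TOPIC;
        queueId = level - 1; // one queue per delay level
    }
}
```

When the delay expires, the schedule service does the reverse: it restores the topic and queue from those two properties and writes the message again, so consumers of the real topic receive it only after the delay.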
Of course, this is only what a message looks like on the producer side. In the eyes of the Broker and the consumer, it looks like this:
public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    private String brokerName;
    private int queueId;
    // size of the message as stored
    private int storeSize;
    // offset in the ConsumeQueue
    private long queueOffset;
    private int sysFlag;
    // time the message was created
    private long bornTimestamp;
    // address of the host that created it
    private SocketAddress bornHost;
    // time the message was stored
    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    // offset in the CommitLog
    private long commitLogOffset;
    // CRC checksum of the body
    private int bodyCRC;
    // number of times consumption has been retried
    private int reconsumeTimes;
    private long preparedTransactionOffset;
}
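Several of these fields are tied together. On the Broker side, the msgId of a MessageExt is, as far as I can tell from MessageDecoder.createMessageId in the 4.x source, the store host's 4-byte IPv4 address, its 4-byte port and the 8-byte commitLogOffset, hex-encoded into 32 characters; that is why such an ID alone is enough to locate the message in the CommitLog. A minimal sketch under that assumption (OffsetMsgIdSketch is a hypothetical name, IPv4 only):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// Hypothetical sketch of an offset-based message ID for an IPv4 store host.
final class OffsetMsgIdSketch {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    private OffsetMsgIdSketch() {}

    static String create(InetSocketAddress storeHost, long commitLogOffset) {
        byte[] ip = storeHost.getAddress().getAddress();
        if (ip.length != 4) {
            throw new IllegalArgumentException("this sketch only handles IPv4");
        }
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.put(ip);                              // 4 bytes: IP
        buf.putInt(storeHost.getPort());          // 4 bytes: port
        buf.putLong(commitLogOffset);             // 8 bytes: physical offset
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
        }
        return sb.toString();
    }
}
```

For a Broker at 127.0.0.1:10911 and a commitLogOffset of 1024, this yields 7F00000100002A9F0000000000000400.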
Wrapping the message
So once the producer has built such a message, does it send it out as is?
Let's pick up where the previous article left off.
MQClientAPIImpl#sendMessage
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// is this a reply message?
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
    // "smart" messages use the compact V2 request header
    if (sendSmartMsg) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    }
} else {
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(
            msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
}
request.setBody(msg.getBody());
/* -- pass -- */

Here we can see one more layer of wrapping before the message is sent: the RemotingCommand keeps only the message body as its own body, while the topic, flag, properties and other metadata travel in the request header.
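Stripped of the header objects, the branch above only decides which request code to send. The sketch reproduces that decision as a pure function, with the values taken from the RequestCode listing later in this article; RequestCodeChooser is a hypothetical helper, not part of RocketMQ.

```java
// Hypothetical helper reproducing the request-code choice in MQClientAPIImpl#sendMessage.
final class RequestCodeChooser {
    static final int SEND_MESSAGE = 10;
    static final int SEND_MESSAGE_V2 = 310;
    static final int SEND_BATCH_MESSAGE = 320;
    static final int SEND_REPLY_MESSAGE = 324;
    static final int SEND_REPLY_MESSAGE_V2 = 325;

    private RequestCodeChooser() {}

    static int choose(boolean isReply, boolean sendSmartMsg, boolean isBatch) {
        if (isReply) {
            // reply messages only distinguish between the V1 and V2 headers
            return sendSmartMsg ? SEND_REPLY_MESSAGE_V2 : SEND_REPLY_MESSAGE;
        }
        if (sendSmartMsg || isBatch) {
            // batches always travel with the V2 header
            return isBatch ? SEND_BATCH_MESSAGE : SEND_MESSAGE_V2;
        }
        return SEND_MESSAGE;
    }
}
```

Note that a MessageBatch always goes out with the V2 header, even when sendSmartMsg is off.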
The fields of RemotingCommand are as follows:
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;

Among its methods we also find one named encode, which returns a ByteBuffer. It shows the exact format in which a command goes out on the wire.
public ByteBuffer encode() {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }

    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    // body data
    if (this.body != null) {
        result.put(this.body);
    }

    result.flip();

    return result;
}
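The frame produced by encode can be reproduced with plain java.nio. The sketch assumes, following markProtocolType, that the second 4-byte word carries the serialization type in its top byte (0 is JSON) and the header length in its low 24 bits; FrameSketch is hypothetical, and the header bytes are passed in instead of coming from headerEncode.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the RemotingCommand frame layout:
// [total length][serialize type | header length][header data][body]
final class FrameSketch {
    private FrameSketch() {}

    static ByteBuffer encode(byte[] header, byte[] body, byte serializeType) {
        int length = 4 + header.length + (body == null ? 0 : body.length);
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length); // does not count these first 4 bytes
        buf.putInt((serializeType << 24) | (header.length & 0xFFFFFF));
        buf.put(header);
        if (body != null) {
            buf.put(body);
        }
        buf.flip();
        return buf;
    }

    // Returns {header, body} after checking the outer length field.
    static byte[][] decode(ByteBuffer frame) {
        int length = frame.getInt();
        if (length != frame.remaining()) {
            throw new IllegalArgumentException("length field does not match frame size");
        }
        int headerLength = frame.getInt() & 0xFFFFFF;
        byte[] header = new byte[headerLength];
        frame.get(header);
        byte[] body = new byte[length - 4 - headerLength];
        frame.get(body);
        return new byte[][] {header, body};
    }
}
```

With an 11-byte JSON header such as {"code":10} and a 5-byte body, the leading int is 4 + 11 + 5 = 20 and the whole frame is 24 bytes, since the first length field does not count itself.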
Message structure
The detailed message structure is shown in the figure below:
Each field has a different meaning in a Request and in a Response.
code
In a Request, it is the operation code of the request:
public class RequestCode {
    // send a message
    public static final int SEND_MESSAGE = 10;
    // pull messages
    public static final int PULL_MESSAGE = 11;
    // query messages (by topic, key, max count, begin offset, end offset)
    public static final int QUERY_MESSAGE = 12;
    // query the Broker offset (unused)
    public static final int QUERY_BROKER_OFFSET = 13;
    /*
     * Query consumer offsets.
     * Consumers keep offsets in memory. In a master/slave setup the master Broker handles reads and writes by default.
     * To avoid message backlog, once the backlog exceeds a configured value the slave takes over reads,
     * which can cause consumption-progress problems.
     * Master/slave progress consistency is therefore maintained by the slave actively reporting its progress
     * and by giving priority to the consumer's in-memory progress.
     */
    // query the consumer's own offset
    public static final int QUERY_CONSUMER_OFFSET = 14;
    // commit the consumer's own offset
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    // create or update a Topic
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    // get all Topic configurations
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    /* unused */
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    public static final int GET_TOPIC_NAME_LIST = 23;
    // update the Broker configuration
    public static final int UPDATE_BROKER_CONFIG = 25;
    // get the Broker configuration
    public static final int GET_BROKER_CONFIG = 26;
    public static final int TRIGGER_DELETE_FILES = 27;
    // get Broker runtime information
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    // look up an offset by timestamp
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    // get the maximum offset
    public static final int GET_MAX_OFFSET = 30;
    // get the minimum offset
    public static final int GET_MIN_OFFSET = 31;
    public static final int GET_EARLIEST_MSG_STORETIME = 32;
    /* handled by the Broker */
    // query a message by message ID
    public static final int VIEW_MESSAGE_BY_ID = 33;
    // heartbeat
    public static final int HEART_BEAT = 34;
    // unregister a client
    public static final int UNREGISTER_CLIENT = 35;
    // report a consumption failure (retried after a while) (Deprecated)
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    // transaction outcome (commit or rollback)
    public static final int END_TRANSACTION = 37;
    // get the consumer list of a consumer group
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    // check transaction state: the Broker's check-back for transactions in an unknown state
    public static final int CHECK_TRANSACTION_STATE = 39;
    // notify consumers that the consumer IDs have changed
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    // lock queues in batch (used by rebalance)
    public static final int LOCK_BATCH_MQ = 41;
    // unlock queues
    public static final int UNLOCK_BATCH_MQ = 42;
    // get all consumer offsets on this Broker
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    // get offsets on the delay Topic
    public static final int GET_ALL_DELAY_OFFSET = 45;
    // check the client configuration
    public static final int CHECK_CLIENT_CONFIG = 46;
    // create or update ACL configuration
    public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
    // delete ACL configuration
    public static final int DELETE_ACL_CONFIG = 51;
    // get ACL information of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
    // update the global whitelist
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
    // get ACL configuration of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
    /* NameServer related */
    // put a key-value config
    public static final int PUT_KV_CONFIG = 100;
    // get a key-value config
    public static final int GET_KV_CONFIG = 101;
    // delete a key-value config
    public static final int DELETE_KV_CONFIG = 102;
    // register a Broker
    public static final int REGISTER_BROKER = 103;
    // unregister a Broker
    public static final int UNREGISTER_BROKER = 104;
    // get the routing info of a given Topic
    public static final int GET_ROUTEINFO_BY_TOPIC = 105;
    // get Broker cluster info
    public static final int GET_BROKER_CLUSTER_INFO = 106;
    // create or update a subscription group
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    // get all subscription group configurations
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    // get Topic statistics
    public static final int GET_TOPIC_STATS_INFO = 202;
    // get the list of online consumers (rpc)
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    // get the list of online producers
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    // get all Topics from the NameServer
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    // delete a subscription group
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    // get consumption statistics
    public static final int GET_CONSUME_STATS = 208;
    public static final int SUSPEND_CONSUMER = 209;
    public static final int RESUME_CONSUMER = 210;
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
    public static final int WHO_CONSUME_THE_MESSAGE = 214;
    // delete a Topic on the Broker
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    // delete a Topic on the NameServer
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    // get the key-value list
    public static final int GET_KVLIST_BY_NAMESPACE = 219;
    // reset a consumer's consumption progress
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    // get consumer statistics from the consumer itself
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    // ask the Broker to reset consumption progress
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    // ask the Broker to fetch consumer statistics
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
    // query who consumed a Topic's messages
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    // get the Topics of a cluster
    public static final int GET_TOPICS_BY_CLUSTER = 224;
    // register a filter server
    public static final int REGISTER_FILTER_SERVER = 301;
    // register a message filter class
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    // query the consumption time span
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    // get system Topics from the NameServer
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    // get system Topics from the Broker
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    // clean up expired consume queues
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    // get Consumer runtime information
    public static final int GET_CONSUMER_RUNNING_INFO = 307;
    // query the correction offset
    public static final int QUERY_CORRECTION_OFFSET = 308;
    // consume a message directly
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    // send a message (v2), with an optimized network packet
    public static final int SEND_MESSAGE_V2 = 310;
    // unit-related topics
    public static final int GET_UNIT_TOPIC_LIST = 311;
    // get the Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    // get the non-unit Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    // clone consumption progress
    public static final int CLONE_GROUP_OFFSET = 314;
    // query statistics on the Broker
    public static final int VIEW_BROKER_STATS_DATA = 315;
    // clean up unused Topics
    public static final int CLEAN_UNUSED_TOPIC = 316;
    // get consumption-related statistics on the Broker
    public static final int GET_BROKER_CONSUME_STATS = 317;
    /* update the config of name server */
    public static final int UPDATE_NAMESRV_CONFIG = 318;
    /* get config from name server */
    public static final int GET_NAMESRV_CONFIG = 319;
    // send batch messages
    public static final int SEND_BATCH_MESSAGE = 320;
    // query a consume queue
    public static final int QUERY_CONSUME_QUEUE = 321;
    // query the data version
    public static final int QUERY_DATA_VERSION = 322;
    /* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
    public static final int RESUME_CHECK_HALF_MESSAGE = 323;
    // send a reply message
    public static final int SEND_REPLY_MESSAGE = 324;
    public static final int SEND_REPLY_MESSAGE_V2 = 325;
    // push a reply message to the client
    public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
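On the receiving side, this code is what selects the handler: NettyRemotingAbstract keeps a processorTable keyed by request code and answers unknown codes with REQUEST_CODE_NOT_SUPPORTED (3 in RemotingSysResponseCode). The sketch models only that lookup with a plain map; the Dispatcher class, its String results and the message text are hypothetical, and the real table also pairs each processor with its own thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of dispatching a request by its code.
final class Dispatcher {
    static final int REQUEST_CODE_NOT_SUPPORTED = 3; // from RemotingSysResponseCode

    private final Map<Integer, Function<byte[], String>> processors = new HashMap<>();

    void register(int requestCode, Function<byte[], String> processor) {
        processors.put(requestCode, processor);
    }

    // Run the handler registered for the code, or report that the code is unsupported.
    String dispatch(int requestCode, byte[] body) {
        Function<byte[], String> processor = processors.get(requestCode);
        if (processor == null) {
            return "code=" + REQUEST_CODE_NOT_SUPPORTED + " request type " + requestCode + " not supported";
        }
        return processor.apply(body);
    }
}
```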
In a Response, it is the response code:
public class ResponseCode extends RemotingSysResponseCode {
    // flushing to disk timed out
    public static final int FLUSH_DISK_TIMEOUT = 10;
    // slave not available
    public static final int SLAVE_NOT_AVAILABLE = 11;
    // flushing on the slave timed out
    public static final int FLUSH_SLAVE_TIMEOUT = 12;
    // illegal message structure
    public static final int MESSAGE_ILLEGAL = 13;
    // service not available
    public static final int SERVICE_NOT_AVAILABLE = 14;
    // version not supported
    public static final int VERSION_NOT_SUPPORTED = 15;
    // not authorized
    public static final int NO_PERMISSION = 16;
    // Topic does not exist
    public static final int TOPIC_NOT_EXIST = 17;
    // Topic already exists
    public static final int TOPIC_EXIST_ALREADY = 18;
    // the offset to pull does not exist
    public static final int PULL_NOT_FOUND = 19;
    // pull again immediately
    public static final int PULL_RETRY_IMMEDIATELY = 20;
    // the pull offset has moved (redirect)
    public static final int PULL_OFFSET_MOVED = 21;
    // the queried message was not found
    public static final int QUERY_NOT_FOUND = 22;
    // failed to parse the subscription
    public static final int SUBSCRIPTION_PARSE_FAILED = 23;
    // the subscription does not exist
    public static final int SUBSCRIPTION_NOT_EXIST = 24;
    // the subscription is not the latest
    public static final int SUBSCRIPTION_NOT_LATEST = 25;
    // the subscription group does not exist
    public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
    // filter data does not exist (invalid tag expression)
    public static final int FILTER_DATA_NOT_EXIST = 27;
    // filter data on this Broker is not the latest
    public static final int FILTER_DATA_NOT_LATEST = 28;
    // the transaction should be committed
    public static final int TRANSACTION_SHOULD_COMMIT = 200;
    // the transaction should be rolled back
    public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
    // transaction state unknown
    public static final int TRANSACTION_STATE_UNKNOW = 202;
    // wrong group for the transaction state
    public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
    // buyer ID does not exist
    public static final int NO_BUYER_ID = 204;
    public static final int NOT_IN_CURRENT_UNIT = 205;
    // consumer not online (rpc)
    public static final int CONSUMER_NOT_ONLINE = 206;
    // consumption timed out
    public static final int CONSUME_MSG_TIMEOUT = 207;
    // message does not exist
    public static final int NO_MESSAGE = 208;
    // failed to create or update ACL configuration
    public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
    // failed to delete ACL configuration
    public static final int DELETE_ACL_CONFIG_FAILED = 210;
    // failed to update the global whitelist addresses
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
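For a send request, the producer turns four of these codes into a SendStatus and treats every other code as a failure. The sketch follows the switch in MQClientAPIImpl#processSendResponse as I read the 4.x source; SendStatusSketch and SendResponseMapper are local stand-ins (hypothetical), SUCCESS = 0 comes from RemotingSysResponseCode, and IllegalStateException replaces the MQBrokerException the real client throws.

```java
// Hypothetical stand-in for SendStatus.
enum SendStatusSketch { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

// Hypothetical mapping modelled on MQClientAPIImpl#processSendResponse.
final class SendResponseMapper {
    static final int SUCCESS = 0;
    static final int FLUSH_DISK_TIMEOUT = 10;
    static final int SLAVE_NOT_AVAILABLE = 11;
    static final int FLUSH_SLAVE_TIMEOUT = 12;

    private SendResponseMapper() {}

    static SendStatusSketch toStatus(int responseCode, String remark) {
        switch (responseCode) {
            case SUCCESS:
                return SendStatusSketch.SEND_OK;
            case FLUSH_DISK_TIMEOUT:
                return SendStatusSketch.FLUSH_DISK_TIMEOUT;
            case FLUSH_SLAVE_TIMEOUT:
                return SendStatusSketch.FLUSH_SLAVE_TIMEOUT;
            case SLAVE_NOT_AVAILABLE:
                return SendStatusSketch.SLAVE_NOT_AVAILABLE;
            default:
                // the real client throws MQBrokerException(code, remark) here
                throw new IllegalStateException("broker error " + responseCode + ": " + remark);
        }
    }
}
```

The three non-OK statuses are not exceptions: as I understand it, the master has stored the message, and only the flush or replication guarantee was not confirmed.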
lang
The programming language of the side that sends the message; here it defaults to Java.
private LanguageCode language = LanguageCode.JAVA;
version
The program version of the side that sends the message.
opaque
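This field identifies different requests on the same connection, so that when a response arrives the matching callback can be invoked (RocketMQ multiplexes requests over a single TCP connection).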
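The bookkeeping behind this is a map from opaque to a pending request, which NettyRemotingAbstract keeps as responseTable; the receiving side copies the request's opaque into its response. The sketch models that matching with CompletableFuture; OpaqueMatcher and its methods are hypothetical, and timeouts and cleanup of abandoned entries are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of matching responses to requests on one multiplexed connection.
final class OpaqueMatcher {
    private final AtomicInteger requestId = new AtomicInteger();
    private final ConcurrentMap<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register a new request and return its opaque; the caller writes it into the frame.
    int newRequest() {
        int opaque = requestId.getAndIncrement();
        pending.put(opaque, new CompletableFuture<>());
        return opaque;
    }

    // The future a caller waits on; null once the response has been delivered.
    CompletableFuture<String> future(int opaque) {
        return pending.get(opaque);
    }

    // Called by the I/O thread when a response frame arrives, in any order.
    boolean onResponse(int opaque, String payload) {
        CompletableFuture<String> f = pending.remove(opaque);
        if (f == null) {
            return false; // unknown, duplicate or already timed-out request
        }
        f.complete(payload);
        return true;
    }
}
```

Because responses are matched by opaque and not by arrival order, many requests can be in flight on one connection at once.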
remark
In a Request, it carries custom text.
In a Response, it carries the cause of an error.
ext
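Carries the custom message header: the fields of the request's CommandCustomHeader are stored here as key-value strings.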
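In the 4.x source this conversion is done by RemotingCommand#makeCustomHeaderToNet, which reads the custom header's fields by reflection and stores their string values in extFields. The sketch shows the same idea in simplified form; DemoHeader and ExtFieldsSketch are hypothetical, and unlike the real method it does not cache the field list.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical header type standing in for a CommandCustomHeader implementation.
class DemoHeader {
    static final int IGNORED_CONSTANT = 1; // static fields are skipped
    String topic = "OrderTopic";
    int queueId = 2;
    String keys = null;                    // null values are skipped
}

final class ExtFieldsSketch {
    private ExtFieldsSketch() {}

    // Copy every non-static, non-null field into a String-to-String map.
    static Map<String, String> toExtFields(Object header) {
        Map<String, String> ext = new HashMap<>();
        for (Field field : header.getClass().getDeclaredFields()) {
            // skip constants and synthetic outer references such as this$0
            if (Modifier.isStatic(field.getModifiers()) || field.getName().startsWith("this")) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object value = field.get(header);
                if (value != null) {
                    ext.put(field.getName(), value.toString());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return ext;
    }
}
```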
Sending the message
Now that we know what a message looks like, we can continue with the sending code:
switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

return null;
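The ASYNC and SYNC branches share one budgeting rule: the time already spent since beginStartTime is deducted from timeoutMillis, and the call fails fast when the budget is already gone. A sketch of that rule, where TimeoutBudget is a hypothetical helper and IllegalStateException stands in for RemotingTooMuchRequestException:

```java
// Hypothetical helper mirroring the timeout check in MQClientAPIImpl#sendMessage.
final class TimeoutBudget {
    private TimeoutBudget() {}

    // Returns the time left for the network call, or throws if the budget is already spent.
    static long remaining(long beginStartTime, long now, long timeoutMillis) {
        long cost = now - beginStartTime;
        if (timeoutMillis < cost) {
            // RocketMQ throws RemotingTooMuchRequestException("sendMessage call timeout") here
            throw new IllegalStateException("sendMessage call timeout");
        }
        return timeoutMillis - cost;
    }
}
```

ONEWAY skips the check and hands the full timeoutMillis to invokeOneway, where it bounds the wait for a semaphore permit.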
NettyRemotingClient#invokeOneway
Let's start with the simplest mode, Oneway:
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // get or create the Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            // send over the established connection
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

This roughly boils down to two operations: getting or creating a TCP connection, and sending the data over it. If sending fails, or the connection is not usable, the connection is closed.
NettyRemotingClient#getAndCreateChannel
Let's look at the first operation:
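private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // a null address means a NameServer connection is wanted
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    // try the cache first
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    // not cached, or not ready yet: create a new connection
    return this.createChannel(addr);
}

So all connections are kept in a channel table (channelTables) held by NettyRemotingClient. Since that client belongs to the MQClientInstance, which MQClientManager shares among all producers and consumers in the JVM that have the same clientId, these connections are shared by all of them as well.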
Looking at the creation method itself, we find that the Channel is ultimately created asynchronously by the client's Bootstrap:
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
    return cw.getChannel();
}

// connections are created one at a time, under a lock
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
        // double-check that the connection really has not been created yet
        boolean createNewConnection;
        cw = this.channelTables.get(addr);
        if (cw != null) {
            if (cw.isOK()) {
                // the connection is already established
                return cw.getChannel();
            } else if (!cw.getChannelFuture().isDone()) {
                // a connection attempt is still in progress
                createNewConnection = false;
            } else {
                // a previous attempt finished but failed
                this.channelTables.remove(addr);
                createNewConnection = true;
            }
        } else {
            createNewConnection = true;
        }

        if (createNewConnection) {
            // the actual (asynchronous) connection attempt
            ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
            log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
            cw = new ChannelWrapper(channelFuture);
            this.channelTables.put(addr, cw);
        }
    } catch (Exception e) {
        log.error("createChannel: create channel exception", e);
    } finally {
        this.lockChannelTables.unlock();
    }
} else {
    log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
    ChannelFuture channelFuture = cw.getChannelFuture();
    // block until the connection completes or the connect timeout elapses
    if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
        if (cw.isOK()) {
            log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
            return cw.getChannel();
        } else {
            log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
        }
    } else {
        log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
            channelFuture.toString());
    }
}

return null;
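Without Netty, createChannel is a cache of connection attempts guarded by a lock with a timeout: a fast path without the lock, a double-check under it, a new attempt only if none is running or the last one failed, and a bounded wait at the end. The sketch mirrors that structure with CompletableFuture standing in for ChannelFuture and a caller-supplied connector standing in for Bootstrap.connect; ConnectionCache, its parameters and the String connection type are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical model of NettyRemotingClient#createChannel using CompletableFuture.
final class ConnectionCache {
    private final Map<String, CompletableFuture<String>> table = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<String, CompletableFuture<String>> connector;

    ConnectionCache(Function<String, CompletableFuture<String>> connector) {
        this.connector = connector;
    }

    private static boolean isOk(CompletableFuture<String> f) {
        return f.isDone() && !f.isCompletedExceptionally();
    }

    // Returns a connection for addr, or null if it could not be obtained in time.
    String get(String addr, long lockTimeoutMs, long connectTimeoutMs) {
        CompletableFuture<String> f = table.get(addr);
        if (f != null && isOk(f)) {
            return f.join(); // fast path: cached and connected
        }
        try {
            if (lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
                try {
                    f = table.get(addr); // double-check under the lock
                    if (f == null || (f.isDone() && !isOk(f))) {
                        f = connector.apply(addr); // start a new asynchronous attempt
                        table.put(addr, f);
                    }
                } finally {
                    lock.unlock();
                }
            }
            // block until the attempt finishes or the connect timeout elapses
            return f == null ? null : f.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // the attempt failed or is still pending
        }
    }
}
```

Concurrent callers for the same address end up waiting on the same future, so only one connection attempt is in progress per address at any time.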
NettyRemotingAbstract#invokeOnewayImpl
Next comes the second operation: sending the data over the connection.
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // mark the request as oneway in the header's flag field
    request.markOnewayRPC();
    // acquire a semaphore permit so the system is not overwhelmed by too many in-flight requests
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // release the permit once the write has actually completed
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // too many requests in flight
        if (timeoutMillis