小心程序猿QAQ, posted 2021-10-28 21:48:02

RocketMQ Source Code Deep Dive | Producer: Message Structure and the Send Path

Overview

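In the previous part, "RocketMQ Source Code Deep Dive | Producer, Part 1: Start, then Send a Message", we walked through the flow the Producer follows to send a message. This time we look in more detail at what a message is made of and at the path it takes when it is sent.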
Message

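When using RocketMQ, you always need the Message class to send a message. Its body field holds the payload; the other notable parts are the message flag and the message properties.
```java
public class Message {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}
```
Message flags (sysFlag)

Strictly speaking, the constants below are defined in MessageSysFlag and live in the sysFlag bit field (which appears on MessageExt further down). Message#flag itself is an application-defined int that the Broker stores but does not interpret.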

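| Constant | Meaning |
|---|---|
| COMPRESSED_FLAG | The body is compressed. The producer compresses a non-batch message whose body exceeds compressMsgBodyOverHowmuch (4 KB by default), using zip at level 5 by default; batch messages are not compressed. |
| MULTI_TAGS_FLAG | The message carries multiple tags. |
| TRANSACTION_NOT_TYPE | Value 0: not a transactional message, or a transaction in the unknown state. When the Broker checks back with the Producer, Commit means commit, Rollback means roll back, and Unknown means keep checking later. |
| TRANSACTION_PREPARED_TYPE | Prepared (half) message: the message is part of a transaction that is still in progress. |
| TRANSACTION_COMMIT_TYPE | Commit message: requests that the transaction be committed. |
| TRANSACTION_ROLLBACK_TYPE | Rollback message: requests that the transaction be rolled back. |
| BORNHOST_V6_FLAG | The host that produced the message has an IPv6 address. |
| STOREHOSTADDRESS_V6_FLAG | The host that persisted the message has an IPv6 address. |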
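Because these flags share a single int, they are combined and tested with bit operations. The transaction state is a 2-bit field (bits 2 and 3), so it is read and replaced through a mask instead of being tested bit by bit. Below is a minimal Java sketch of that arithmetic. The constant values and the two helper names are what I believe MessageSysFlag uses in RocketMQ 4.x, but treat them as assumptions; the class name SysFlagSketch is hypothetical.

```java
// Hypothetical class; constant values assumed to match MessageSysFlag in RocketMQ 4.x.
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int MULTI_TAGS_FLAG = 0x1 << 1;
    // Bits 2-3 hold the transaction state as one 2-bit field.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    static final int BORNHOST_V6_FLAG = 0x1 << 4;
    static final int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;

    private SysFlagSketch() {}

    /** Extracts the transaction state; ROLLBACK (0b1100) doubles as the field mask. */
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Replaces the transaction state while keeping every other bit unchanged. */
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    /** True if every bit of a single-bit flag is set (not meaningful for the value 0). */
    static boolean isSet(int sysFlag, int bit) {
        return (sysFlag & bit) == bit;
    }
}
```

For example, a compressed half message has sysFlag COMPRESSED_FLAG | TRANSACTION_PREPARED_TYPE = 5; resetting the transaction field to TRANSACTION_COMMIT_TYPE gives 9, and the compression bit survives the change.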



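Message properties

There are many properties, so only a selection is listed here.

| Property | Meaning |
|---|---|
| KEYS | The message key. The Broker builds an index on keys, so an application can look up the message (and who consumed it) by Topic and Key. |
| TAGS | The message sub-type; consumers can filter selectively by tag. |
| DELAY | Delay level of a delayed message (the default Broker configuration defines 18 levels; consumer retries use 16 of them, levels 3 to 18). |
| RETRY_TOPIC | The topic to be retried (on the Broker it is stored in the SCHEDULE_TOPIC_XXXX topic, which has 18 queues, one per delay level). |
| REAL_TOPIC | The real topic. RocketMQ often uses the "trick" of swapping the target topic (for example for transactional and delayed messages); this field records the original one. |
| PGROUP | Producer group. |
| MAX_OFFSET / MIN_OFFSET | Maximum and minimum offsets in a pull. |
| TRANSFER_FLAG | Transaction-related flag. |
| MSG_TYPE | Message type, i.e. whether this is a reply message. |
| BUYER_ID | Um... buyer ID? |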
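The DELAY and RETRY_TOPIC rows rely on a mapping between delay levels, delay durations and the queues of SCHEDULE_TOPIC_XXXX. Here is a minimal sketch, assuming the Broker's default messageDelayLevel string, that a level's queueId is level - 1, and that a consumer retry without an explicit level uses level 3 + reconsumeTimes. That last rule is my reading of the Broker code and may differ between versions; DelayLevelSketch is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the level string is assumed to be the Broker's default messageDelayLevel.
final class DelayLevelSketch {
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevelSketch() {}

    /** Parses the level string into milliseconds; index 0 holds level 1. */
    static long[] parseLevels(String spec) {
        String[] parts = spec.trim().split("\\s+");
        long[] millis = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long n = Long.parseLong(p.substring(0, p.length() - 1));
            switch (p.charAt(p.length() - 1)) {
                case 's': millis[i] = TimeUnit.SECONDS.toMillis(n); break;
                case 'm': millis[i] = TimeUnit.MINUTES.toMillis(n); break;
                case 'h': millis[i] = TimeUnit.HOURS.toMillis(n); break;
                case 'd': millis[i] = TimeUnit.DAYS.toMillis(n); break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
        }
        return millis;
    }

    /** One queue per level in SCHEDULE_TOPIC_XXXX (assumed: queueId = level - 1). */
    static int queueIdForLevel(int delayLevel) {
        return delayLevel - 1;
    }

    /** Assumed Broker rule for a retry with no explicit level. */
    static int retryDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }
}
```

With the default maximum of 16 retries, reconsumeTimes 0 to 15 map to levels 3 to 18 (10 seconds up to 2 hours), which explains why retries use 16 of the 18 levels.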



Of course, this is only how the message looks inside the producer. To the Broker and the consumer, it looks like this:
```java
public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    private String brokerName;

    private int queueId;

    // Size of the message as stored on disk
    private int storeSize;

    // Offset within the ConsumeQueue
    private long queueOffset;
    private int sysFlag;
    // Time the message was created
    private long bornTimestamp;
    // Address of the host that created it
    private SocketAddress bornHost;

    // Time the message was stored
    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    // Offset within the commitLog
    private long commitLogOffset;
    // CRC checksum of the body
    private int bodyCRC;
    // Number of consume retries so far
    private int reconsumeTimes;

    private long preparedTransactionOffset;
}
```
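Two of these fields, storeHost and commitLogOffset, are enough to locate a message, and the Broker-side (offset) message ID is built from exactly those two. Below is a sketch of that layout for IPv4, assuming my reading of MessageDecoder#createMessageId: 4 bytes of IP, 4 bytes of port and 8 bytes of commitLog offset, hex-encoded (upper-case in this sketch). The class name is hypothetical and IPv6 is deliberately not handled.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the assumed offset-msgId layout (IPv4 only).
final class OffsetMsgIdSketch {
    private OffsetMsgIdSketch() {}

    /** 4-byte IP + 4-byte port + 8-byte commitLog offset, as 32 hex characters. */
    static String create(InetSocketAddress storeHost, long commitLogOffset) {
        byte[] ip = storeHost.getAddress().getAddress();
        if (ip.length != 4) {
            throw new IllegalArgumentException("this sketch handles IPv4 only");
        }
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.put(ip);
        buf.putInt(storeHost.getPort());
        buf.putLong(commitLogOffset);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    /** Recovers the commitLog offset from the last 16 hex characters. */
    static long commitLogOffsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(msgId.length() - 16), 16);
    }
}
```

As I understand it, this is why a lookup by this kind of ID (VIEW_MESSAGE_BY_ID) needs no index: the client decodes the Broker address and offset from the ID and reads the commitLog at that position directly.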


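Wrapping the message

So, once the producer has built a message like this, does it send it out as is?
Let's continue with what the previous article left unfinished.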



MQClientAPIImpl#sendMessage

```java
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// Is this a reply message?
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
    // With sendSmartMsg enabled, use the compact V2 request header
    if (sendSmartMsg) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    }
} else {
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
}
request.setBody(msg.getBody());
/* -- pass -- */
```
Here we see yet another layer of wrapping before anything is sent: the message body becomes the body of a RemotingCommand, while the other fields (topic, flag, properties and so on) travel in the request header, which the caller has already filled in from the message.
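The properties are not lost at this step: they are flattened into a single string field of the request header. Below is a sketch of that flattening, assuming the two control-character separators that I believe MessageDecoder#messageProperties2String uses (char 1 between name and value, char 2 after each pair). PropertiesCodecSketch is a hypothetical name, not the library class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec; separator values are an assumption based on MessageDecoder.
final class PropertiesCodecSketch {
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;

    private PropertiesCodecSketch() {}

    /** name<1>value<2>name<1>value<2>... */
    static String encode(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
              .append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        return sb.toString();
    }

    /** Inverse of encode; entries without a name/value separator are skipped. */
    static Map<String, String> decode(String s) {
        Map<String, String> props = new LinkedHashMap<>();
        if (s == null || s.isEmpty()) {
            return props;
        }
        for (String item : s.split(String.valueOf(PROPERTY_SEPARATOR))) {
            int idx = item.indexOf(NAME_VALUE_SEPARATOR);
            if (idx > 0) {
                props.put(item.substring(0, idx), item.substring(idx + 1));
            }
        }
        return props;
    }
}
```

Using control characters as separators lets property values contain ordinary punctuation such as commas, colons or spaces without escaping.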
The fields of RemotingCommand are as follows:
```java
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;
```
Among its methods we also find one called encode, which returns a ByteBuffer. The bytes it produces are the message as it actually goes over the wire.
```java
public ByteBuffer encode() {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }

    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    // body data
    if (this.body != null) {
        result.put(this.body);
    }

    result.flip();

    return result;
}
```
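To make the layout concrete, here is a standalone re-implementation of encode() together with a read-back of the two length fields. It takes already-serialized header bytes instead of calling headerEncode(), and assumes serialize-type codes 0 (JSON) and 1 (ROCKETMQ). FrameCodecSketch and readLayout are hypothetical names. On the receiving side RocketMQ's frame decoder strips the outer 4-byte length before parsing, so readLayout starts one field earlier than the real decoder does.

```java
import java.nio.ByteBuffer;

// Hypothetical re-implementation of the frame layout produced by RemotingCommand#encode.
final class FrameCodecSketch {
    static final byte SERIALIZE_JSON = 0;     // assumed SerializeType.JSON code
    static final byte SERIALIZE_ROCKETMQ = 1; // assumed SerializeType.ROCKETMQ code

    private FrameCodecSketch() {}

    /** High byte = serialize type, low 3 bytes = header length. */
    static byte[] markProtocolType(int headerLength, byte serializeType) {
        return new byte[] {
            serializeType,
            (byte) ((headerLength >> 16) & 0xFF),
            (byte) ((headerLength >> 8) & 0xFF),
            (byte) (headerLength & 0xFF)
        };
    }

    /** [total length][type + header length][header][body]; total excludes its own 4 bytes. */
    static ByteBuffer encode(byte[] headerData, byte[] body, byte serializeType) {
        int length = 4 + headerData.length + (body == null ? 0 : body.length);
        ByteBuffer result = ByteBuffer.allocate(4 + length);
        result.putInt(length);
        result.put(markProtocolType(headerData.length, serializeType));
        result.put(headerData);
        if (body != null) {
            result.put(body);
        }
        result.flip();
        return result;
    }

    /** Reads the prefix fields; returns {serializeType, headerLength, bodyLength}. */
    static int[] readLayout(ByteBuffer frame) {
        int length = frame.getInt();       // bytes that follow this prefix
        int oriHeaderLen = frame.getInt(); // type byte + 3-byte header length
        int serializeType = (oriHeaderLen >>> 24) & 0xFF;
        int headerLength = oriHeaderLen & 0xFFFFFF;
        return new int[] {serializeType, headerLength, length - 4 - headerLength};
    }
}
```

Note that the leading length counts everything after itself, including the 4-byte header-length field, which is why encode() allocates 4 + length bytes.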


Message structure

The detailed message structure is shown in the figure below:
https://p9.toutiaoimg.com/large/pgc-image/6b50238a77c949019427d609a75bffc7
Each field has a different meaning in a Request and in a Response.
code

In a Request, it is the operation code of the request:
```java
public class RequestCode {
    // Send a message
    public static final int SEND_MESSAGE = 10;
    // Pull messages
    public static final int PULL_MESSAGE = 11;
    // Query messages (topic, key, max count, begin timestamp, end timestamp)
    public static final int QUERY_MESSAGE = 12;
    // Query the Broker offset (unused)
    public static final int QUERY_BROKER_OFFSET = 13;
    /*
     * Querying consumer offsets
     *   Consumers keep their offsets in memory. In a master/slave deployment, the master
     *   Broker serves both reads and writes by default.
     *   To keep a backlog from overloading the master, once the backlog exceeds a configured
     *   threshold the slave takes over reads, which can cause consume-progress issues.
     *   Master/slave progress consistency is therefore guaranteed by the slave actively
     *   reporting its progress and by the consumer's in-memory progress taking precedence.
     */
    // Query the consumer's own offset
    public static final int QUERY_CONSUMER_OFFSET = 14;
    // Commit the consumer's own offset
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    // Create or update a Topic
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    // Get the configuration of all Topics
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    /* unused */
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    public static final int GET_TOPIC_NAME_LIST = 23;
    // Update the Broker configuration
    public static final int UPDATE_BROKER_CONFIG = 25;
    // Get the Broker configuration
    public static final int GET_BROKER_CONFIG = 26;
    public static final int TRIGGER_DELETE_FILES = 27;
    // Get Broker runtime information
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    // Look up an offset by timestamp
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    // Get the max offset
    public static final int GET_MAX_OFFSET = 30;
    // Get the min offset
    public static final int GET_MIN_OFFSET = 31;
    //
    public static final int GET_EARLIEST_MSG_STORETIME = 32;

    /*
     * Handled by the Broker
     */
    // Query a message by its ID
    public static final int VIEW_MESSAGE_BY_ID = 33;
    // Heartbeat
    public static final int HEART_BEAT = 34;
    // Unregister a client
    public static final int UNREGISTER_CLIENT = 35;
    // Report a consume failure (retried after a while) (Deprecated)
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    // Transaction outcome (commit or rollback)
    public static final int END_TRANSACTION = 37;
    // Get the consumer list of a consumer group
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;

    // Check transaction state: the Broker's check-back for transactions in the unknown state
    public static final int CHECK_TRANSACTION_STATE = 39;
    // Notify consumers that the consumer IDs have changed
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;

    // Lock queues in batch (used by rebalance)
    public static final int LOCK_BATCH_MQ = 41;
    // Unlock queues
    public static final int UNLOCK_BATCH_MQ = 42;
    // Get all consumer offsets on this Broker
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    // Get the offsets of the delay Topic
    public static final int GET_ALL_DELAY_OFFSET = 45;
    // Check the client configuration
    public static final int CHECK_CLIENT_CONFIG = 46;
    // Create or update an ACL
    public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
    // Delete an ACL configuration
    public static final int DELETE_ACL_CONFIG = 51;
    // Get the ACL info of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
    // Update the global whitelist
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
    // Get the ACL configuration of the Broker cluster
    public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;

    /*
     * NameServer related
     */
    // Put a key-value config
    public static final int PUT_KV_CONFIG = 100;
    // Get a key-value config
    public static final int GET_KV_CONFIG = 101;
    // Delete a key-value config
    public static final int DELETE_KV_CONFIG = 102;
    // Register a Broker
    public static final int REGISTER_BROKER = 103;
    // Unregister a Broker
    public static final int UNREGISTER_BROKER = 104;
    // Get the route info of a Topic
    public static final int GET_ROUTEINFO_BY_TOPIC = 105;
    // Get the Broker cluster info
    public static final int GET_BROKER_CLUSTER_INFO = 106;

    // Create or update a subscription group
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    // Get the config of all subscription groups
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    // Get Topic statistics
    public static final int GET_TOPIC_STATS_INFO = 202;
    // Get the list of online consumers (rpc)
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    // Get the list of online producers
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    // Get all Topics from the NameServer
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    // Delete a subscription group
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    // Get consumer statistics
    public static final int GET_CONSUME_STATS = 208;

    public static final int SUSPEND_CONSUMER = 209;
    public static final int RESUME_CONSUMER = 210;
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;

    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;

    public static final int WHO_CONSUME_THE_MESSAGE = 214;

    // Delete a Topic on the Broker
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    // Delete a Topic on the NameServer
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    // Get the key-value list of a namespace
    public static final int GET_KVLIST_BY_NAMESPACE = 219;

    // Reset the consumer's consume progress
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    // Get consumer statistics from the consumer itself
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    // Ask the Broker to reset the consume progress
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    // Ask the Broker to update the consumer's statistics
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;

    // Query who consumed a message
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    // Get the Topics of a cluster
    public static final int GET_TOPICS_BY_CLUSTER = 224;
    // Register a filter server
    public static final int REGISTER_FILTER_SERVER = 301;
    // Register a message filter class
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    // Query the consume time span
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    // Get system Topics from the NameServer
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    // Get system Topics from the Broker
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    // Clean expired consume queues
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    // Get consumer runtime information
    public static final int GET_CONSUMER_RUNNING_INFO = 307;
    // Query the corrected offset
    public static final int QUERY_CORRECTION_OFFSET = 308;
    // Consume a message directly
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    // Send a message (v2), with a smaller network packet
    public static final int SEND_MESSAGE_V2 = 310;

    // Unit-related Topics
    public static final int GET_UNIT_TOPIC_LIST = 311;
    // Get Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    // Get non-unit Topics that have unit subscription groups
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    // Clone consume progress
    public static final int CLONE_GROUP_OFFSET = 314;
    // Query statistics on the Broker
    public static final int VIEW_BROKER_STATS_DATA = 315;

    // Clean unused Topics
    public static final int CLEAN_UNUSED_TOPIC = 316;
    // Get consume-related statistics on the Broker
    public static final int GET_BROKER_CONSUME_STATS = 317;

    /* update the config of name server */
    public static final int UPDATE_NAMESRV_CONFIG = 318;
    /* get config from name server */
    public static final int GET_NAMESRV_CONFIG = 319;

    // Send a batch of messages
    public static final int SEND_BATCH_MESSAGE = 320;
    // Query a consume queue
    public static final int QUERY_CONSUME_QUEUE = 321;
    // Query the data version
    public static final int QUERY_DATA_VERSION = 322;

    /* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
    public static final int RESUME_CHECK_HALF_MESSAGE = 323;
    // Send a reply message
    public static final int SEND_REPLY_MESSAGE = 324;
    public static final int SEND_REPLY_MESSAGE_V2 = 325;
    // Push a reply message to the client
    public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
```


In a Response, it is the response code:
```java
public class ResponseCode extends RemotingSysResponseCode {
    // Flushing to disk timed out
    public static final int FLUSH_DISK_TIMEOUT = 10;
    // The slave is unreachable
    public static final int SLAVE_NOT_AVAILABLE = 11;
    // Flushing on the slave timed out
    public static final int FLUSH_SLAVE_TIMEOUT = 12;
    // Malformed message
    public static final int MESSAGE_ILLEGAL = 13;
    // Service unavailable
    public static final int SERVICE_NOT_AVAILABLE = 14;
    // Version not supported
    public static final int VERSION_NOT_SUPPORTED = 15;
    // Not authorized
    public static final int NO_PERMISSION = 16;
    // The Topic does not exist
    public static final int TOPIC_NOT_EXIST = 17;
    // The Topic already exists
    public static final int TOPIC_EXIST_ALREADY = 18;
    // The offset to pull from does not exist
    public static final int PULL_NOT_FOUND = 19;
    // Pull again immediately
    public static final int PULL_RETRY_IMMEDIATELY = 20;
    // The pull offset was redirected
    public static final int PULL_OFFSET_MOVED = 21;
    // The query found nothing
    public static final int QUERY_NOT_FOUND = 22;
    // The subscription expression could not be parsed
    public static final int SUBSCRIPTION_PARSE_FAILED = 23;
    // The target subscription does not exist
    public static final int SUBSCRIPTION_NOT_EXIST = 24;
    // The subscription is not the latest
    public static final int SUBSCRIPTION_NOT_LATEST = 25;
    // The subscription group does not exist
    public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
    // Subscription filter data does not exist (bad tag expression)
    public static final int FILTER_DATA_NOT_EXIST = 27;
    // Subscription filter data on this Broker is not the latest
    public static final int FILTER_DATA_NOT_LATEST = 28;

    // The transaction should be committed
    public static final int TRANSACTION_SHOULD_COMMIT = 200;
    // The transaction should be rolled back
    public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
    // Transaction state unknown
    public static final int TRANSACTION_STATE_UNKNOW = 202;
    // Wrong transaction state group
    public static final int TRANSACTION_STATE_GROUP_WRONG = 203;

    // The buyer ID does not exist
    public static final int NO_BUYER_ID = 204;
    public static final int NOT_IN_CURRENT_UNIT = 205;
    // Consumer not online (rpc)
    public static final int CONSUMER_NOT_ONLINE = 206;
    // Consuming timed out
    public static final int CONSUME_MSG_TIMEOUT = 207;
    // The message does not exist
    public static final int NO_MESSAGE = 208;
    // Failed to create or update the ACL config
    public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
    // Failed to delete the ACL config
    public static final int DELETE_ACL_CONFIG_FAILED = 210;
    // Failed to update the global whitelist addresses
    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
```
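On the producer side, only a few of these response codes count as a completed send. Below is a sketch of that mapping as I read MQClientAPIImpl#processSendResponse: SUCCESS (0, from RemotingSysResponseCode) and the three flush/slave codes become a send status, and every other code is raised as an error. The enum and class are simplified, hypothetical stand-ins, not the real SendStatus or MQBrokerException types.

```java
// Hypothetical stand-in for the producer's response-code handling.
final class SendResponseSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    static final int SUCCESS = 0; // RemotingSysResponseCode.SUCCESS
    static final int FLUSH_DISK_TIMEOUT = 10;
    static final int SLAVE_NOT_AVAILABLE = 11;
    static final int FLUSH_SLAVE_TIMEOUT = 12;

    private SendResponseSketch() {}

    static SendStatus toSendStatus(int responseCode, String remark) {
        switch (responseCode) {
            case FLUSH_DISK_TIMEOUT:
                return SendStatus.FLUSH_DISK_TIMEOUT;
            case FLUSH_SLAVE_TIMEOUT:
                return SendStatus.FLUSH_SLAVE_TIMEOUT;
            case SLAVE_NOT_AVAILABLE:
                return SendStatus.SLAVE_NOT_AVAILABLE;
            case SUCCESS:
                return SendStatus.SEND_OK;
            default:
                // Any other code is a failure; remark carries the Broker's reason.
                throw new IllegalStateException("broker error code=" + responseCode + ", remark=" + remark);
        }
    }
}
```

As I understand it, the three flush/slave statuses mean the master accepted the message but the requested flush or replication guarantee was not confirmed in time, so whether to resend is left to the application.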



lang

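The programming language of the side that sends the command; here it defaults to Java:
```java
private LanguageCode language = LanguageCode.JAVA;
```
version

The program version of the sending side.
opaque

Identifies different requests on the same connection, so that when a response arrives the matching callback can be invoked (RocketMQ multiplexes requests over a single TCP connection). The response carries the same opaque value as its request.
remark

In a Request, it carries custom text.
In a Response, it carries the reason for an error.
ext

Carries custom message headers (the extFields map; the fields of the custom header object are copied into it before encoding).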
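opaque is what makes connection multiplexing work: the sender registers a pending entry under a fresh opaque, and whichever response comes back carrying that opaque completes it. Below is a minimal sketch of that bookkeeping, loosely modelled on the idea behind the response table in NettyRemotingAbstract. All names are hypothetical, and timeouts and cleanup of stale entries are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical request/response correlation by opaque id.
final class OpaqueCorrelationSketch {
    private final AtomicInteger requestId = new AtomicInteger(0);
    private final ConcurrentMap<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Assigns a fresh opaque to an outgoing request and remembers who is waiting for it. */
    int register(CompletableFuture<String> future) {
        int opaque = requestId.getAndIncrement();
        pending.put(opaque, future);
        return opaque;
    }

    /** Called by the I/O thread when a response with this opaque arrives. */
    boolean complete(int opaque, String response) {
        CompletableFuture<String> f = pending.remove(opaque);
        if (f == null) {
            return false; // late or unknown response: nobody is waiting for it
        }
        f.complete(response);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Responses may come back in any order; because each one is matched by opaque rather than by arrival order, many requests can share one TCP connection.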



Sending the message

Now that we know what a message looks like, we can continue with the sending code:
```java
switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

return null;
```
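Both the ASYNC and SYNC branches charge the time already spent before this point against the caller's timeout and pass only the remainder down to the network layer. Below is a sketch of that budget check. It throws IllegalStateException where RocketMQ throws RemotingTooMuchRequestException, and the class name is hypothetical.

```java
// Hypothetical helper mirroring the timeout-budget check in the ASYNC/SYNC branches.
final class TimeoutBudgetSketch {
    private TimeoutBudgetSketch() {}

    /**
     * Returns the time left for the network call, or throws if the caller's
     * budget was already used up before the request could be sent.
     */
    static long remaining(long timeoutMillis, long beginStartTime, long now) {
        long cost = now - beginStartTime;
        if (timeoutMillis < cost) {
            throw new IllegalStateException("sendMessage call timeout");
        }
        return timeoutMillis - cost;
    }
}
```

Passing the remainder rather than the full timeout keeps the caller's overall deadline intact: preparation time and network time together never exceed the timeout the caller asked for.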


NettyRemotingClient#invokeOneway

Let's start with the simplest case, oneway:
```java
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Get (or create) the Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            // Send over the established connection
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
```
This boils down to two operations: getting or establishing a TCP connection, and sending the data over that connection. If the channel is not usable or sending fails, the connection is closed.



NettyRemotingClient#getAndCreateChannel

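First, the first operation:
```java
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // A null address means we want a NameServer connection
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    // Try the cache first
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    // Not cached, or not ready yet: create a new connection
    return this.createChannel(addr);
}
```
So all connections are kept in a channelTables map owned by NettyRemotingClient. That client belongs to the MQClientInstance, which is shared by the producers and consumers in the JVM that use the same clientId, so in practice the connection cache is shared across the whole process.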
Looking more closely at the creation method, we find that the Channel is ultimately created asynchronously by the client's Bootstrap:
```java
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
    return cw.getChannel();
}

// Connections are created one at a time (under a lock)
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
        // Double-check that the connection really has not been created yet
        boolean createNewConnection;
        cw = this.channelTables.get(addr);
        if (cw != null) {
            if (cw.isOK()) {
                // Connection already established
                return cw.getChannel();
            } else if (!cw.getChannelFuture().isDone()) {
                // A connect attempt is still in progress: wait for it below
                createNewConnection = false;
            } else {
                // A previous attempt finished but failed
                this.channelTables.remove(addr);
                createNewConnection = true;
            }
        } else {
            createNewConnection = true;
        }

        if (createNewConnection) {
            // The actual (asynchronous) connect
            ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
            log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
            cw = new ChannelWrapper(channelFuture);
            this.channelTables.put(addr, cw);
        }
    } catch (Exception e) {
        log.error("createChannel: create channel exception", e);
    } finally {
        this.lockChannelTables.unlock();
    }
} else {
    log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
    ChannelFuture channelFuture = cw.getChannelFuture();
    // Block until the connect completes (or times out)
    if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
        if (cw.isOK()) {
            log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
            return cw.getChannel();
        } else {
            log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
        }
    } else {
        log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
            channelFuture.toString());
    }
}

return null;
```
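The pattern here is a lock-free fast path, then a lock with a double-check, then waiting on the connect future outside the lock. Below is a sketch of the same pattern in plain Java, using CompletableFuture in place of Netty's ChannelFuture and a caller-supplied function in place of Bootstrap#connect (both are assumptions for illustration; the class name is hypothetical). Unlike the real ChannelWrapper#isOK, it does not check whether an established channel is still active.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical connection cache: fast path, locked double-check, wait outside the lock.
final class ChannelTableSketch<C> {
    private final ConcurrentMap<String, CompletableFuture<C>> table = new ConcurrentHashMap<>();
    private final Lock lock = new ReentrantLock(); // one lock for all addresses
    // Stand-in for Bootstrap#connect: starts an asynchronous connect and returns its future.
    private final Function<String, CompletableFuture<C>> connector;

    ChannelTableSketch(Function<String, CompletableFuture<C>> connector) {
        this.connector = connector;
    }

    private static boolean isOk(CompletableFuture<?> f) {
        return f != null && f.isDone() && !f.isCompletedExceptionally();
    }

    /** Returns a connected channel, or null if connecting failed or timed out. */
    C getOrCreate(String addr, long lockTimeoutMs, long connectTimeoutMs) throws InterruptedException {
        CompletableFuture<C> f = table.get(addr);
        if (isOk(f)) {
            return f.join(); // fast path: cached and connected
        }
        if (lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
            try {
                f = table.get(addr); // double-check under the lock
                if (f == null || f.isCompletedExceptionally()) {
                    // No entry, or a finished-but-failed attempt: start a new connect.
                    f = connector.apply(addr);
                    table.put(addr, f);
                }
                // Otherwise an attempt is connected or still in progress; reuse it.
            } finally {
                lock.unlock();
            }
        }
        if (f == null) {
            return null; // lock timed out and nothing was cached
        }
        try {
            // Wait outside the lock so other addresses are not blocked meanwhile.
            return f.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }
}
```

Waiting outside the lock matters because one lock guards the whole table: a slow connect to one Broker does not stall lookups of, or connects to, other addresses.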


NettyRemotingAbstract#invokeOnewayImpl

Next, the second operation: sending data over the connection.
```java
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Mark the request as oneway via the flag field in the header
    request.markOnewayRPC();
    // Acquire a semaphore permit so the system is not flooded with too many requests
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // Release the permit only once the write has actually completed
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channelfailed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channelfailed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // Too many requests in flight
        if (timeoutMillis
```
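invokeOnewayImpl leans on two small mechanisms: a flag bit that marks the request as oneway, and a wrapper that returns a semaphore permit exactly once even if both the write listener and the exception path try to release it. Below is a sketch of both, assuming bit 0 of flag marks responses and bit 1 marks oneway requests (my reading of RemotingCommand#markResponseType and #markOnewayRPC). The class names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical equivalent of the release-once idea behind SemaphoreReleaseOnlyOnce.
final class ReleaseOnceSketch {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    /** Returns the permit at most once, no matter how many code paths call this. */
    void release() {
        if (semaphore != null && released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

// Hypothetical flag helpers; bit positions are assumptions.
final class OnewayFlagSketch {
    static final int RPC_TYPE = 0;   // bit 0: set on responses
    static final int RPC_ONEWAY = 1; // bit 1: set on oneway requests

    private OnewayFlagSketch() {}

    static int markOnewayRPC(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    static boolean isOnewayRPC(int flag) {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }
}
```

Releasing twice would silently add an extra permit and loosen the limit on in-flight oneway requests, which is why the compareAndSet guard is there.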