小心程序猿QAQ 潜水
  • 4发帖数
  • 4主题数
  • 0关注数
  • 0粉丝
开启左侧

RocketMQ源码详解 | Producer篇:消息构成、发送链路

[复制链接]
小心程序猿QAQ 发表于 2021-10-28 21:48:02 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
概述

在上一节 RocketMQ源码详解 | Producer篇 · 其一:Start,然后 Send 一条消息 中,我们相识了 Producer 在发送消息的流程。这次我们再来具体下看消息的构成与其发送的链路
Message

在 RocketMQ 的使用中,Message 类是在发送消息时必须用到的,此中 body 即是消息的存放位置,另有的就是消息的 标识(flag) 和 属性(properties)
public class Message {  private String topic;  private int flag;  private Map properties;  private byte[] body;  private String transactionId;}消息的标识(flag)

变量名
含义
COMPRESSED_FLAG
压缩消息。消息为批量的时间,就会举行压缩,默认使用5级的 zip
MULTI_TAGS_FLAG
有多个 tag。
TRANSACTION_NOT_TYPE
事务为未知状态。当 Broker 回查 Producer 的时间,假如为 Commit 应该提交,为 Rollback 应该回滚,为 Unknown 时应该继续回查
TRANSACTION_PREPARED_TYPE
事务的运行状态。当前消息是事务的一部分
TRANSACTION_COMMIT_TYPE
事务的提交消息。要求提交事务
TRANSACTION_ROLLBACK_TYPE
事务的回滚消息。要求回滚事务
BORNHOST_V6_FLAG
生成该消息的 host 是否 ipv6 的地点
STOREHOSTADDRESS_V6_FLAG
长期化该消息的 host 是否是 ipv6 的地点



消息的属性(properties)

而消息的 properties 较多,只摘了一小段
属性
含义
KEYS
消息的 Key。服务器会通过 key 设置索引,应用可以通过 Topic 和 Key 来查找这条消息以及被谁消耗
TAGS
消息的子范例,可以根据 tag 选择性消耗
DELAY
延迟消息的延迟级别(共16级,理论上可以有18级)
RETRY_TOPIC
必要重试的 Topic(在 Broker 中会存放到 SCHEDULE_TOPIC_XXXX Topic,此中有 18 个 queue,对应 18 个重试延迟)
REAL_TOPIC
真实的 Topic (RocketMQ 经常使用更换目的 Topic 的"把戏",如事务消息和延时消息,这个字段记录了真正的 Topic)
PGROUP
生产者组
MAX_OFFSET\MIN_OFFSET
在 pull 中的最大偏移量和最小偏移量
TRANSFER_FLAG
事务有关标识
MSG_TYPE
消息范例,是否为回复消息
BUYER_ID
嗯...买家ID?



当然,这只是在生产者中的消息的样子,在 Broker 和消耗者的眼中中,它是这样的
public class MessageExt extends Message {  private static final long serialVersionUID = 5720810158625748049L;   private String brokerName;   private int queueId;   // 存盘的巨细  private int storeSize;   // 在 ConsumerQueue 中的偏移量  private long queueOffset;  private int sysFlag;  // 消息创建时间  private long bornTimestamp;  // 创建地点  private SocketAddress bornHost;   // 存盘时间  private long storeTimestamp;  private SocketAddress storeHost;  private String msgId;  // 在 commitLog 中的偏移量  private long commitLogOffset;  // crc 校验  private int bodyCRC;  // 消耗重试次数  private int reconsumeTimes;   private long preparedTransactionOffset;}


消息的包装

那么,producer 生成了这样的消息后,会直接将其发出去吗?
让我们继续跟踪上一篇没讲完的内容



MQClientAPIImpl#sendMessage

long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);// 是否为 reply 消息boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);if (isReply) {  // 是 smart 消息则加上请求头  if (sendSmartMsg) {    SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);    request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);  } else {    request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);  }} else {  if (sendSmartMsg || msg instanceof MessageBatch) {    SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);    request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);  } else {    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);  }}request.setBody(msg.getBody()); /* -- pass -- */在这里,我们可以看到在这又加了层套娃(只保存了body),然后才发送
RemotingCommand 的具体属性如下
private int code;private LanguageCode language = LanguageCode.JAVA;private int version = 0;private int opaque = requestId.getAndIncrement();private int flag = 0;private String remark;private HashMap extFields;private transient CommandCustomHeader customHeader;private transient byte[] body;我们还在他的方法中找到了一个叫 encode 的方法,而且返回的是 ByteBuffer 。因此这就是实际发送的消息。
public ByteBuffer encode() {  // 1> header length size  int length = 4;   // 2> header data length  byte[] headerData = this.headerEncode();  length += headerData.length;   // 3> body data length  if (this.body != null) {    length += body.length;  }   ByteBuffer result = ByteBuffer.allocate(4 + length);   // length  result.putInt(length);   // header length  result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));   // header data  result.put(headerData);   // body data;  if (this.body != null) {    result.put(this.body);  }   result.flip();   return result;}


消息的结构

具体的消息结构如下图:

                               
登录/注册后可看大图

此中每个字段在 Request 和 Response 中都有不同的含义
code

在 Request 中,为请求的操作码
public class RequestCode {  // 发送消息  public static final int SEND_MESSAGE = 10;  // 拉取消息  public static final int PULL_MESSAGE = 11;  // 查询消息(所在topic, 必要的 key, 最大数量, 开始偏移量, 结束偏移量)  public static final int QUERY_MESSAGE = 12;  // 查询 Broker 偏移量(未使用)  public static final int QUERY_BROKER_OFFSET = 13;  /*     * 查询消耗者偏移量     *      消耗者会将偏移量存储在内存中,当使用主从架构时,会默认由主 Broker 负责读于写     *      为制止消息堆积,堆积消息超过指定的值时,会由从服务器来接管读,但会导致消耗进度问题     *      所以主从消耗进度的同等性由 从服务器主动上报 和 消耗者内存进度优先 来保证     */  // 查询消耗者自己的偏移量  public static final int QUERY_CONSUMER_OFFSET = 14;  // 提交自己的偏移量  public static final int UPDATE_CONSUMER_OFFSET = 15;  // 创建或更新Topic    public static final int UPDATE_AND_CREATE_TOPIC = 17;  // 获取所有的Topic信息  public static final int GET_ALL_TOPIC_CONFIG = 21;  /*   unused  */  public static final int GET_TOPIC_CONFIG_LIST = 22;  public static final int GET_TOPIC_NAME_LIST = 23;  // 更新 Broker 配置  public static final int UPDATE_BROKER_CONFIG = 25;  // 获取 Broker 配置  public static final int GET_BROKER_CONFIG = 26;  public static final int TRIGGER_DELETE_FILES = 27;  // 获取 Broker 运行时信息  public static final int GET_BROKER_RUNTIME_INFO = 28;  // 通过期间戳查找偏移量  public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;  // 获取最大偏移量  public static final int GET_MAX_OFFSET = 30;  // 获取最小偏移量  public static final int GET_MIN_OFFSET = 31;  //  public static final int GET_EARLIEST_MSG_STORETIME = 32;   /*       由 Broker 处理        */  // 通过消息ID查询消息  public static final int VIEW_MESSAGE_BY_ID = 33;  // 心跳消息  public static final int HEART_BEAT = 34;  // 注销客户端  public static final int UNREGISTER_CLIENT = 35;  // 陈诉消耗失败(一段时间后重试) (Deprecated)  public static final int CONSUMER_SEND_MSG_BACK = 36;  // 事务结果(大概是 commit 或 rollback)  public static final int END_TRANSACTION = 37;  // 通过消耗者组获取消耗者列表  public static final int GET_CONSUMER_LIST_BY_GROUP = 38;   // 查抄事务状态; Broker对于事务的未知状态的回查操作  public static final int CHECK_TRANSACTION_STATE = 39;  // 通知消耗者的ID已经被更改  public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;   // 批量锁定 Queue (rebalance使用)  public static final int LOCK_BATCH_MQ = 41;  // 解锁 Queue  public static final int UNLOCK_BATCH_MQ = 42;  // 获得该 Broker 上的所有的消耗者偏移量  public static final int GET_ALL_CONSUMER_OFFSET = 43;  // 获得延迟 Topic 上的偏移量  public static final int GET_ALL_DELAY_OFFSET = 45;  // 查抄客户端配置  public static final int CHECK_CLIENT_CONFIG = 46;  // 更新或创建 ACL  public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;  // 删除 ACL 配置  public static final int DELETE_ACL_CONFIG = 51;  // 获取 Broker 集群的 ACL 信息  public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;  // 更新全局白名单  public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;  // 获取 Broker 集群的 ACL 配置  public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;    /*      NameServer 相关      */   // 放入键值配置  public static final int PUT_KV_CONFIG = 100;  // 获取键值配置  public static final int GET_KV_CONFIG = 101;  // 删除键值配置  public static final int DELETE_KV_CONFIG = 102;  // 注册 Broker  public static final int REGISTER_BROKER = 103;  // 注销 Broker  public static final int UNREGISTER_BROKER = 104;  // 获取指定 Topic 的路由信息  public static final int GET_ROUTEINFO_BY_TOPIC = 105;  // 获取 Broker 的集群信息  public static final int GET_BROKER_CLUSTER_INFO = 106;     // 更新或创建订阅组  public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;  // 获取所有订阅组的配置  public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;  // 获取 Topic 的度量指标  public static final int GET_TOPIC_STATS_INFO = 202;  // 获取消耗者在线列表(rpc)  public static final int GET_CONSUMER_CONNECTION_LIST = 203;  // 获取生产者在线列表  public static final int GET_PRODUCER_CONNECTION_LIST = 204;  public static final int WIPE_WRITE_PERM_OF_BROKER = 205;  // 从 NameSrv 获取所有 Topic  public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;  // 删除订阅组  public static final int DELETE_SUBSCRIPTIONGROUP = 207;  // 获取消耗者的度量指标  public static final int GET_CONSUME_STATS = 208;   public static final int SUSPEND_CONSUMER = 209;  public static final int RESUME_CONSUMER = 210;  public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;  public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;   public static final int ADJUST_CONSUMER_THREAD_POOL = 213;   public static final int WHO_CONSUME_THE_MESSAGE = 214;   // 删除 Broker 中的 Topic  public static final int DELETE_TOPIC_IN_BROKER = 215;  // 删除 NameSrv 中的 Topic  public static final int DELETE_TOPIC_IN_NAMESRV = 216;  // 获取键值列表  public static final int GET_KVLIST_BY_NAMESPACE = 219;   // 重置消耗者的消耗进度  public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;  // 从消耗者中获取消耗者的度量指标  public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;  // 让 Broker 重置消耗进度  public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;  // 让 Broker 更新消耗者的度量信息  public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;  // 查询消息被谁消耗  public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;  // 从集群中获取 Topic  public static final int GET_TOPICS_BY_CLUSTER = 224;  // 注册过滤器服务器  public static final int REGISTER_FILTER_SERVER = 301;  // 注册消息过滤类  public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;  // 查询消耗时间  public static final int QUERY_CONSUME_TIME_SPAN = 303;  // 从 NameSrv 中获取系统Topic  public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;  // 从 Broker 中获取系统Topic  public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;  // 清理过期的消耗队列  public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;  // 获取 Consumer 的运行时信息  public static final int GET_CONSUMER_RUNNING_INFO = 307;  // 查询修正偏移量  public static final int QUERY_CORRECTION_OFFSET = 308;  // 直接消耗消息  public static final int CONSUME_MESSAGE_DIRECTLY = 309;  // 发送消息(v2),优化网络数据包  public static final int SEND_MESSAGE_V2 = 310;    // 单元化相关 topic  public static final int GET_UNIT_TOPIC_LIST = 311;  // 获取含有单元化订阅组的 Topic 列表  public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;  // 获取含有单元化订阅组的非单元化 Topic 列表  public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;  // 克隆消耗进度  public static final int CLONE_GROUP_OFFSET = 314;  // 查询 Broker 上的度量信息  public static final int VIEW_BROKER_STATS_DATA = 315;   // 清理未使用的 Topic  public static final int CLEAN_UNUSED_TOPIC = 316;  // 获取 broker 上的有关消耗的度量信息  public static final int GET_BROKER_CONSUME_STATS = 317;   /* update the config of name server */  public static final int UPDATE_NAMESRV_CONFIG = 318;  /* get config from name server */  public static final int GET_NAMESRV_CONFIG = 319;   // 发送批量消息  public static final int SEND_BATCH_MESSAGE = 320;  // 查询消耗的 Queue  public static final int QUERY_CONSUME_QUEUE = 321;  // 查询数据版本  public static final int QUERY_DATA_VERSION = 322;   /* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */  public static final int RESUME_CHECK_HALF_MESSAGE = 323;  // 回送消息  public static final int SEND_REPLY_MESSAGE = 324;  public static final int SEND_REPLY_MESSAGE_V2 = 325;  // push回送消息到客户端  public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;}


在 Response 中,为相应码
public class ResponseCode extends RemotingSysResponseCode {  // 革新到磁盘超时  public static final int FLUSH_DISK_TIMEOUT = 10;  // 从节点不可达  public static final int SLAVE_NOT_AVAILABLE = 11;  // 从节点刷盘超时  public static final int FLUSH_SLAVE_TIMEOUT = 12;  // 非法的消息结构  public static final int MESSAGE_ILLEGAL = 13;  // 服务不可用  public static final int SERVICE_NOT_AVAILABLE = 14;  // 版本不支持  public static final int VERSION_NOT_SUPPORTED = 15;  // 未授权的  public static final int NO_PERMISSION = 16;  // Topic 不存在  public static final int TOPIC_NOT_EXIST = 17;  // Topic 已经存在  public static final int TOPIC_EXIST_ALREADY = 18;  // 要拉取的偏移量不存在  public static final int PULL_NOT_FOUND = 19;  // 立即重新拉取  public static final int PULL_RETRY_IMMEDIATELY = 20;  // 重定向拉取的偏移量  public static final int PULL_OFFSET_MOVED = 21;  // 不存在的队列  public static final int QUERY_NOT_FOUND = 22;  // 订阅的 url 解析失败  public static final int SUBSCRIPTION_PARSE_FAILED = 23;  // 目的订阅不存在  public static final int SUBSCRIPTION_NOT_EXIST = 24;  // 订阅不是最新的  public static final int SUBSCRIPTION_NOT_LATEST = 25;  // 订阅组不存在  public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;  // 订阅的数据不存在 (tag表达式异常)  public static final int FILTER_DATA_NOT_EXIST = 27;  // 该 Broker 上订阅的数据不是最新的  public static final int FILTER_DATA_NOT_LATEST = 28;   // 事务应该提交  public static final int TRANSACTION_SHOULD_COMMIT = 200;  // 事务应该回滚  public static final int TRANSACTION_SHOULD_ROLLBACK = 201;  // 事务状态位置  public static final int TRANSACTION_STATE_UNKNOW = 202;  // 事务状态Group错误  public static final int TRANSACTION_STATE_GROUP_WRONG = 203;   // 买家ID不存在  public static final int NO_BUYER_ID = 204;  public static final int NOT_IN_CURRENT_UNIT = 205;  // 消耗者不在线(rpc)  public static final int CONSUMER_NOT_ONLINE = 206;  // 消耗超时  public static final int CONSUME_MSG_TIMEOUT = 207;  // 消息不存在  public static final int NO_MESSAGE = 208;  // 更新或创建 ACL 配置失败  public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;  // 删除 ACL 配置失败  public static final int DELETE_ACL_CONFIG_FAILED = 210;  // 更新全局白名单地点失败  public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; }



lang

字段为消息发起方编码语言,这里默认为 java
private LanguageCode language = LanguageCode.JAVA;version

消息发起方的程序版本
opaque

该字段是为了在同连续接上标识不同的请求,在相应的时间能够回调对应的函数( rocketmq 的发送使用了 TCP 连接复用)
remark

在 Reqeust 中,用于传输自定义文本
在 Response 中,用于传输错误的原因
ext

传输自定义的消息头



消息的发送

在知道消息长啥样后,就可以继续看发送代码了
switch (communicationMode) {  case ONEWAY:    this.remotingClient.invokeOneway(addr, request, timeoutMillis);    return null;  case ASYNC:    final AtomicInteger times = new AtomicInteger();    long costTimeAsync = System.currentTimeMillis() - beginStartTime;    if (timeoutMillis < costTimeAsync) {      throw new RemotingTooMuchRequestException("sendMessage call timeout");    }    this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,                          retryTimesWhenSendFailed, times, context, producer);    return null;  case SYNC:    long costTimeSync = System.currentTimeMillis() - beginStartTime;    if (timeoutMillis < costTimeSync) {      throw new RemotingTooMuchRequestException("sendMessage call timeout");    }    return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);  default:    assert false;    break;} return null;


NettyRemotingClient#invokeOneway

我们先来看最简单的Oneway
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {  // 创建 Channel  final Channel channel = this.getAndCreateChannel(addr);  if (channel != null && channel.isActive()) {    try {      doBeforeRpcHooks(addr, request);      // 使用建立好的连接发送      this.invokeOnewayImpl(channel, request, timeoutMillis);    } catch (RemotingSendRequestException e) {      log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);      this.closeChannel(addr, channel);      throw e;    }  } else {    this.closeChannel(addr, channel);    throw new RemotingConnectException(addr);  }}以上可以大致抽象为两个操作:获取或建立TCP连接、通过连接发送数据,同时一旦发生异常则关闭连接



NettyRemotingClient#getAndCreateChannel

先看第一个操作
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {  // 地点为空则说明要获取的是NameServer的地点  if (null == addr) {    return getAndCreateNameserverChannel();  }   // 尝试从缓存中获取  ChannelWrapper cw = this.channelTables.get(addr);  if (cw != null && cw.isOK()) {    return cw.getChannel();  }   // 没有或未就绪则新建连接  return this.createChannel(addr);}可以看出,这里是由一个 ChannelTable 来维护所有的连接,而 ChannelTable 又是由 NettyRemotingClient 维护,即其是在 JVM 上的全局共享实例。
然后再具体查看创建的方法,可以发现 Channel 终极是由客户端的 Bootstrap 异步创建
ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {  return cw.getChannel();} // 连接的建立是串行的if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {  try {    // 双重校验保证连接确实没被创建    boolean createNewConnection;    cw = this.channelTables.get(addr);    if (cw != null) {      if (cw.isOK()) {        // 连接建立完成        return cw.getChannel();      } else if (!cw.getChannelFuture().isDone()) {        createNewConnection = false;      } else {        // 建立过但失败了        this.channelTables.remove(addr);        createNewConnection = true;      }    } else {      createNewConnection = true;    }     if (createNewConnection) {      // 实际上的连接创建      ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));      log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);      cw = new ChannelWrapper(channelFuture);      this.channelTables.put(addr, cw);    }  } catch (Exception e) {    log.error("createChannel: create channel exception", e);  } finally {    this.lockChannelTables.unlock();  }} else {  log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);} if (cw != null) {  ChannelFuture channelFuture = cw.getChannelFuture();  // 阻塞直到创建完成  if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {    if (cw.isOK()) {      log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());      return cw.getChannel();    } else {      log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());    }  } else {    log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),             channelFuture.toString());  }} return null;


NettyRemotingAbstract#invokeOnewayImpl

然后接着看第二个操作:通过连接发送数据
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)  throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {  // 在请求头上的 flag 标记为 oneway 请求  request.markOnewayRPC();  // 获取信号量,保证不会系统不会蒙受过多请求  boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);  if (acquired) {    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);    try {      channel.writeAndFlush(request).addListener(new ChannelFutureListener() {        @Override        public void operationComplete(ChannelFuture f) throws Exception {          // 真正发送完成后,释放锁          once.release();          if (!f.isSuccess()) {            log.warn("send a request command to channel  failed.");          }        }      });    } catch (Exception e) {      once.release();      log.warn("write send a request command to channel  failed.");      throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);    }  } else {    // 超出请求数    if (timeoutMillis
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城