大数据从业者FelixZh 潜水
  • 1发帖数
  • 1主题数
  • 0关注数
  • 0粉丝
开启左侧

MirrorMakerV2源码解读

[复制链接]
大数据从业者FelixZh 发表于 2021-10-19 07:04:00 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
前言

前面已经梳理MirrorMakerV2(以下简称MM2)概念及实践,可以直接到文章合集-Kafka中查阅。本文主要深入MM2源码,介绍MM2的启动流程。
启动脚本剖析

启动脚本为connect-mirror-maker.sh,可以通过KAFKA_LOG4J_OPTS指定log4j配置文件,可以通过KAFKA_HEAP_OPTS指定堆内存(默认为-Xms256M –Xmx2G),详细类路径为:
org.apache.kafka.connect.mirror.MirrorMakerMirrorMaker类剖析

main方法起首通过argparse4j(第三方实现的解析命令行的工具)解析命令行参数,返回一个Namespace包装类。
ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");parser.description("MirrorMaker 2.0 driver");parser.addArgument("config").type(Arguments.fileType().verifyCanRead())    .metavar("mm2.properties").required(true)    .help("MM2 configuration file.");parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false)    .help("Target cluster to use for this node.");Namespace ns;try {    ns = parser.parseArgs(args);} catch (ArgumentParserException e) {    parser.handleError(e);    Exit.exit(-1);    return;}题外话:这个Namespace类计划的很方便。该类实际包装一个Map(存储解析出来的命令行参数KV),通过一个get()方法返回泛型类效果,然后getString()、getByte()、getShort()、getInt()、getLong()、getFloat()、getDouble()、getBoolean()、getList()等方法调用get()方法获取指定命据范例的返回效果。这样的计划很方便获取各种范例的命令行参数值,完全可以复用在别的日常开发中。Namespace详细类结构如图所示:

                               
登录/注册后可看大图


MirrorMaker类实例化方法

继承正题,解析connect-mirror-maker.sh命令行参数可以获取到配置文件和clusters参数。然后就可以实例化MirrorMaker类。
MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);该类的构造方法中用一个Set范例herderPairs变量存放配置文件中配置的replicaiton flow流向(如:x-y或者y->x)。默认情况下,纵然x->y.enabled=false,x->y也会被存放到herderPairs内;除非emit.heartbeats.enabled=false或者x-y.emit.heartbeats.enabled=false时,x-y才不会被存放到herderPairs内。这么做的原因是:
对于开启emit.hearts的replicationflow(如A->B),是必要两个herders的。一个是A->B属于真实的replicaiton flow同步topic数据,即MirrorSourceConnector;另一个是B->A属于用于监控replicaiton flow健康状态的提交heartbeats到A的MirrorHeartbeatConnector。
log.info("Targeting clusters {}", this.clusters);this.herderPairs = config.clusterPairs().stream()    .filter(x -> this.clusters.contains(x.target()))    .collect(Collectors.toSet());除了上述规则,假如命令行参数显式设置了clusters参数,herderPairs内存储的SourceAndTarget pairs还必要满足target cluster属于clusters参数的子集。这么做的原因是考虑MirrorMaker优先运行在target cluster原则:该原则主要是考虑异常情况下,避免无效读写请求。
然后,遍历herderPairs变量,每个变量(SourceAndTarget)均执行addHerder()方法。AddHerder()方法主要功能:实例化一个Kafka Connect框架Wroker类,该Worker类会运行多个线程执行多个tasks(即读Kafka或者写Kafka)。
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);再基于Worker对象,实例化Herder接口类,该接口有两个实现类:StandaloneHerder、DistributedHerder。MirrorMaker类使用的是DistributedHerder。
Herder herder = new DistributedHerder(distributedConfig, time, worker,        kafkaClusterId, statusBackingStore, configBackingStore,        advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);然后,将每一个SourceAndTarget和对应的herder,存储到Map范例的herders变量中。
private final Map herders = new HashMap();herders.put(sourceAndTarget, herder);关于Herder接口和两个实现类:
Herder接口用于跟踪和管理workers和connectors。StandaloneHerder属于单进程,使用基于内存的MemoryStatusBackingStore和MemoryConfigBackingStore,用于standalone模式Kafka Connect进程。DistributedHerder使用基于Kafka的KafkaStatusBackingStore和KafkaConfigBackingStore,在多个进程之间和谐workers,底层基于Kafka group membership实现group managed;每个加入group的DistributedHerder实例,上报它的configuration state。Group和谐器给每个实例分配一些connectors和tasks执行,分配策略接纳简朴的round-ronbin方式。但这也不是绝对的,为了避免start/stop花销,herder也会接纳sticky分配策略。DistributedHerder实例只运行分配给它的connectors和tasks。补充下几个概念:
Connectors:the high level abstraction that coordinates data streaming by managing tasksTasks:the implementation of how data is copied to or from KafkaWorkers:the running processes that execute connectors and tasks至此,MirrorMaker构造方法竣事,MirrorMaker实例化完成。
MirrorMaker类start方法

我们知道,上述构造方法中得到Map范例的herders变量。这里重新遍历下,调用每个Herder实例的start方法(即DistributedHerder类的start方法)。
for (Herder herder : herders.values()) {    try {        herder.start();    } finally {        startLatch.countDown();    }}DistributedHerder类实例化时,实例化了一个线程数为1的herderExecutor线程池。
this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,        TimeUnit.MILLISECONDS,        new LinkedBlockingDeque(1),        ThreadUtils.createThreadFactory(                this.getClass().getSimpleName() + "-" + clientId + "-%d", false));DistributedHerder类实现了Runnable接口,其start方法将自身提交到上述线程池运行。
@Overridepublic void start() {    this.herderExecutor.submit(this);}DistributedHerder类run方法通过调用startServices()方法以启动worker、statusBackingStore、configBackingStore服务。
protected void startServices() {    this.worker.start();    this.statusBackingStore.start();    this.configBackingStore.start();}末了,MirrorMaker类start()方法会遍历Set范例的herderPairs变量,每一个变量执行configureConnectors()方法,该方法终极调用DistributedHerder类的putConnectorConfig方法。
herderPairs.forEach(x -> configureConnectors(x));putConnectorConfig将详细的SourceAndTarget所对应的replication flow任务提交到上述已经完成启动的DistributedHerder各种服务中。
至此,MIrrorMakerV2.0启动完成!!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城