继承正题,解析connect-mirror-maker.sh命令行参数可以获取到配置文件和clusters参数。然后就可以实例化MirrorMaker类。
MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);该类的构造方法中用一个Set范例herderPairs变量存放配置文件中配置的replicaiton flow流向(如:x-y或者y->x)。默认情况下,纵然x->y.enabled=false,x->y也会被存放到herderPairs内;除非emit.heartbeats.enabled=false或者x-y.emit.heartbeats.enabled=false时,x-y才不会被存放到herderPairs内。这么做的原因是:
对于开启emit.hearts的replicationflow(如A->B),是必要两个herders的。一个是A->B属于真实的replicaiton flow同步topic数据,即MirrorSourceConnector;另一个是B->A属于用于监控replicaiton flow健康状态的提交heartbeats到A的MirrorHeartbeatConnector。
log.info("Targeting clusters {}", this.clusters);this.herderPairs = config.clusterPairs().stream() .filter(x -> this.clusters.contains(x.target())) .collect(Collectors.toSet());除了上述规则,假如命令行参数显式设置了clusters参数,herderPairs内存储的SourceAndTarget pairs还必要满足target cluster属于clusters参数的子集。这么做的原因是考虑MirrorMaker优先运行在target cluster原则:该原则主要是考虑异常情况下,避免无效读写请求。
然后,遍历herderPairs变量,每个变量(SourceAndTarget)均执行addHerder()方法。AddHerder()方法主要功能:实例化一个Kafka Connect框架Wroker类,该Worker类会运行多个线程执行多个tasks(即读Kafka或者写Kafka)。
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);再基于Worker对象,实例化Herder接口类,该接口有两个实现类:StandaloneHerder、DistributedHerder。MirrorMaker类使用的是DistributedHerder。
Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);然后,将每一个SourceAndTarget和对应的herder,存储到Map范例的herders变量中。
private final Map herders = new HashMap();herders.put(sourceAndTarget, herder);关于Herder接口和两个实现类:
Herder接口用于跟踪和管理workers和connectors。StandaloneHerder属于单进程,使用基于内存的MemoryStatusBackingStore和MemoryConfigBackingStore,用于standalone模式Kafka Connect进程。DistributedHerder使用基于Kafka的KafkaStatusBackingStore和KafkaConfigBackingStore,在多个进程之间和谐workers,底层基于Kafka group membership实现group managed;每个加入group的DistributedHerder实例,上报它的configuration state。Group和谐器给每个实例分配一些connectors和tasks执行,分配策略接纳简朴的round-ronbin方式。但这也不是绝对的,为了避免start/stop花销,herder也会接纳sticky分配策略。DistributedHerder实例只运行分配给它的connectors和tasks。补充下几个概念:
Connectors:the high level abstraction that coordinates data streaming by managing tasksTasks:the implementation of how data is copied to or from KafkaWorkers:the running processes that execute connectors and tasks至此,MirrorMaker构造方法竣事,MirrorMaker实例化完成。
MirrorMaker类start方法