Hadoop DataNode工作机制,如下图所示:
DataNode启动流程源码如下图所示:
0)在pom.xml中增长如下依靠
org.apache.hadoop hadoop-client 3.1.3 org.apache.hadoop hadoop-hdfs 3.1.3 org.apache.hadoop hadoop-hdfs-client 3.1.3 provided 1)ctrl + n 全局查找datanode,进入DataNode.java
DataNode官方说明
DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with asingle NameNode. It also communicates with client code and other DataNodes fromtime to time. DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode mayalso, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes. The DataNode maintains just one critical table:block-> stream of bytes (of BLOCK_SIZE or less) This info is stored on alocal disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards. DataNodes spend their lives in anendless loop of asking the NameNode for something to do. A NameNode can not connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.
2)ctrl + f,查找main方法
DataNode.java
public static void main(String args[]) { if (DFSUtil.parseHelpArgument(args,DataNode.USAGE, System.out, true)) { System.exit(0); } secureMain(args, null);} public static void secureMain(String args[],SecureResources resources) { int errorCode = 0; try { StringUtils.startupShutdownMessage(DataNode.class, args, LOG); DataNode datanode = createDataNode(args, null, resources); … … } catch (Throwable e) { LOG.error("Exception insecureMain", e); terminate(1, e); } finally { LOG.warn("Exiting Datanode"); terminate(errorCode); }} public static DataNode createDataNode(String args[], Configurationconf, SecureResources resources) throwsIOException { // 初始化DN DataNode dn = instantiateDataNode(args, conf, resources); if (dn != null) { // 启动DN历程 dn.runDatanodeDaemon(); } return dn;} public static DataNode instantiateDataNode(String args [],Configuration conf, SecureResources resources) throws IOException { ... ... return makeInstance(dataLocations, conf, resources);} static DataNode makeInstance(CollectiondataDirs, Configuration conf, SecureResourcesresources) throws IOException { ... ... return new DataNode(conf, locations, storageLocationChecker, resources);} DataNode(final Configuration conf, final List dataDirs, final StorageLocationChecker storageLocationChecker, final SecureResources resources)throws IOException { super(conf); ... ... try { hostName = getHostName(conf); LOG.info("Configured hostname is{}", hostName); // 启动DN startDataNode(dataDirs,resources); } catch (IOException ie) { shutdown(); throw ie; } ... ...} void startDataNode(List dataDirectories, SecureResources resources ) throws IOException { ... ... // 创建数据存储对象 storage = new DataStorage(); // global DN settings registerMXBean(); // 初始化DataXceiver initDataXceiver(); // 启动HttpServer startInfoServer(); pauseMonitor = new JvmPauseMonitor(); pauseMonitor.init(getConf()); pauseMonitor.start(); // BlockPoolTokenSecretManager is required tocreate ipc server. this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager(); // Login is done by now. Set the DN username. dnUserName =UserGroupInformation.getCurrentUser().getUserName(); LOG.info("dnUserName = {}",dnUserName); LOG.info("supergroup = {}",supergroup); // 初始化RPC服务 initIpcServer(); metrics = DataNodeMetrics.create(getConf(),getDisplayName()); peerMetrics = dnConf.peerStatsEnabled ? DataNodePeerMetrics.create(getDisplayName(), getConf()) : null; metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); ecWorker = new ErasureCodingWorker(getConf(),this); blockRecoveryWorker = new BlockRecoveryWorker(this); // 创建BlockPoolManager blockPoolManager = new BlockPoolManager(this); // 心跳管理 blockPoolManager.refreshNamenodes(getConf()); // Create the ReadaheadPool from the DataNodecontext so we can // exit without having to explicitly shutdownits thread pool. readaheadPool = ReadaheadPool.getInstance(); saslClient = newSaslDataTransferClient(dnConf.getConf(), dnConf.saslPropsResolver,dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); startMetricsLogger(); if (dnConf.diskStatsEnabled) { diskMetrics = new DataNodeDiskMetrics(this, dnConf.outliersReportIntervalMs); }}一、初始化DataXceiverServer
点击initDataXceiver
private void initDataXceiver() throws IOException {//dataXceiverServer是一个服务,DN用来吸收客户端和其他DN发送过来的数据服务 this.dataXceiverServer= new Daemon(threadGroup, xserver); this.threadGroup.setDaemon(true); // autodestroy when empty ... ...}二、初始化HTTP服务
点击startInfoServer();
DataNode.java
private void startInfoServer() throws IOException { // SecureDataNodeStarter will bind theprivileged port to the channel if // the DN is started by JSVC, pass it along. ServerSocketChannel httpServerChannel =secureResources != null ? secureResources.getHttpServerChannel() :null; httpServer = new DatanodeHttpServer(getConf(), this,httpServerChannel); httpServer.start(); if (httpServer.getHttpAddress() != null) { infoPort =httpServer.getHttpAddress().getPort(); } if (httpServer.getHttpsAddress() != null) { infoSecurePort =httpServer.getHttpsAddress().getPort(); }}DatanodeHttpServer.java
public DatanodeHttpServer(final Configuration conf, final DataNode datanode, final ServerSocketChannel externalHttpChannel) throws IOException { ... ... HttpServer2.Builder builder= new HttpServer2.Builder() .setName("datanode") .setConf(confForInfoServer) .setACL(newAccessControlList(conf.get(DFS_ADMIN, " "))) .hostName(getHostnameForSpnegoPrincipal(confForInfoServer)) .addEndpoint(URI.create("http://localhost:"+ proxyPort)) .setFindPort(true); ... ...}三、初始化DN的RPC服务端
点击initIpcServer
DataNode.java
private void initIpcServer() throws IOException { InetSocketAddress ipcAddr = NetUtils.createSocketAddr( getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY)); ... ... ipcServer = new RPC.Builder(getConf()) .setProtocol(ClientDatanodeProtocolPB.class) .setInstance(service) .setBindAddress(ipcAddr.getHostName()) .setPort(ipcAddr.getPort()) .setNumHandlers( getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false) .setSecretManager(blockPoolTokenSecretManager).build(); ... ...}四、DN向NN注册
点击refreshNamenodes
BlockPoolManager.java
void refreshNamenodes(Configuration conf) throws IOException { ... ... synchronized (refreshNamenodesLock) { doRefreshNamenodes(newAddressMap, newLifelineAddressMap); }} private void doRefreshNamenodes( Map addrMap, Map lifelineAddrMap) throws IOException { … …. synchronized (this) { … … //Step 3. Start new nameservices if (!toAdd.isEmpty()) { for (String nsToAdd : toAdd) { … … BPOfferService bpos = createBPOS(nsToAdd, addrs,lifelineAddrs); bpByNameserviceId.put(nsToAdd, bpos); offerServices.add(bpos); } } startAll(); } … …} protected BPOfferService createBPOS( final String nameserviceId, List nnAddrs, ListlifelineNnAddrs) { // 根据NameNode个数创建对应的服务 return new BPOfferService(nameserviceId, nnAddrs,lifelineNnAddrs, dn);}点击startAll()
synchronized void startAll() throws IOException { try { UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction() { @Override public Object run() throws Exception{ for (BPOfferService bpos : offerServices) { // 启动服务 bpos.start(); } return null; } }); } catch (InterruptedException ex) { ... ... }}点击start ()
BPOfferService.java
void start() { for (BPServiceActor actor : bpServices){ actor.start(); }}点击start ()
BPServiceActor.java
void start() { ... ... bpThread = new Thread(this); bpThread.setDaemon(true); // needed for JUnittesting// 体现开启一个线程,所有查找该线程的run方法 bpThread.start(); if (lifelineSender != null) { lifelineSender.start(); }}ctrl + f 搜索run方法
public void run(){ LOG.info(this + " starting to offerservice"); try { while (true) { // init stuff try { // setup storage // 向NN 注册 connectToNNAndHandshake(); break; } catch (IOException ioe) { // Initial handshake, storage recoveryor registration failed runningState =RunningState.INIT_FAILED; if (shouldRetryInit()) { // Retry until all namenode's of BPOSfailed initialization LOG.error("Initialization failedfor " + this + " " + ioe.getLocalizedMessage()); // 注册失败,5s后重试 sleepAndLogInterrupts(5000,"initializing"); } else { runningState = RunningState.FAILED; LOG.error("Initialization failedfor " + this + ". Exiting. ", ioe); return; } } } … … while (shouldRun()) { try{ // 发送心跳 offerService(); }catch (Exception ex) { ...... } }} private void connectToNNAndHandshake() throwsIOException { // get NN proxy 获取NN的RPC客户端对象 bpNamenode= dn.connectToNN(nnAddr); // First phase of the handshake with NN - getthe namespace // info. NamespaceInfo nsInfo = retrieveNamespaceInfo(); // Verify that this matches the other NN inthis HA pair. // This also initializes our block pool inthe DN if we are // the first NN connection for this BP. bpos.verifyAndSetNamespaceInfo(this, nsInfo); /* set thread name again to includeNamespaceInfo when it's available. */ this.bpThread.setName(formatThreadName("heartbeating",nnAddr)); // 注册 register(nsInfo);} DatanodeProtocolClientSideTranslatorPBconnectToNN( InetSocketAddress nnAddr) throws IOException{ return new DatanodeProtocolClientSideTranslatorPB(nnAddr,getConf());}DatanodeProtocolClientSideTranslatorPB.java
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddressnameNodeAddr, Configuration conf) throws IOException { RPC.setProtocolEngine(conf,DatanodeProtocolPB.class, ProtobufRpcEngine.class); UserGroupInformation ugi =UserGroupInformation.getCurrentUser(); rpcProxy= createNamenode(nameNodeAddr, conf, ugi);} private static DatanodeProtocolPB createNamenode( InetSocketAddress nameNodeAddr,Configuration conf, UserGroupInformation ugi) throwsIOException { return RPC.getProxy(DatanodeProtocolPB.class, RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi, conf, NetUtils.getSocketFactory(conf,DatanodeProtocolPB.class));}点击register
BPServiceActor.java
void register(NamespaceInfo nsInfo) throwsIOException { // 创建注册信息 DatanodeRegistration newBpRegistration =bpos.createRegistration(); LOG.info(this + " beginning handshakewith NN"); while (shouldRun()) { try { // Use returned registration fromnamenode with updated fields // 把注册信息发送给NN(DN调用接口方法,实行在NN) newBpRegistration = bpNamenode.registerDatanode(newBpRegistration); newBpRegistration.setNamespaceInfo(nsInfo); bpRegistration = newBpRegistration; break; } catch(EOFException e) { // namenode might have just restarted LOG.info("Problem connecting toserver: " + nnAddr + " :" + e.getLocalizedMessage()); sleepAndLogInterrupts(1000,"connecting to server"); } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting toserver: " + nnAddr); sleepAndLogInterrupts(1000,"connecting to server"); } } … …}ctrl + n 搜索NameNodeRpcServer
NameNodeRpcServer.java
ctrl + f 在NameNodeRpcServer.java中搜索registerDatanode
public DatanodeRegistration registerDatanode(DatanodeRegistrationnodeReg) throws IOException { checkNNStartup(); verifySoftwareVersion(nodeReg); // 注册DN namesystem.registerDatanode(nodeReg); return nodeReg;}FSNamesystem.java
void registerDatanode(DatanodeRegistrationnodeReg) throws IOException { writeLock(); try { blockManager.registerDatanode(nodeReg); } finally { writeUnlock("registerDatanode"); }}BlockManager.java
public void registerDatanode(DatanodeRegistrationnodeReg) throws IOException { assert namesystem.hasWriteLock(); datanodeManager.registerDatanode(nodeReg); bmSafeMode.checkSafeMode();} public void registerDatanode(DatanodeRegistration nodeReg) throws DisallowedDatanodeException,UnresolvedTopologyException { ... ... // register new datanode 注册DN addDatanode(nodeDescr); blockManager.getBlockReportLeaseManager().register(nodeDescr); // also treat the registration message as aheartbeat // no need to update its timestamp // because its is done when the descriptoris created // 将DN添加到心跳管理 heartbeatManager.addDatanode(nodeDescr); heartbeatManager.updateDnStat(nodeDescr); incrementVersionCount(nodeReg.getSoftwareVersion()); startAdminOperationIfNecessary(nodeDescr); success = true; ... ...} void addDatanode(final DatanodeDescriptor node) { // To keep host2DatanodeMap consistent withdatanodeMap, // remove from host2DatanodeMap the datanodeDescriptor removed // from datanodeMap before adding node tohost2DatanodeMap. synchronized(this) { host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node)); } networktopology.add(node);// may throwInvalidTopologyException host2DatanodeMap.add(node); checkIfClusterIsNowMultiRack(node); resolveUpgradeDomain(node); … …}五、向NN发送心跳
点击BPServiceActor.java中的run方法中的offerService方法
BPServiceActor.java
private void offerService() throws Exception { while (shouldRun()) { ... ... HeartbeatResponse resp = null; if (sendHeartbeat) { boolean requestBlockReportLease =(fullBlockReportLeaseId == 0) && scheduler.isBlockReportDue(startTime); if(!dn.areHeartbeatsDisabledForTests()) { // 发送心跳信息 resp = sendHeartBeat(requestBlockReportLease); assert resp != null; if(resp.getFullBlockReportLeaseId() != 0) { if (fullBlockReportLeaseId != 0){ ... ... } fullBlockReportLeaseId =resp.getFullBlockReportLeaseId(); } ... ... } } ... ... }} HeartbeatResponsesendHeartBeat(booleanrequestBlockReportLease) throws IOException { ... ... // 通过NN的RPC客户端发送给NN HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheUsed(), dn.getXmitsInProgress(), dn.getXceiverCount(), numFailedVolumes, volumeFailureSummary, requestBlockReportLease, slowPeers, slowDisks); ... ...}ctrl + n 搜索NameNodeRpcServer
NameNodeRpcServer.java
ctrl + f 在NameNodeRpcServer.java中搜索sendHeartbeat
public HeartbeatResponse sendHeartbeat(DatanodeRegistrationnodeReg, StorageReport[] report, longdnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummaryvolumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers,@NonnullSlowDiskReports slowDisks) throws IOException { checkNNStartup(); verifyRequest(nodeReg); // 处理DN发送的心跳 return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed,xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary,requestFullBlockReportLease, slowPeers, slowDisks);} HeartbeatResponse handleHeartbeat(DatanodeRegistrationnodeReg, StorageReport[] reports, longcacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, intfailedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks) throwsIOException { readLock(); try { //get datanode commands final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress; // 处理DN发送过来的心跳 DatanodeCommand[] cmds =blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, getBlockPoolId(),cacheCapacity, cacheUsed, xceiverCount, maxTransfer,failedVolumes, volumeFailureSummary, slowPeers, slowDisks); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); } //create ha status final NNHAStatusHeartbeat haState = newNNHAStatusHeartbeat( haContext.getState().getServiceState(), getFSImage().getCorrectLastAppliedOrWrittenTxId()); // 响应DN的心跳 return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, blockReportLeaseId); } finally { readUnlock("handleHeartbeat"); }}点击handleHeartbeat
DatanodeManager.java
public DatanodeCommand[] handleHeartbeat(DatanodeRegistrationnodeReg, StorageReport[] reports, final StringblockPoolId, long cacheCapacity, long cacheUsed, intxceiverCount, int maxTransfers, int failedVolumes, VolumeFailureSummary volumeFailureSummary, @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks) throwsIOException { ... ... heartbeatManager.updateHeartbeat(nodeinfo, reports,cacheCapacity, cacheUsed, xceiverCount, failedVolumes,volumeFailureSummary); ... ... }HeartbeatManager.java
synchronized void updateHeartbeat(final DatanodeDescriptornode, StorageReport[] reports, longcacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary){ stats.subtract(node); blockManager.updateHeartbeat(node, reports,cacheCapacity, cacheUsed, xceiverCount, failedVolumes,volumeFailureSummary); stats.add(node);}BlockManager.java
void updateHeartbeat(DatanodeDescriptor node,StorageReport[] reports, long cacheCapacity, long cacheUsed, intxceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary){ for (StorageReport report: reports) { provided StorageMap.updateStorage(node,report.getStorage()); } node.updateHeartbeat(reports, cacheCapacity, cacheUsed,xceiverCount, failedVolumes, volumeFailureSummary);}DatanodeDescriptor.java
void updateHeartbeat(StorageReport[] reports,long cacheCapacity, long cacheUsed, int xceiverCount, intvolFailures, VolumeFailureSummary volumeFailureSummary){ updateHeartbeatState(reports,cacheCapacity, cacheUsed, xceiverCount, volFailures, volumeFailureSummary); heartbeatedSinceRegistration = true;} void updateHeartbeatState(StorageReport[] reports,long cacheCapacity, long cacheUsed, int xceiverCount, intvolFailures, VolumeFailureSummary volumeFailureSummary){ // 更新存储 updateStorageStats(reports,cacheCapacity, cacheUsed, xceiverCount, volFailures, volumeFailureSummary); // 更新心跳时间 setLastUpdate(Time.now()); setLastUpdateMonotonic(Time.monotonicNow()); rollBlocksScheduled(getLastUpdateMonotonic());} private void updateStorageStats(StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, intvolFailures, VolumeFailureSummary volumeFailureSummary){ long totalCapacity = 0; long totalRemaining = 0; long totalBlockPoolUsed = 0; long totalDfsUsed = 0; long totalNonDfsUsed = 0; … … setCacheCapacity(cacheCapacity); setCacheUsed(cacheUsed); setXceiverCount(xceiverCount); this.volumeFailures = volFailures; this.volumeFailureSummary =volumeFailureSummary; for (StorageReport report : reports) { DatanodeStorageInfo storage = storageMap.get(report.getStorage().getStorageID()); if (checkFailedStorages) { failedStorageInfos.remove(storage); } storage.receivedHeartbeat(report); // skip accounting for capacity of PROVIDEDstorages! if (StorageType.PROVIDED.equals(storage.getStorageType())){ continue; } totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalBlockPoolUsed +=report.getBlockPoolUsed(); totalDfsUsed += report.getDfsUsed(); totalNonDfsUsed += report.getNonDfsUsed(); } // Update total metrics for the node. // 更新存储相关信息 setCapacity(totalCapacity); setRemaining(totalRemaining); setBlockPoolUsed(totalBlockPoolUsed); setDfsUsed(totalDfsUsed); setNonDfsUsed(totalNonDfsUsed); if (checkFailedStorages) { updateFailedStorage(failedStorageInfos); } long storageMapSize; synchronized (storageMap) { storageMapSize = storageMap.size(); } if (storageMapSize != reports.length) { pruneStorageMap(reports); }}<hr>相关阅读:
Hadoop之HDFS的I/O流操作
Hadoop(HDFS)之 数据完备性
Hadoop(HDFS)之CheckPoint时间设置
Hadoop小试牛刀——HDFS集群压测
手把手教你搞定Hadoop源码编译 |