0) Add the following dependencies to pom.xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
    <scope>provided</scope>
</dependency>

1) Press Ctrl + N, search globally for DataNode, and open DataNode.java
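As a quick sanity check that the dependencies above resolve, a minimal client like the one below can be compiled and run against them. This is only a sketch: the NameNode URI hdfs://localhost:8020 is an assumption, so substitute your cluster's fs.defaultFS.

// Minimal smoke test for the Hadoop client dependencies above.
// The NameNode URI is an assumption -- replace it with your fs.defaultFS.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(
        URI.create("hdfs://localhost:8020"), conf)) {
      // List the root directory to confirm the client can reach HDFS
      for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath());
      }
    }
  }
}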
Official description of DataNode (from the class Javadoc)
DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with a single NameNode. It also communicates with client code and other DataNodes from time to time. DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes. The DataNode maintains just one critical table: block -> stream of bytes (of BLOCK_SIZE or less). This info is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards. DataNodes spend their lives in an endless loop of asking the NameNode for something to do. A NameNode can not connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.
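The key point in this description is that the protocol is pull-based: the NameNode never calls the DataNode; commands ride back on responses to calls the DataNode makes. The sketch below is a hypothetical, heavily simplified model of that loop. NameNodeRpc and sendHeartbeat are illustrative stand-ins, not real Hadoop types; the 3-second sleep matches the HDFS default dfs.heartbeat.interval.

import java.util.List;

// Illustrative stand-in for the NameNode side of the protocol (not a real Hadoop type)
interface NameNodeRpc {
  // Returns commands for this DataNode to execute (delete block, copy block, ...)
  List<String> sendHeartbeat(String datanodeId);
}

class HeartbeatLoop implements Runnable {
  private final NameNodeRpc nameNode;
  private final String datanodeId;

  HeartbeatLoop(NameNodeRpc nameNode, String datanodeId) {
    this.nameNode = nameNode;
    this.datanodeId = datanodeId;
  }

  @Override
  public void run() {
    // "DataNodes spend their lives in an endless loop of asking
    // the NameNode for something to do."
    while (!Thread.currentThread().isInterrupted()) {
      for (String command : nameNode.sendHeartbeat(datanodeId)) {
        System.out.println("executing NameNode command: " + command);
      }
      try {
        Thread.sleep(3000); // 3 s is the default dfs.heartbeat.interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}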
2) Press Ctrl + F and search for the main method
DataNode.java
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    DataNode datanode = createDataNode(args, null, resources);
    ... ...
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Initialize the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the DataNode daemon
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

DataNode(final Configuration conf,
    final List<StorageLocation> dataDirs,
    final StorageLocationChecker storageLocationChecker,
    final SecureResources resources) throws IOException {
  super(conf);
  ... ...
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    // Start the DataNode
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}

void startDataNode(List<StorageLocation> dataDirectories,
    SecureResources resources) throws IOException {
  ... ...
  // Create the data storage object
  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  // Initialize DataXceiver
  initDataXceiver();
  // Start the HTTP server
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN username.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  // Initialize the RPC server
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);

  // Create the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // Heartbeat management
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();

  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}
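Read top to bottom, the startup call chain is: main() -> secureMain() -> createDataNode() -> instantiateDataNode() -> makeInstance() -> the DataNode constructor -> startDataNode(). The shape of the pattern is "construct the node fully, then start its daemon threads". The following condensed sketch restates that shape; MiniDataNode is an illustrative class, not part of Hadoop.

// Hypothetical distillation of the chain above (not Hadoop code):
// everything is wired up in the constructor before any daemon runs.
public class MiniDataNode {
  private final Thread serviceThread;

  private MiniDataNode() {
    // Mirrors the DataNode constructor + startDataNode(): all services
    // are created and registered before the daemon thread starts.
    serviceThread = new Thread(() -> { /* serve block requests */ },
        "mini-datanode-service");
    serviceThread.setDaemon(true);
  }

  // Mirrors createDataNode(): instantiate first, then run the daemon.
  public static MiniDataNode createAndRun() {
    MiniDataNode dn = new MiniDataNode(); // instantiateDataNode analogue
    dn.serviceThread.start();             // runDatanodeDaemon analogue
    return dn;
  }
}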
1. Initializing DataXceiverServer
Click into initDataXceiver
private void initDataXceiver() throws IOException {
  // dataXceiverServer is a daemon service the DataNode uses to receive
  // data sent by clients and by other DataNodes
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty
  ... ...
}
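Conceptually, DataXceiverServer is an accept loop that hands each incoming data-transfer connection to its own worker thread (the real server spawns a DataXceiver per connection). The sketch below is a bare-bones, hypothetical model of that behavior, not the real implementation; port 9866 is the Hadoop 3.x default data-transfer port (dfs.datanode.address).

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class MiniXceiverServer implements Runnable {
  private final ServerSocket serverSocket;

  public MiniXceiverServer(int port) throws IOException {
    this.serverSocket = new ServerSocket(port); // e.g. 9866, the 3.x default
  }

  @Override
  public void run() {
    while (!serverSocket.isClosed()) {
      try {
        Socket peer = serverSocket.accept(); // a client or another DataNode
        // One worker thread per connection, like DataXceiver
        Thread worker = new Thread(() -> handle(peer), "mini-xceiver");
        worker.setDaemon(true);
        worker.start();
      } catch (IOException e) {
        break; // socket closed: shut down the accept loop
      }
    }
  }

  private void handle(Socket peer) {
    // Read or write block data here, then close the connection.
    try { peer.close(); } catch (IOException ignored) { }
  }
}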
2. Initializing the HTTP Service
Click into startInfoServer();
DataNode.java
private void startInfoServer() throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}

DatanodeHttpServer.java
public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel) throws IOException {
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}
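Note that the builder binds the internal HttpServer2 to localhost only, and setFindPort(true) lets it pick a free port; the externally visible HTTP port is served by DatanodeHttpServer's Netty frontend, which proxies requests to that internal server. Once the DataNode is up, the info server can be probed from the outside. The sketch below is a hypothetical helper that assumes the Hadoop 3.x default HTTP port 9864 (dfs.datanode.http.address); adjust host and port for your deployment.

// Probe the DataNode web UI; assumes the default 3.x HTTP port 9864.
import java.net.HttpURLConnection;
import java.net.URL;

public class DataNodeInfoProbe {
  public static void main(String[] args) throws Exception {
    // /jmx is the standard metrics endpoint exposed by Hadoop daemons
    URL url = new URL("http://localhost:9864/jmx");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setConnectTimeout(2000);
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}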