[HADOOP-11238] hdfs namenode getGroups latency


The following warning showed up in the namenode log:

2016-05-11 01:00:26,360 WARN org.apache.hadoop.security.Groups: Potential performance problem: getGroups(user=xxx) took 5046 milliseconds

This method is how Hadoop figures out which groups a user belongs to.

Patch analysis

Patch: https://issues.apache.org/jira/browse/HADOOP-11238
First, the source code before the fix:

  public List<String> getGroups(String user) throws IOException {
    // No need to lookup for groups of static users
    List<String> staticMapping = staticUserToGroupsMap.get(user);
    if (staticMapping != null) {
      return staticMapping;
    }
    // Return cached value if available
    CachedGroups groups = userToGroupsMap.get(user);
    long startMs = timer.monotonicNow();
    if (!hasExpired(groups, startMs)) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Returning cached groups for '" + user + "'");
      }
      if (groups.getGroups().isEmpty()) {
        // Even with enabling negative cache, getGroups() has the same behavior
        // that throws IOException if the groups for the user is empty.
        throw new IOException("No groups found for user " + user);
      }
      return groups.getGroups();
    }
 
    // Create and cache user's groups
    List<String> groupList = impl.getGroups(user);
    long endMs = timer.monotonicNow();
    long deltaMs = endMs - startMs;
    UserGroupInformation.metrics.addGetGroups(deltaMs);
    if (deltaMs > warningDeltaMs) {
      LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
          "took " + deltaMs + " milliseconds.");
    }
    groups = new CachedGroups(groupList, endMs);
    if (groups.getGroups().isEmpty()) {
      if (isNegativeCacheEnabled()) {
        userToGroupsMap.put(user, groups);
      }
      throw new IOException("No groups found for user " + user);
    }
    userToGroupsMap.put(user, groups);
    if(LOG.isDebugEnabled()) {
      LOG.debug("Returning fetched groups for '" + user + "'");
    }
    return groups.getGroups();
  }

The cause of the slowness: when the cached entry expires, multiple threads call impl.getGroups(user) at the same time. The default implementation is org.apache.hadoop.security.ShellBasedUnixGroupsMapping, which shells out to the system command id to fetch the user's group information. With many concurrent callers, dozens of processes may be forked at once, so every call comes back slowly.

The fix I had in mind was thread synchronization, so that multiple threads cannot update the cache at the same time. Searching the JIRA, someone had indeed implemented exactly that, but the patch had been submitted long before and did not implement the negative cache, so what upstream merged is the patch given above. A minimal sketch of the synchronization idea follows.
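
This is only the shape of the idea, not Hadoop code; the class and helper names are invented, and the expiry check is omitted:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SynchronizedGroupLookupSketch {
  // one lock object per user, so lookups for different users do not block each other
  private final ConcurrentMap<String, Object> userLocks = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();

  public List<String> getGroups(String user) {
    List<String> cached = cache.get(user);
    if (cached != null) {
      return cached; // expiry check omitted for brevity
    }
    Object lock = userLocks.computeIfAbsent(user, k -> new Object());
    synchronized (lock) {
      // re-check: another thread may have refreshed the entry while we waited
      cached = cache.get(user);
      if (cached != null) {
        return cached;
      }
      List<String> groups = fetchGroupsFromOs(user); // the expensive `id` call
      cache.put(user, groups);
      return groups;
    }
  }

  private List<String> fetchGroupsFromOs(String user) {
    return Collections.emptyList(); // placeholder for the real lookup
  }
}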

HADOOP-11238's approach is to cache with the third-party Google Guava library. I have not studied the internals, but according to the comments it achieves the same thing under concurrent requests: if an entry is missing from the cache, only one thread loads it while the other threads wait for that load to finish. If the entry is present it is returned directly; and while an expired entry is being refreshed, the other threads keep getting the old value instead of waiting on the refresh.
The negative cache is for caching the empty-groups case (a user that belongs to no group); its timeout differs from the normal cache timeout, hence the extra variable.
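
From the Guava documentation, a minimal sketch of this pattern looks roughly like the following. This is not the actual patch; the class name, helper name, and the 300-second timeout are illustrative. Guava's refreshAfterWrite gives exactly the semantics described above:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class GuavaGroupCacheSketch {
  private final LoadingCache<String, List<String>> cache =
      CacheBuilder.newBuilder()
          // after 300s, the next get() triggers a reload performed by exactly
          // one caller; until it finishes, other callers see the stale entry
          .refreshAfterWrite(300, TimeUnit.SECONDS)
          .build(new CacheLoader<String, List<String>>() {
            @Override
            public List<String> load(String user) {
              // stand-in for impl.getGroups(user), e.g. the shell-based lookup
              return fetchGroupsFromOs(user);
            }
          });

  public List<String> getGroups(String user) throws ExecutionException {
    // for a key not yet in the cache, concurrent callers block on a single load
    return cache.get(user);
  }

  private static List<String> fetchGroupsFromOs(String user) {
    return Collections.emptyList(); // placeholder for the real lookup
  }
}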

Related parameters

hadoop.security.group.mapping
Possible values:
org.apache.hadoop.security.ShellBasedUnixGroupsMapping: shells out to the system command id
org.apache.hadoop.security.JniBasedUnixGroupsMapping: uses JNI to call Hadoop's own native group-lookup code
org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMappingWithFallback: uses JNI, falling back to the shell-based mapping if the native library fails to load
org.apache.hadoop.security.LdapGroupsMapping: fetches groups via LDAP
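
The cache behaviour discussed above is tunable as well. From memory (worth double-checking against your version's core-default.xml), the relevant keys and defaults are:

<property>
  <name>hadoop.security.groups.cache.secs</name>
  <value>300</value> <!-- lifetime of a cached positive lookup -->
</property>
<property>
  <name>hadoop.security.groups.negative-cache.secs</name>
  <value>30</value> <!-- lifetime of a cached empty-groups lookup -->
</property>
<property>
  <name>hadoop.security.groups.cache.warn.after.ms</name>
  <value>5000</value> <!-- threshold for the "Potential performance problem" warning -->
</property>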

You can also implement the mapping yourself, for instance reading the user-to-group mapping from a file, as sketched below.
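
A hedged sketch of such a provider, implementing org.apache.hadoop.security.GroupMappingServiceProvider; the file path and the "user:group1,group2" line format are invented for illustration:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.security.GroupMappingServiceProvider;

public class FileBasedGroupsMapping implements GroupMappingServiceProvider {
  // assumed location of the mapping file, one "user:group1,group2" line per user
  private static final String MAPPING_FILE = "/etc/hadoop/user-groups.txt";

  @Override
  public List<String> getGroups(String user) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader(MAPPING_FILE))) {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] parts = line.split(":", 2);
        if (parts.length == 2 && parts[0].trim().equals(user)) {
          return new ArrayList<>(Arrays.asList(parts[1].trim().split(",")));
        }
      }
    }
    return new ArrayList<>(); // empty list: Groups treats this as "no groups found"
  }

  @Override
  public void cacheGroupsRefresh() throws IOException {
    // no-op: caching happens in Groups, above this provider
  }

  @Override
  public void cacheGroupsAdd(List<String> groups) throws IOException {
    // no-op for this provider
  }
}

Point hadoop.security.group.mapping at the fully-qualified class name to activate it.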

Pitfall

The hadoop.security.group.mapping setting in Cloudera Manager is "service-wide", meaning it takes effect for both clients and servers. After switching it to the JNI-based mapping, clients may fail to load the native library and tasks then fail.


DataNode startup flow: source code analysis

Background

I am planning to restart some DataNodes soon, and in the past restarts have caused running business jobs to fail. So I wanted to understand at what point a DataNode counts as fully started, and whether it is possible to detect that a DataNode is ready.
The source analyzed in this post is hadoop cdh5.4.0.

Source code analysis

Start from the main method of DataNode.java:

  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
 
    secureMain(args, null);
  }

secureMain in turn calls createDataNode:

  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // Instantiate the datanode
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // Start the BPOfferService, dataXceiverServer, ipcServer, etc.
      dn.runDatanodeDaemon();
    }
    return dn;
  }
  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
 
    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }
 
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    // Resolve the actual storage paths (from dfs.datanode.data.dir)
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }
  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // DataNode disk checker: verifies directory permissions and that directories can be created
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // Keep only the locations that can be read and written normally
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");
 
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

In the DataNode constructor, the part to look at is the startDataNode(conf, dataDirs, resources) method:

  void startDataNode(Configuration conf, 
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
 
    // settings global for all BPs in the Data Node
    this.secureResources = resources;
    synchronized (this) {
      this.dataDirs = dataDirs;
    }
    this.conf = conf;
    this.dnConf = new DNConf(conf);
    checkSecureConfig(dnConf, conf, resources);
 
    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
 
    if (dnConf.maxLockedMemory > 0) {
      if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) is greater than zero and native code is not available.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
      }
      if (Path.WINDOWS) {
        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
      } else {
        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
        if (dnConf.maxLockedMemory > ulimit) {
          throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
        }
      }
    }
    LOG.info("Starting DataNode with maxLockedMemory = " +
        dnConf.maxLockedMemory);
 
    storage = new DataStorage();
 
    // global DN settings
    registerMXBean();
    // Initialize the dataXceiverServer objects
    initDataXceiver(conf);
    // Start the HTTP / webhdfs info server
    startInfoServer(conf);
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
 
    // BlockPoolTokenSecretManager is required to create ipc server.
    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
 
    // Login is done by now. Set the DN user name.
    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    LOG.info("dnUserName = " + dnUserName);
    LOG.info("supergroup = " + supergroup);
    // Initialize the IPC server
    initIpcServer(conf);
 
    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
    // Important: construct the BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
 
    // Create the ReadaheadPool from the DataNode context so we can
    // exit without having to explicitly shutdown its thread pool.
    readaheadPool = ReadaheadPool.getInstance();
    saslClient = new SaslDataTransferClient(dnConf.conf, 
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  }

blockPoolManager.refreshNamenodes internally calls doRefreshNamenodes():

  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
 
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
 
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
 
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
 
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
 
 
      // Since this is a restart, everything ends up in toAdd
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
        // Iterating over nameservices here; with HDFS federation there are several
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // Create the BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          // Record it in bpByNameserviceId; next time this method runs, this nameservice goes into toRefresh instead
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // Start the BPOfferService for everything in offerServices
      startAll();
    }

Note the createBPOS method: a BPOfferService contains multiple BPServiceActors, stored in the BPOfferService.bpServices field:

  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
    return new BPOfferService(nnAddrs, dn);
  }
 
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;
 
    // With an active and a standby namenode, two BPServiceActors are created
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }

The startAll() above starts all of the BPOfferServices, which in practice means calling BPServiceActor.start(). What a BPServiceActor does is spelled out in the comment at the top of its file: 1. pre-registration handshake with the namenode, 2. registration with the namenode, 3. periodic heartbeats to the namenode, 4. handling commands coming back from the namenode. Continue with the class's run() method:

  public void run() {
    LOG.info(this + " starting to offer service");
 
    try {
      while (true) {
        // init stuff
        try {
          // Register with the namenode and initialize the block pool
          // setup storage
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }
 
      runningState = RunningState.RUNNING;
 
      while (shouldRun()) {
        try {
          // Track which namenode is currently active, send block reports to the namenode, etc.
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }

Focus on connectToNNAndHandshake(), which is where the block pool gets initialized:

  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);
 
    // Fetch version and namespace info from the namenode for validation
    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(nsInfo);
 
    // Register with the namenode, and schedule the first block report at (now - (blockReportInterval - delay)), which by default means immediately
    // Second phase of the handshake with the NN.
    register();
  }

Inside verifyAndSetNamespaceInfo(), the key call is dn.initBlockPool(this):

  void initBlockPool(BPOfferService bpos) throws IOException {
    NamespaceInfo nsInfo = bpos.getNamespaceInfo();
    if (nsInfo == null) {
      throw new IOException("NamespaceInfo not found: Block pool " + bpos
          + " should have retrieved namespace info before initBlockPool.");
    }
 
    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
    // Associate the BlockPoolId with this BPOfferService, stored in bpByBlockPoolId
    // Register the new block pool with the BP manager.
    blockPoolManager.addBlockPool(bpos);
 
    // Initialize the "data" field, creating the FsDatasetImpl
    // In the case that this is the first block pool to connect, initialize
    // the dataset, block scanners, etc.
    initStorage(nsInfo);
 
    // Exclude failed disks
    // Exclude failed disks before initializing the block pools to avoid startup
    // failures.
    checkDiskError();
 
    // Start the DirectoryScanner, which runs periodically to reconcile in-memory objects with the files actually on disk
    initDirectoryScanner(conf);
    // Add the block pool
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
  }

The call to focus on here is data.addBlockPool:

  public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      // Create BlockPoolSlice objects and add them to bpSlices
      volumes.addBlockPool(bpid, conf);
      // Initialize the ReplicaMap for this block pool (ReplicaMap: "Maintains the replica map")
      volumeMap.initBlockPool(bpid);
    }
    // Build the replica map across all volumes
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }

Inside volumes.addBlockPool, a BlockPoolSlice is created per volume. Its javadoc describes a BlockPoolSlice as the portion of a block pool stored on one disk; it mainly tracks a few directories plus usage sizes. Note that computing the disk usage here is very expensive, so it is cached and refreshed every 600 seconds, and when the datanode shuts down the value is written out to a file (a sketch of this caching idea follows the code below).

  void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
 
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            // Create the BlockPoolSlice
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            // ignore.
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };

volumes.getAllVolumesMap() in turn calls BlockPoolSlice.getVolumeMap() for every slice. In the version we are running there is no cache here, and because it has to walk the directories this step is expensive. A cache was added later; see the HDFS-7928 patch for details.

  void getVolumeMap(ReplicaMap volumeMap,
                    final RamDiskReplicaTracker lazyWriteReplicaMap)
      throws IOException {
    // Recover lazy persist replicas, they will be added to the volumeMap
    // when we scan the finalized directory.
    if (lazypersistDir.exists()) {
      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
      FsDatasetImpl.LOG.info(
          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
    }
 
    // Walk all directories underneath and build the replica map
    // add finalized replicas
    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
    // add rbw replicas
    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
  }

Back at the top: once bpos.verifyAndSetNamespaceInfo(nsInfo) has run, register() executes and registers with the namenode. After that, control returns up to offerService(), which sends heartbeats, block reports and so on.

Conclusion

The DataNode runs multiple threads during startup; the HTTP and RPC ports come up first, but at that point it cannot serve data yet. In the version analyzed here, most of the startup time is spent building the volumeMap, which requires walking the data directories.
Once the volumeMap is initialized, the DataNode registers with the namenode; on success the log shows "successfully registered with NN" (two such lines, one each for the active and standby namenode).
The DataNode then reports its blocks to the namenode, and logs "Successfully sent block report" once that finishes (again two lines).
Only after the block report completes is startup truly done.

During a restart, if the downtime is short enough that the namenode has not yet declared the datanode dead, the namenode still holds the datanode's block map, so the datanode is effectively ready as soon as the disk scan that builds the volumeMap completes.
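
For reference, with default settings the namenode declares a datanode dead only after 2 x dfs.namenode.heartbeat.recheck-interval + 10 x dfs.heartbeat.interval = 2 x 300 s + 10 x 3 s = 630 s, about 10.5 minutes; that is the window a quick restart has to fit into.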

Notes

There is a lot of code and, limited by my own understanding, some of this analysis may be wrong; I will update it if I find mistakes.
I also did not find a programmatic way to tell whether a datanode has finished starting up, short of patching the datanode code to add a flag.

References

第七章:小朱笔记hadoop之源码分析-hdfs分析 第五节:Datanode 分析 (the version it analyzes differs considerably from ours)
记一次DataNode慢启动问题


HDFS-8824: do not move small blocks when balancing data


Do not use small blocks for balancing the cluster

https://issues.apache.org/jira/browse/HDFS-8824
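
The patch makes the Balancer skip blocks below a minimum size when selecting blocks to move. In releases that carry it, the threshold is exposed as a configuration key; if I remember it correctly (verify against your version's hdfs-default.xml), it is:

<property>
  <name>dfs.balancer.getBlocks.min-block-size</name>
  <value>10485760</value> <!-- 10 MB default -->
</property>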


hdfs complete file timeout


The fix is to raise the retry count; the relevant code is in DFSOutputStream.java:

  private void completeFile(ExtendedBlock last) throws IOException {
    long localstart = Time.now();
    long localTimeout = 400;
    boolean fileComplete = false;
    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
    while (!fileComplete) {
      fileComplete =
          dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
      if (!fileComplete) {
        final int hdfsTimeout = dfsClient.getHdfsTimeout();
        if (!dfsClient.clientRunning ||
              (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
            String msg = "Unable to close file because dfsclient " +
                          " was unable to contact the HDFS servers." +
                          " clientRunning " + dfsClient.clientRunning +
                          " hdfsTimeout " + hdfsTimeout;
            DFSClient.LOG.info(msg);
            throw new IOException(msg);
        }
        try {
          if (retries == 0) {
            throw new IOException("Unable to close file because the last block"
                + " does not have enough number of replicas.");
          }
          retries--;
          Thread.sleep(localTimeout);
          localTimeout *= 2;
          if (Time.now() - localstart > 5000) {
            DFSClient.LOG.info("Could not complete " + src + " retrying...");
          }
        } catch (InterruptedException ie) {
          DFSClient.LOG.warn("Caught exception ", ie);
        }
      }
    }
  }
The retry count read into nBlockWriteLocateFollowingRetry comes from dfs.client.block.write.locateFollowingBlock.retries, so raise it in the client's hdfs-site.xml:

<property>
  <name>dfs.client.block.write.locateFollowingBlock.retries</name>
  <value>10</value>
</property>
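
A quick sanity check on the numbers: with retries = 10 and the sleep doubling from the initial 400 ms localTimeout, the client sleeps 400 + 800 + ... + 400*2^9 ms = 400 x (2^10 - 1) ms, roughly 409 seconds (about 6.8 minutes), before the "not have enough number of replicas" IOException is finally thrown.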

Articles related to hdfs startup


记一次DataNode慢启动问题: during startup, in order to obtain the used size, the datanode may fall back to running a full DU if the cached value has expired.


hadoop directory health checks

DiskChecker is responsible for checking a directory: creating it if it is missing and verifying its permissions. The entry points are listed below, followed by a small usage sketch.

datanode

BlockPoolSlice.checkDirs()
DataNode.checkDiskError()

nodeManager

LocalDirsHandlerService.checkDirs()
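
All of these entry points ultimately build on org.apache.hadoop.util.DiskChecker. A minimal standalone sketch of using it directly (the data directory path here is hypothetical): DiskChecker.checkDir creates the directory if it does not exist, and throws DiskErrorException when the directory cannot be created or lacks read/write/execute permission.

import java.io.File;

import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

public class DirHealthCheckSketch {
  public static void main(String[] args) {
    File dir = new File("/data/1/dfs/dn"); // hypothetical data directory
    try {
      // creates the directory if missing, then checks rwx permissions
      DiskChecker.checkDir(dir);
      System.out.println(dir + " looks healthy");
    } catch (DiskErrorException e) {
      System.err.println(dir + " failed the check: " + e.getMessage());
    }
  }
}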