impala因为invalidate语句导致执行DDL语句卡住

现象

impala突然无法执行DDL语句,一直卡住,累积的很多任务。初步怀疑impala并发有问题,采取了一些减少并发的方法,有一点作用,但故障没有完全消除。

排查

怀疑超时时间太短(2分钟),修改代码更改超时时间到10分钟,没效果,但发现一个规律,有些任务在超时后17秒成功。

开启详细debug日志。发现impalad的日志显示是获取元数据超时,但发现catalogd的获取元素据只需要10+秒。catalogd没有把结果返回给impalad?

这里解释一下impalad和catalogd的关系。客户端直接访问impalad,impalad在本地缓存找不到这个表的元素据,就会向catalogd发起请求,catalogd从hive metastore获取元数据以及namenode获取block信息,返回给impalad。impalad在获取的过程中,每隔2秒检查一下本地的缓存,看这个表获取完元素据了没有,直到超时。

但是。。。由于代码太复杂,C++和java的代码互相调用,我看不懂传递的过程。期望从代码了解为什么丢了结果是没办法了。另外还有一个办法是在代码里加上一些跟踪的日志重新编译,但耗时可能很长,没有继续下去。

偶然发现有些卡住的语句,在我执行refresh语句时居然不卡了。想起我们执行语句前加了-r参数,每次都会执行invalidate metadata语句,猜想是不是因为这个引起。取消这个参数后一切正常,之前加这个参数是为了避免有些表没有更新元素据,可能会导致table not found的错误。(现在不需要这个参数,建表和更改时会执行这个语句)

代码分析

由于看不懂全部代码,所以以下分析可能有误。来看一下invalidate metadata做了什么。从注释来看应该没找错方法=。=

  /**
   * Executes a TResetMetadataRequest and returns the result as a
   * TResetMetadataResponse. Based on the request parameters, this operation
   * may do one of three things:
   * 1) invalidate the entire catalog, forcing the metadata for all catalog
   *    objects to be reloaded.
   * 2) invalidate a specific table, forcing the metadata to be reloaded
   *    on the next access.
   * 3) perform a synchronous incremental refresh of a specific table.
   *
   * For details on the specific commands see comments on their respective
   * methods in CatalogServiceCatalog.java.
   */
  public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)

这个方法里调用了catalog_.reset();

///impala-fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
 
  /**
   * Resets this catalog instance by clearing all cached table and database metadata.
   */
  public void reset() throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      nextTableId_.set(0);
 
      // Since UDFs/UDAs are not persisted in the metastore, we won't clear
      // them across reset. To do this, we store all the functions before
      // clearing and restore them after.
      // TODO: Everything about this. Persist them.
      List<Pair<String, HashMap<String, List<Function>>>> functions =
          Lists.newArrayList();
      for (Db db: dbCache_.get().values()) {
        if (db.numFunctions() == 0) continue;
        functions.add(Pair.create(db.getName(), db.getAllFunctions()));
      }
 
      // Build a new DB cache, populate it, and replace the existing cache in one
      // step.
      ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
      MetaStoreClient msClient = metaStoreClientPool_.getClient();
      try {
        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
          Db db = new Db(dbName, this);
          db.setCatalogVersion(incrementAndGetCatalogVersion());
          newDbCache.put(db.getName().toLowerCase(), db);
 
          for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
            Table incompleteTbl = IncompleteTable.createUninitializedTable(
                getNextTableId(), db, tableName);
            incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addTable(incompleteTbl);
            if (loadInBackground_) {
              tblsToBackgroundLoad.add(
                  new TTableName(dbName.toLowerCase(), tableName.toLowerCase()));
            }
          }
        }
      } finally {
        msClient.release();
      }
 
      // Restore UDFs/UDAs.
      for (Pair<String, HashMap<String, List<Function>>> dbFns: functions) {
        Db db = null;
        try {
          db = newDbCache.get(dbFns.first);
        } catch (Exception e) {
          continue;
        }
        if (db == null) {
          // DB no longer exists - it was probably dropped externally.
          // TODO: We could restore this DB and then add the functions back?
          continue;
        }
 
        for (List<Function> fns: dbFns.second.values()) {
          for (Function fn: fns) {
            if (fn.getBinaryType() == TFunctionBinaryType.BUILTIN) continue;
            fn.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addFunction(fn);
          }
        }
      }
      dbCache_.set(newDbCache);
      // Submit tables for background loading.
      for (TTableName tblName: tblsToBackgroundLoad) {
        tableLoadingMgr_.backgroundLoad(tblName);
      }
    } catch (Exception e) {
      LOG.error(e);
      throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

reset里面,dbCache_可能是impalad的本地缓存(又或者是catalogd的…),里面存有各个表的元数据信息(包含block信息)。这里重新从metastore获取了所有的表信息(不包含分区和block信息),这些表称之为incomplete Table,因为还没有获取分区和block信息。如果启用了loadInBackground,就会在后台获取分区等信息,没启用的话就等使用的时候获取。最后设为dbCache_,相当于清空所有元素据缓存。

结论

从代码上解释,impalad在向catalogd获取元素据信息过程中,执行了invalidate metadata,导致本地缓存清空,impalad在检查缓存时,发现元素据还是空的,一直等到超时重试。这也能解释并发多时,一直失败,因为出现invalidate metadata的概率多了。

另外取消这个后,应该能提高impalad的查询速度,能够利用本地缓存。

后续关注

去掉这个参数后会不会导致元素据不同步,导致读取不到最新的数据。




fatkun

  1. 现在我有一个问题就是每天产生一个日志文件大概有2.5G,然后晚上12点进行使用insert …select归档写入分区表parquet文件中,由于parquet按1G块划分的,最终的结果就是会产生三个文件。。。我想过直接写HDFS的方式写入PARQUET文件中,这样只会产生一个文件,但是官方文档说不要这样做,因为最终的PARQUET会产生多个HDFS块,会造成网络传输,效率更低,我不知道你能否给个建议。谢谢!

      • 这玩意,我只想到一个办法,就是在日志归档后(压缩的,肯定一般1G文本只有300多M),再用程序跑一遍,把多个文件合并写到1个G,剩下的再合并,如此往复。哈哈,有点像HBASE的hstore功能。。