HBase 1.2.0 Source Code Series: The Compact Flow

Compact (compaction) refers to merging some or all of the HFiles under one column family of an HRegion in an HBase table. As data is continuously written, the MemStore reaches a threshold and is flushed to disk, producing many small files. Left untreated, these files severely degrade HBase read performance. So HBase periodically, when certain conditions are met, or when triggered manually, merges the many small files into one larger file; this process is called compaction.
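Before diving into the server side, here is a minimal client-side sketch of how such a request can be issued through the Admin API; the table name and column family below are placeholders, not taken from the source being analyzed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class CompactionTrigger {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("demo_table");  // placeholder table name
            admin.compact(table);                               // request a minor compaction for every region of the table
            admin.majorCompact(table, Bytes.toBytes("cf"));     // request a major compaction for one column family
        }
    }
}

Per region, these requests end up in the compactRegion RPC handler shown below.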

1. Compaction Triggered via an RPC Call

RSRpcServices (the RegionServer RPC service), org.apache.hadoop.hbase.regionserver.RSRpcServices

@QosPriority(priority=HConstants.ADMIN_QOS) // Relative priority of the call; provides a basic notion of quality of service (QoS)
public CompactRegionResponse compactRegion(final RpcController controller,
      final CompactRegionRequest request) throws ServiceException {

    try {
        // Check the RegionServer state: isOnline, isAborted, isStopped, fsOk
        checkOpen();
        Region region = getRegion(request.getRegion()); // Get the Region to operate on
        ...
        boolean major = false; // Whether to run a major compaction
        byte [] family = null; // Column family from the request, if any
        Store store = null; // The Store backing that column family
        if (request.hasFamily()) { // Resolve the Store for the requested column family
            family = request.getFamily().toByteArray();
            store = region.getStore(family);
            if (store == null) {
                throw new ServiceException(...);
            }
        }

        if (request.hasMajor()) { 
            major = request.getMajor();
        }

        // With a column family, only that Store is marked; otherwise the whole Region is marked for major compaction
        if (major) {
            // Nothing is compacted here; this only sets this.forceMajor = true
            if (family != null) {
                store.triggerMajorCompaction(); 
            } else {
                region.triggerMajorCompaction();
            }
        }

        // The only difference between the two overloads is whether a specific Store is passed
        if(family != null) {
            regionServer.compactSplitThread.requestCompaction(region, store, log, 
              Store.PRIORITY_USER, null, RpcServer.getRequestUser());
        } else {
            regionServer.compactSplitThread.requestCompaction(region, log, 
              Store.PRIORITY_USER, null, RpcServer.getRequestUser());
        }

        return CompactRegionResponse.newBuilder().build(); 
    } catch (IOException ie) {
        throw new ServiceException(ie);
    }
}

Getting the Region

protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
      throws NotServingRegionException {
    // Look the region up among the online regions
    Region region = this.onlineRegions.get(encodedRegionName); 
    if (region == null) {
        // Compacting a region that is not online fails here; the exception reflects
        // the region's state, e.g. a region that is currently being moved
        throw new NotServingRegionException(....);
    }
    return region;
}

All regions that were closed because of a move operation are tracked in movedRegions:

protected Map<String, MovedRegionInfo> movedRegions;
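When the lookup fails, this map lets the error branch tell the client that the region has moved rather than merely that it is not online. A paraphrased sketch of that branch (not the exact source):

// Paraphrased sketch of the failure branch in getRegionByEncodedName (not the exact source)
MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
if (moveInfo != null) {
    // The region was recently moved away; point the client at its new location
    throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
}
throw new NotServingRegionException("Region " + encodedRegionName + " is not online");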

 

2. Entering the Compaction Logic

org.apache.hadoop.hbase.regionserver.CompactSplitThread

If no column family is passed in, iterate over all of the region's stores and call requestCompactionInternal for each:

private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
      throws IOException {

    for (Store s : r.getStores()) {
        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
        ...
    }

}

– requestCompaction in HStore
org.apache.hadoop.hbase.regionserver.HStore

@Override
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
      User user) throws IOException {
    
    if (!this.areWritesEnabled()) { // If writes are disabled, do not compact
        return null;
    }

    // Before compacting, try to drop files that are no longer needed to simplify things;
    // hbase.store.delete.expired.storefile controls whether expired store files are removed here
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    this.lock.readLock().lock(); // Take the read lock
    try {
        synchronized (filesCompacting) {
            final Store thisStore = this;
            if (this.getCoprocessorHost() != null) {
                // Coprocessors (introduced in 0.92) support features such as secondary indexes, complex filters and access control; skip this logic for now
            }

            // The common case
            if (!compaction.hasSelection()) { // hasSelection() checks this.request != null
                boolean isUserCompaction = priority == Store.PRIORITY_USER; // true for this RPC-triggered path
                boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
                  offPeakCompactionTracker.compareAndSet(false, true); // Are we in the configured off-peak window?
                try {
                    compaction.select(this.filesCompacting, isUserCompaction,
                      mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); // Select the files to compact
                } catch (IOException e) {
                    // ...
                    throw e;
                }
                // ...
            }

            if (baseRequest != null) {
                // If baseRequest is not null, merge baseRequest with the selected request to decide which files to compact
            }

            // Get the resulting file list
            request = compaction.getRequest();
            final Collection<StoreFile> selectedFiles = request.getFiles();
            if (selectedFiles.isEmpty()) {
                return null;
            }

            addToCompactingFiles(selectedFiles); // Add them to filesCompacting

            // Keep forceMajor set only if the selected request did not turn out to be a major compaction
            this.forceMajor = this.forceMajor && !request.isMajor();

            // Set common request properties: priority and description
            request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
            request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        }
    } finally {
        this.lock.readLock().unlock();
    }
    this.region.reportCompactionRequestStart(request.isMajor()); // Update counters
    return compaction;
}
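For reference, the removeUnneededFiles() call at the top of this method archives store files whose data is entirely past the column family's TTL, so they never need to be compacted at all. A condensed paraphrase of the idea (not the exact source):

// Condensed paraphrase of HStore.removeUnneededFiles (not the exact source)
if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
    return; // the feature can be switched off
}
long cfTtl = getStoreFileTtl();
if (cfTtl != Long.MAX_VALUE) {
    // Files whose newest cell is older than (now - TTL) can be dropped without compacting them
    Collection<StoreFile> delSfs = storeEngine.getStoreFileManager()
        .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
    addToCompactingFiles(delSfs);
}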

The storeEngine is created according to the hbase.hstore.engine.class setting and defaults to DefaultStoreEngine,
so by default compaction is the DefaultCompactionContext implementation nested inside org.apache.hadoop.hbase.regionserver.DefaultStoreEngine:

this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
CompactionContext compaction = storeEngine.createCompaction();
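The engine can also be overridden per table rather than cluster-wide. A hedged example (the descriptor below is a placeholder, and StripeStoreEngine is just one alternative engine shipped with HBase):

// Hypothetical example: switch one table to the stripe store engine via a table-level override
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("demo_table"));
desc.setConfiguration("hbase.hstore.engine.class",
    "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");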

 

The compactionPolicy is obtained from hbase.hstore.defaultengine.compactionpolicy.class and defaults to ExploringCompactionPolicy;
selectCompaction is implemented by the parent class org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy and is not covered in depth here:

request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
    filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
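At the heart of both policies is a size ratio check: a file stays in a candidate selection only if it is not too large compared with the other candidates. A simplified illustration of that idea (not the actual ExploringCompactionPolicy code; the ratio comes from hbase.hstore.compaction.ratio, default 1.2):

// Simplified illustration of the "files in ratio" check (not the exact source)
static boolean filesInRatio(long[] candidateSizes, double ratio) {
    long totalSize = 0L;
    for (long size : candidateSizes) {
        totalSize += size;
    }
    for (long size : candidateSizes) {
        // A single file must not exceed ratio * (combined size of the other candidates)
        if (size > (totalSize - size) * ratio) {
            return false;
        }
    }
    return true;
}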

 

– requestCompactionInternal

// request = null, selectNow = true
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
      throws IOException {

    if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
        // If the region's table was created with COMPACTION_ENABLED=false, no compaction runs at all
        return null;
    }

    CompactionContext compaction = null;
    if (selectNow) {
        compaction = selectCompaction(r, s, priority, request, user); // Build the CompactionContext for this Store, including the files to compact
        // CompactionContext is the compaction context class; it holds all the "physical" details needed to run one compaction
        if (compaction == null) return null;
    }

    // Assume most compactions are small, so put system compactions into the small pool
    // and move them to the large pool when necessary.
    // throttleCompaction checks compactionSize > comConf.getThrottlePoint(),
    // configured via hbase.regionserver.thread.compaction.throttle
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
      ? longCompactions : shortCompactions;
    pool.execute(new CompactionRunner(s, r, compaction, pool, user)); // Run asynchronously; the execution logic lives in CompactionRunner
    return selectNow ? compaction.getRequest() : null;
}
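For context, throttleCompaction is a single comparison against the throttle point, and when hbase.regionserver.thread.compaction.throttle is not set the point defaults to twice the maximum number of files per compaction times the MemStore flush size (2 × 10 × 128 MB = 2.5 GB with default settings), so only fairly large compactions land in the long pool. A condensed paraphrase (not the exact source):

// Condensed paraphrase of HStore.throttleCompaction (not the exact source)
public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
}

// In CompactionConfiguration the default throttle point is roughly:
// throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
//     2 * maxFilesToCompact * memstoreFlushSize);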

 

3. Running the Compaction on a Thread Pool

private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
    @Override
    public void run() {
        if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
            return; // Bail out if the RegionServer is stopped or the table has compaction disabled
        }
        doCompaction(user);
    }

    private void doCompaction(User user) {
        // Common case: a system compaction queued without file selection
        if (this.compaction == null) {
            // Re-check whether the store's priority changed while this runner sat in the
            // queue, to avoid blocking potentially higher-priority work, then select files now
            this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
            // For a user compaction (selectNow = true) the selection already happened in
            // requestCompactionInternal, so this branch is skipped
        }

        // The compaction can run now
        this.compaction.getRequest().beforeExecute(); // Currently a no-op
        try {
            boolean completed = region.compact(compaction, store, compactionThroughputController, user);
            if (completed) {
                if (store.getCompactPriority() <= 0) {
                    // Degenerate case: immediately enqueue another system compaction for this store
                    requestSystemCompaction(region, store, "Recursive enqueue");
                } else {
                    // Check whether the compaction pushed the region past its maximum size so that a split is needed; see the split flow
                    requestSplit(region);
                }
            }
        } catch (Exception ex) {
            // ...
        } finally {
            LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
        }
        this.compaction.getRequest().afterExecute(); // Currently a no-op
    }
}
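The store.getCompactPriority() <= 0 check above makes sense once you know how the default store file manager computes priority: roughly blockingFileCount minus the current number of store files (hbase.hstore.blockingStoreFiles, default 10), so a non-positive value means the store is at or beyond its blocking limit and another compaction is queued right away. A paraphrased sketch (not the exact source):

// Paraphrase of the default store file manager's compaction priority (not the exact source)
int getStoreCompactionPriority(int blockingFileCount, int currentFileCount) {
    int priority = blockingFileCount - currentFileCount;
    // Avoid colliding with Store.PRIORITY_USER (1), which is reserved for user-triggered requests
    return (priority == Store.PRIORITY_USER) ? priority + 1 : priority;
}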

– The actual compaction logic
org.apache.hadoop.hbase.regionserver.HRegion

public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController, User user) throws IOException {
    if (this.closing.get() || this.closed.get()) { // Bail out if the region is closing or closed
        store.cancelRequestedCompaction(compaction);
        return false;
    }

    MonitoredTask status = null;
    boolean requestNeedsCancellation = true;
    lock.readLock().lock();
    try {
        byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
        if (stores.get(cf) != store) { // If for any reason the store resolved by column family is no longer the store we were handed, abort the compaction
            return false;
        }

        if (this.closed.get()) { // Check again that the region has not been closed
            return false;
        }

        try {
            store.compact(compaction, throughputController, user); // Delegate the actual compaction to the Store
        } catch (InterruptedIOException iioe) {
        // ...
        }

        return true;
    } finally {

        try {
            if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
            if (status != null) status.cleanup();
        } finally {
            lock.readLock().unlock();
        }
    }
}

– Compacting the files
org.apache.hadoop.hbase.regionserver.HStore

@Override
public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController, User user) throws IOException {
    List<StoreFile> sfs = null;
    CompactionRequest cr = compaction.getRequest();
    try {
        // If we have a valid compaction request, do all the sanity checking here, because we need to clean up after it in the finally block below
        Collection<StoreFile> filesToCompact = cr.getFiles();
        synchronized (filesCompacting) {
            // Sanity check: all requested files are currently marked as compacting in this Store
            Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
        }

        // Run the compaction
        List<Path> newFiles = compaction.compact(throughputController, user);

        long outputBytes = 0L;
        if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
            // Handling for when completing the compaction is disabled
        }

        // Steps needed to complete the compaction
        sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
        writeCompactionWalRecord(filesToCompact, sfs);
        replaceStoreFiles(filesToCompact, sfs);
        // ...
        // At this point the Store serves reads from all the new files
        completeCompaction(filesToCompact, true); // Archive the old files and update the store size

        if (region.getRegionServerServices() != null
          && region.getRegionServerServices().getMetrics() != null) {
            region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
              now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
              outputBytes);
        }

        return sfs;
    } finally {
        finishCompactionRequest(cr);
    }
}

The main flow that starts a compaction is shown above. Additions and corrections are welcome.
