HBase 1.2.0源码系列:Split流程

当 Region 发生以下操作时会判断是否 Split:

  1. Memstore flush 完成后,HRegion 会写入新的 HFile,可能产生较大的 HFile,此时会判断是否需要执行 Split。
  2. HStore 执行完成 Compact 操作后可能产生较大的 HFile,会判断是否需要执行 Split。
  3. HBaseAdmin/Shell 手动执行 split 命令时,会触发 Split。

Split 的触发方法为:

/**
 * Excerpt of {@code CompactSplitThread}: only the method that decides whether a
 * split should be requested is shown; all other members are elided.
 */
public class CompactSplitThread 
    implements CompactionRequestor, PropagatingConfigurationObserver {
    // ... (other members elided in this excerpt)

    /**
     * Decides whether to request a split of {@code r}.
     *
     * <p>A split is requested only when all three conditions hold:
     * <ol>
     *   <li>{@code shouldSplitRegion()} is true (online-region count below the limit);</li>
     *   <li>the region's compaction priority is not below {@code Store.PRIORITY_USER};</li>
     *   <li>{@code checkSplit()} returns a non-null split point.</li>
     * </ol>
     *
     * @param r the region to consider for splitting
     * @return {@code true} if a split was requested, {@code false} otherwise
     */
    public synchronized boolean requestSplit(final Region r) {
        // Conditions 1 and 2: region-count limit not reached, and compaction
        // priority value >= PRIORITY_USER.
        if (shouldSplitRegion() 
            && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {

            // Condition 3: a split point must exist; null means "do not split".
            byte[] midKey = ((HRegion)r).checkSplit();
            if (midKey != null) {
                // Hand the split point to the requestSplit(Region, byte[]) overload
                // (not shown in this excerpt).
                requestSplit(r, midKey);
                return true;
            }
        }
        return false;
    }
}

0. Split 判断

条件一:当前 RegionServer 上 Online 的 Region 数量小于 hbase.regionserver.regionSplitLimit(默认 1000)。注意这是单个 RegionServer 上的限制,而不是整个集群的 Region 数量限制。

// regionSplitLimit is read from "hbase.regionserver.regionSplitLimit" (default 1000):
// this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit", 1000);

/**
 * Condition 1: splitting is allowed only while the number of regions online on
 * this server is strictly below {@code regionSplitLimit}.
 *
 * @return {@code true} if another split may be performed on this server
 */
private boolean shouldSplitRegion() {
    // ... (elided in this excerpt)
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
}

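条件二:Region 的 Compact 优先级数值(取所有 Store 中的最小值)不小于 Store.PRIORITY_USER。优先级数值越小,表示 Compact 越紧急。Store.PRIORITY_USER 的值为 1,因此在默认的 DefaultStoreFileManager 下,该条件等价于每个 Store 的 StoreFile 数量都少于 blockingFileCount,即不会对已经处于阻塞写入状态的 Region 发起 Split。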

// Condition 2 as checked by the caller:
// ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER

/**
 * HRegion: returns the region's compaction priority, i.e. the smallest
 * (most urgent) priority among all of its stores, or {@code Integer.MAX_VALUE}
 * if the region has no stores.
 */
public int getCompactPriority() {
    // Despite its name, "count" holds the running minimum priority.
    int count = Integer.MAX_VALUE;
    for (Store store : stores.values()) {
        count = Math.min(count, store.getCompactPriority());
    }
    return count;
}

// class HStore

/**
 * Returns this store's compaction priority as computed by its store file manager.
 * With DefaultStoreFileManager a smaller value means the compaction is more urgent.
 */
public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    // PRIORITY_USER denotes a user-requested compaction. Seeing it here without one
    // is unexpected, so a warning is logged; the value is still returned as-is.
    if (priority == PRIORITY_USER) {
        LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
}

// class DefaultStoreFileManager

// Once the number of store files in a store reaches this value, writes are blocked
// until a compaction runs.
// NOTE(review): 7 is the code-level fallback; hbase-default.xml may ship a different
// effective default for this key — confirm for 1.2.0.
// this.blockingFileCount = conf.getInt("hbase.hstore.blockingStoreFiles", 7);
/**
 * Computes the store's compaction priority as {@code blockingFileCount - storefiles.size()}.
 * The closer the file count gets to {@code blockingFileCount}, the smaller the value and
 * the more urgent the compaction; the value becomes zero or negative once the count
 * reaches or exceeds the blocking threshold.
 *
 * @return the priority, never equal to {@code HStore.PRIORITY_USER}
 */
public int getStoreCompactionPriority() {
    // The closer the HFile count is to blockingFileCount, the higher the compaction
    // priority; otherwise flushes take precedence and compaction can follow the flush.
    int priority = blockingFileCount - storefiles.size();
    // PRIORITY_USER is reserved for user-requested compactions, so a computed value
    // that happens to equal it is bumped by one.
    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

条件三:Region 存在合法的 split point,即 checkSplit() 返回的 midKey 不为 null。

// Fragment of CompactSplitThread.requestSplit(Region): the split is only requested
// when checkSplit() yields a split point; null means the region must not be split.
byte[] midKey = ((HRegion)r).checkSplit();
if (midKey != null) {
    requestSplit(r, midKey);
    return true;
}

checkSplit() 方法,会使用 splitPolicy 进行判断

/**
 * HRegion: decides whether this region should be split and, if so, where.
 *
 * @return the split point (midKey), or {@code null} if the region must not be split
 */
public byte[] checkSplit() {
    // META and NAMESPACE tables are never split (checks elided in this excerpt).

    // A region in the recovering state cannot be split (check elided).

    // Ask the split policy.
    // The default policy is IncreasingToUpperBoundRegionSplitPolicy.
    //
    if (!splitPolicy.shouldSplit()) {
      return null;
    }

    // Obtain the split point and return it (null if it does not lie inside this region).
    ...
  }

1. 调用 Split

Split 线程数由参数 hbase.regionserver.thread.split 决定,默认值为1

/**
 * Submits a split of {@code r} at {@code midKey} to the split thread pool.
 * NOTE(review): {@code splits} is presumably the executor sized by
 * hbase.regionserver.thread.split — its construction is not shown here.
 *
 * @param r      the region to split
 * @param midKey the split point returned by checkSplit()
 * @param user   the user on whose behalf the split runs
 */
public synchronized void requestSplit(final Region r, byte[] midKey, User user) {

    // Hand a SplitRequest to the thread pool.
    this.splits.execute(new SplitRequest(r, midKey, this.server, user));
}

SplitRequest 实现逻辑,Split 的执行交给 SplitTransactionImpl

/**
 * Task run on the split thread pool: performs one split of {@code parent} at
 * {@code midKey} through a {@code SplitTransactionImpl}. Fields and constructor
 * are elided in this excerpt.
 */
class SplitRequest implements Runnable {
    @Override
    public void run() {
        doSplitting(user);
    }

    /**
     * Runs prepare, then execute (rolling back if execute throws), while holding
     * the table lock.
     *
     * @param user the user on whose behalf the split runs (passed through to execute)
     */
    private void doSplitting(User user) {
        // The split transaction for this parent region and split point.
        SplitTransactionImpl st = new SplitTransactionImpl(parent, midKey);
        try {

            // Acquire the table lock (a read lock on the table).
            tableLock.acquire();

            // The split works like a two-phase commit: call prepare first, and just
            // return if the region cannot be split right now.
            if (!st.prepare()) return;
            try {
                // Execute the split; this.server serves as both Server and
                // RegionServerServices.
                st.execute(this.server, this.server, user);
            } catch (Exception e) {
                // The split failed: roll it back.
                if (st.rollback(this.server, this.server)) {
                    // ...
                }
            }
        } finally {
            // Release the table lock. NOTE(review): this also runs when acquire()
            // threw; presumably releaseTableLock() tolerates an unacquired lock — confirm.
            releaseTableLock();
        }
    }
}

1.1 Split prepare

/**
 * Phase one of the split: validates the parent region and the split row, then
 * builds the HRegionInfo of both daughters and records the PREPARED phase.
 *
 * @return {@code true} if the split may proceed, {@code false} if it should be abandoned
 * @throws IOException declared by the method; the throwing calls are not visible here
 */
public boolean prepare() throws IOException {
    // Check that the parent can be split:
    // 1. the parent is neither closed nor closing;
    // 2. the parent has no reference files (which would mean it is itself the result
    //    of an earlier split that has not been compacted yet).
    if (!this.parent.isSplittable()) return false;

    // The split key must not be null.
    if (this.splitrow == null) return false;

    HRegionInfo hri = this.parent.getRegionInfo();
    parent.prepareToSplit();   // does nothing

    // Check splitrow in Region ... (elided; startKey/endKey used below are defined there)

    // Build the daughters' regionId: a timestamp that must be larger than the parent's regionId.
    long rid = getDaughterRegionIdTimestamp(hri);

    // Create the RegionInfo of the two daughters:
    // A spans startKey..splitrow, B spans splitrow..endKey.
    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

    // Record the state-machine phase (used later by rollback, among others).
    transition(SplitTransactionPhase.PREPARED);

    return true;
}

1.2 Split execute

/**
 * Phase two of the split: creates the daughters, brings them online and marks
 * the transaction COMPLETED.
 *
 * @param server   the hosting server
 * @param services region server services
 * @param user     the user on whose behalf the split runs
 * @return the two daughter regions
 * @throws IOException if a step fails; SplitRequest then calls rollback
 */
public PairOfSameType<Region> execute(final Server server,
      final RegionServerServices services, User user) throws IOException {

    // Create the daughter regions.
    PairOfSameType<Region> regions = createDaughters(server, services, user);

    // Bring the daughter regions online and update the information in ZooKeeper.
    regions = stepsAfterPONR(server, services, regions, user);

    // State-machine transition.
    transition(SplitTransactionPhase.COMPLETED);

    return regions;
  }

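createDaughters:创建两个子 Region。在非测试模式且使用 ZK 进行 assignment 时,在 META 表中下线 parent,并写入两个 daughter regions 的信息。daughter regions 真正 Open 上线是在之后的 stepsAfterPONR 中完成的。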

/**
 * Creates the daughter regions. When not testing and ZK-based assignment is in
 * use, it also updates META: the parent is offlined and both daughters are recorded.
 *
 * <p>NOTE(review): {@code testing}, {@code daughterRegions} and {@code metaEntries}
 * are locals produced by code elided from this excerpt.
 *
 * @return the two daughter regions
 */
PairOfSameType<Region> createDaughters(final Server server,
      final RegionServerServices services, User user) throws IOException {

    // Coprocessor hooks and split state-machine updates ...

    if (!testing && useZKForAssignment) {
      if (metaEntries == null || metaEntries.isEmpty()) {
        // No extra META mutations: update META for parent and daughters via MetaTableAccessor.
        MetaTableAccessor.splitRegion(server.getConnection(),
          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
          daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
          parent.getTableDesc().getRegionReplication());
      } else {

        // Update META for the parent and the daughters, also applying metaEntries.
        offlineParentInMetaAndputMetaEntries(server.getConnection(),
          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
              parent.getTableDesc().getRegionReplication());
      }
    } 

    return daughterRegions;
}

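stepsBeforePONR:创建 Split 目录、关闭 parent、为 parent 的 StoreFile 创建 Reference 文件,并构造两个 daughter Region。注意此时 daughter 还没有被 Open,它们的上线在 stepsAfterPONR 中完成。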

/**
 * Work done before the point of no return (PONR): creates the split directory,
 * closes the parent, splits its store files into references and builds the two
 * daughter regions from them.
 *
 * @param testing when {@code true}, the parent is not removed from the online regions
 * @return the two daughter regions (A, B)
 */
public PairOfSameType<Region> stepsBeforePONR(final Server server,
      final RegionServerServices services, boolean testing) throws IOException {

    // Create the split directory.
    this.parent.getRegionFileSystem().createSplitsDir();

    // Close the parent; the store files it returns are the ones to split.
    try{
      hstoreFilesToSplit = this.parent.close(false);
    } catch (Exception e) {
      // NOTE(review): the exception is only recorded here; how exceptionToThrow
      // is handled afterwards is elided from this excerpt.
      exceptionToThrow = e;
    }

    if (!testing) {
      services.removeFromOnlineRegions(this.parent, null);
    }

    // Create the Reference files ...
    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

    Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
    Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);

    return new PairOfSameType<Region>(a, b);
}

1.3 Split Rollback

Split 任务失败时,根据状态机的状态进行回滚

/**
 * Undoes a failed split by walking the recorded state-machine phases backwards,
 * newest first.
 *
 * <p>NOTE(review): {@code iterator}, {@code je} and {@code result} come from code
 * elided in this excerpt, as do the per-phase undo actions. As shown, every case
 * label has an empty body.
 *
 * @return the rollback result
 */
public boolean rollback(final Server server, final RegionServerServices services, User user)
  throws IOException {

    // Visit recorded phases from the most recent to the oldest.
    while (iterator.hasPrevious()) {

        switch(je.getPhase()) {

            case SET_SPLITTING:
            case CREATE_SPLIT_DIR:
            case CLOSED_PARENT_REGION:
            case STARTED_REGION_A_CREATION:
            case STARTED_REGION_B_CREATION:
            case OFFLINED_PARENT:
            case PONR:
            // ... (per-phase undo logic elided)
      }
    }

    // Coprocessor hooks ...
    return result;
}

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

19 + 13 =