HBase 1.2.0源码系列:MemStoreFlusher流程

Memstore Flush 流程

为了减少 flush 过程对读写的影响,HBase 采用了类似于两阶段提交的方式,将整个 flush 过程分为三个阶段:

prepare 阶段:遍历所有 Memstore,将 Memstore 中当前数据集 kvset 做一个快照 snapshot,然后再新建一个新的 kvset。后期的所有写入操作都会写入新的 kvset 中,而整个 flush 阶段读操作会首先分别遍历 kvset 和 snapshot,如果查找不到再会到 HFile 中查找。prepare 阶段需要加一把 updateLock 对写请求阻塞,结束之后会释放该锁。因为此阶段没有任何费时操作,因此持锁时间很短。

flush 阶段:遍历所有 Memstore,将 prepare 阶段生成的 snapshot 持久化为临时文件,临时文件会统一放到目录.tmp下。这个过程因为涉及到磁盘IO操作,因此相对比较耗时。

commit 阶段:遍历所有的 Memstore,将 flush 阶段生成的临时文件移到指定的 ColumnFamily 目录下,针对 HFile 生成对应的 storefile 和 Reader,把 storefile 添加到 HStore 的 storefiles 列表中,最后再清空 prepare 阶段生成的 snapshot。

日志分析

/******* MemStoreFlush初始化阶段 ********/
2018-07-06 09:39:24,880 INFO [regionserver/host/ip:16020] regionserver.MemStoreFlusher: globalMemStoreLimit=1.5 G, globalMemStoreLimitLowMark=1.5 G, maxHeap=3.8 G

/******* prepare阶段 ********/
2018-07-06 18:33:31,329 INFO [MemStoreFlusher.1] regionserver.HRegion: Started memstore flush for [table],,1528539945017.80ab9764ae70fa97b75057c376726653., current region memstore size 21.73 MB, and 1/1 column families' memstores are being flushed.

/******* flush阶段 ********/
2018-07-06 18:33:31,696 INFO [MemStoreFlusher.1] regionserver.DefaultStoreFlusher: Flushed, sequenceid=40056, memsize=21.7 M, hasBloomFilter=true, into tmp file hdfs://ns/hbase/data/default/[table]/80ab9764ae70fa97b75057c376726653/.tmp/f71e7e8c15774da683bdecaf7cf6cb99

/******* commit阶段 ********/
2018-07-06 18:33:31,718 INFO [MemStoreFlusher.1] regionserver.HStore: Added hdfs://ns/hbase/data/default/[table]/80ab9764ae70fa97b75057c376726653/d/f71e7e8c15774da683bdecaf7cf6cb99, entries=119995, sequenceid=40056, filesize=7.3 M

 

源码分析

1. MemStoreFlusher初始化

public MemStoreFlusher(final Configuration conf, 
      final HRegionServer server) {
    ... 
    // hbase.server.thread.wakefrequency,检查 MemStore 的线程周期
    this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);

    // 获取JVM使用内存
    long max = -1L; 
    final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
        max = usage.getMax();
    } 

    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
    // 全部的 MemStore 占用超过 heap 的 upperLimit 和 lowerLimit
    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
    this.globalMemStoreLimitLowMarkPercent =
      HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
    this.globalMemStoreLimitLowMark =
      (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
    
    // flush 阻塞时间,如果调低会加快 flush 速度,但是 Compact 需要配个,否则文件会越来越多
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    // flush 的线程数, 线程数越多, 增加 HDFS 的负载
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
    
}

 

2. Flush 启动

public class HRegionServer {
    private void startServiceThreads() {
        ...
        this.cacheFlusher.start(uncaughtExceptionHandler);
    }
}

public class MemStoreFlusher {
    synchronized void start(UncaughtExceptionHandler eh) {
        ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
            server.getServerName().toShortString() + "-MemStoreFlusher", eh);
            for (int i = 0; i < flushHandlers.length; i++) {
                flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
                flusherThreadFactory.newThread(flushHandlers[i]);
                flushHandlers[i].start();
            }
        }
}

 

2.1 FlushHandler 多线程执行 flush
private final BlockingQueue<FlushQueueEntry> flushQueue =
      new DelayQueue<FlushQueueEntry>(); // 无界的BlockingQueue

private class FlushHandler extends HasThread {

    @Override
    public void run() {
        while (!server.isStopped()) {
            FlushQueueEntry fqe = null;
            try {
                // allow someone to wake us up again
                wakeupPending.set(false); 
                
                // 从队列中取出一个 flush request,如果 flushQueue 队列中没有值阻塞
                fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 
                if (fqe == null || fqe instanceof WakeupFlushThread) {
                    // 如果没有 flush request 或者 flush request 是一个全局 flush 的 request
                    if (isAboveLowWaterMark()) {
                        // 检查所有的 memstore 是否超过 max_heap * hbase.regionserver.global.memstore.lowerLimit(0.35)
                        // 超过配置的最小 memstore 的值,flush 最大的一个 memstore 的 region
                        if (!flushOneForGlobalPressure()) {
                            // 如果没有任何 Region 需要 flush,但已经超过了 lowerLimit。
                            // 这种情况不太可能发生,除非可能会在关闭整个服务器时发生,即有另一个线程正在执行 flush regions
                            // 只里只需要 sleep 一下,然后唤醒任何被阻塞的线程再次检查
                            // HRegionServer 执行数据更新的相关方法如果发现 memstore 的总和超过配置的最大值时
                            // 会wait更新线程,等待 flush
                            Thread.sleep(1000);
                            wakeUpIfBlocking();
                        }     

                        // 发起另一个唤醒的全局 flush request,生成 WakeupFlushThread 的 request
                        wakeupFlushThread();
                    }
                    continue;
                } 

                // 如果是正常的 flush request
                // 单个 region memstore 大小超过 hbase.hregion.memstore.flush.size 配置的值,默认128M,执行 flush 操作
                FlushRegionEntry fre = (FlushRegionEntry) fqe;
                if (!flushRegion(fre)) {
                    break;
                }
            } catch (Exception ex) {
                ...
            }
        }
        // 结束 MemStoreFlusher 的线程调用,通常是 regionserver stop
        synchronized (regionsInQueue) {
            regionsInQueue.clear();
            flushQueue.clear();
        }

        // 通知其他线程
        wakeUpIfBlocking();
    }
}

 

2.2 取出所有 Region 中 MemStore 最大的一个 Region,并执行 flush 操作
private boolean flushOneForGlobalPressure() {
    // 取出所有 Region,以 Size 排序
    SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
    Set<Region> excludedRegions = new HashSet<Region>();

    // 2.0版本 Replica 新增
    // 如果最大的 replica region 的 memstore 已经超过了最大的主region memstore的内存的4倍,就主动触发一次StoreFile Refresher去更新文件列表
    // 即获取hbase.region.replica.storefile.refresh.memstore.multiplier
    double secondaryMultiplier
      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
        // 是按region的memstore的大小从大到小排序组成。取出满足以下条件的最大的memstore的region
        // 如果都不满足,返回null

        // bestFlushableRegion:
        // 1.region的writestate.flushing==false && writestate.writesEnabled==true
        // 2.region中所有的store中的storefile的个数小于hbase.hstore.blockingStoreFiles配置的值,默认为7
        Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);

        // bestAnyRegion:
        // 1.region的writestate.flushing==false && writestate.writesEnabled==true
        // 此处不检查region中是否有store的文件个数超过指定的配置值。
        Region bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, false);

        // bestRegionReplica:
        // 1.region的replicaId!=0
        Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize, excludedRegions);

        // 如果没有拿到bestAnyRegion或bestRegionReplica,表示没有需要flush的region
        if (bestAnyRegion == null && bestRegionReplica == null) {
            return false;
        }

        Region regionToFlush;
        if (bestFlushableRegion != null &&
          bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {

        // 得到最需要进行flush的region
        // 如果bestAnyRegion(memstore最大的region的region)memory使用大小 
        // 超过bestFlushableRegion(storefile个数没有超过配置的memstore最大的region)的memory大小的2倍
        // 优先flush掉此region的memstore,这里的设计为了防止在低压下做非常多的小flush,导致compaction

        // 代码注释:
        // Even if it's not supposed to be flushed, pick a region if it's more than twice as big as the best flushable one 
        // otherwise when we're under pressure we make lots of little flushes and cause lots of compactions, etc, which just makes life worse!

        regionToFlush = bestAnyRegion;
    } else {
        if (bestFlushableRegion == null) {
            // 如果要flush的region中没有一个region的storefile个数没有超过配置的值
            // 即所有region中都有store的file个数超过了配置的store最大storefile个数,优先flush掉memstore的占用最大的region
            regionToFlush = bestAnyRegion;
        } else {
            /**
             * 如果要flush的region中,有Region的Store还没有超过配置的最大StoreFile个数,优先flush这个Region
             * 目的是为了减少一小部分Region数据写入过热,compact太多,而数据写入较冷的region一直没有被flush
             */
            regionToFlush = bestFlushableRegion
        }
    }

    // ...

    if (regionToFlush == null ||
         (bestRegionReplica != null &&
           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
        (bestRegionReplica.getMemstoreSize()
          > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {

        /**
         * 开启Replica的逻辑
         * RegionReplica存在,并且Replica的Size大于最大的主region memstore的内存的n倍
         * 触发一次StoreFile Refresher去更新文件列表
         * 
         * 参考replica memstore过大导致写阻塞的问题
         */
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);

        if (!flushedOne) { // always false
            excludedRegions.add(bestRegionReplica);
        } else {
            /**
             * 执行flush操作,设置全局flush的标识为true
             * 如果flush操作出现错误,需要把此region添加到excludedRegions列表中,
             * 表示这次flush一个region的行为中跳过此region,找下一个memstore最大的region进行flush
             */
            flushedOne = flushRegion(regionToFlush, true, true);

            if (!flushedOne) {
                excludedRegions.add(regionToFlush);
            }
        }
    }
    return true;
}

 

2.3 flush region

Region 数据落盘

// 第二个参数true表示全局flush,否则表示region的memstore达到指定大小
private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    synchronized (this.regionsInQueue) {
        // 从regionsInQueue列表中移出此region,并得到region的flush请求
        FlushRegionEntry fqe = this.regionsInQueue.remove(region);
        if (fqe != null && emergencyFlush) {
            // 如果是全局的flush请求,从flushQueue队列中移出此flush请求
            flushQueue.remove(fqe);
        }
    }

    lock.readLock().lock(); // Add ReadLock
    try {
        // 执行HRegion.flushcache操作,返回true表示需要做compact,否则表示不需要发起compact请求
        notifyFlushRequest(region, emergencyFlush);
        FlushResult flushResult = region.flush(forceFlushAllStores);
        boolean shouldCompact = flushResult.isCompactionNeeded();

        /**
         * 检查是否需要进行split操作,以下条件不做split:
         * a.如果是meta表,不做split操作。
         * b.如果region配置有distributedLogReplay,同时region在open后,还没有做replay,isRecovering=true
         * c.splitRequest的值为false,true表示通过client调用过regionServer.splitregion操作。
         * d.如果c为false,同时当前region中有store的大小不超过hbase.hregion.max.filesize的配置值,默认10g
         * 或者不超过了hbase.hregion.memstore.flush.size配置的值,默认为128m
         * 此region所在的table在当前rs中的所有region个数 * 此region所在的table在当前rs中的所有region个数
         * e.如果c为false,或者store中有storefile的类型为reference,也就是此storefile引用了另外一个storefile
         * f.如果cde的检查结果为true,同时client发起过split请求,如果client发起请求时指定了在具体的split row时
         * 但此row在当前region中并不存在,不需要做split
         * g.以上检查都是相反的值时,此时需要做split操作。
         */
        boolean shouldSplit = ((HRegion)region).checkSplit() != null;
        if (shouldSplit) {
            // 如果需要进行region的split操作,发起split请求
            this.server.compactSplitThread.requestSplit(region);
        } else if (shouldCompact) {
            // 如果需要做compact发起一个系统的compact请求
            server.compactSplitThread.requestSystemCompaction(
              region, Thread.currentThread().getName());
        }
    } catch (DroppedSnapshotException ex) {
        server.abort("Replay of WAL required. Forcing server shutdown", ex);
        return false;
    } catch (IOException ex) {
        
        if (!server.checkFileSystem()) {
            return false;
        }
    } finally {
        lock.readLock().unlock();
        wakeUpIfBlocking();
    }
    return true;
}

 

2.3 refreshStoreFilesAndReclaimMemory

开启Replica的逻辑
RegionReplica存在,并且Replica的Size大于最大的主region memstore的内存的n倍,触发一次StoreFile Refresher去更新文件列表
参考replica memstore过大导致写阻塞的问题

private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
        return region.refreshStoreFiles();
    } catch (IOException e) {
        LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
}

 

3. 其他

replica memstore 过大导致写阻塞的问题
replica region 中 memstore 是不会主动 flush 的,只有收到主 region 的 flush 操作,才会去 flush。
同一台 RegionServer 上可能有一些 region replica 和其他的主 region 同时存在。
这些 replica 可能由于复制延迟(没有收到 flush marker),或者主 region 没有发生 flush,导致一直占用内存不释放。
这会造成整体的内存超过水位线,导致正常的写入被阻塞。
为了防止这个问题的出现,HBase 中有一个参数叫做 hbase.region.replica.storefile.refresh.memstore.multiplier,默认值是4。
这个参数的意思是说,如果最大的 replica region 的 memstore 已经超过了最大的主 region memstore 的内存的4倍,就主动触发一次 StoreFile Refresher 去更新文件列表,如果确实发生了 flush,那么 replica 内存里的数据就能被释放掉。
但是,这只是解决了 replication 延迟导致的未 flush 问题,如果这个 replica 的主 region 确实没有 flush 过,内存还是不能被释放。写入阻塞还是会存在。

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

11 + 9 =