HBase 1.2.0源码系列:BlockCache-LruBlockCache

HBase 提供了几种 BlockCache 方案:

  • LruBlockCache
  • SlabCache,0.92版本提供,在1.0版本后被废弃
  • BucketCache,0.95版本提供
  • ExternalBlockCache,1.1.0版本提供

这里介绍下 LruBlockCache 的源码实现,可以看到 BlockCache 分为三级:

  • single:block 被第一次访问,则该 Block 被放在这一优先级队列中。
  • multi:如果一个 Block 被多次访问,则从 single 移到 multi 中。
  • in memory:in memory 由用户指定,在内存中常驻,一般不推荐用户使用,通常只有系统表才使用 in memory 优先级。

分级的好处在于:
首先,通过 in memory 类型缓存,将重要的数据放到 RegionServer 内存中常驻,例如 Meta 或者 namespace 的元数据信息。
其次,通过区分 single 和 multi 类型缓存,可以防止由于 scan 操作带来的 cache 频繁更替。
默认配置下,对于整个 BlockCache,按照以下百分比分配给 single、multi 和 in memory 使用:0.25、0.5和0.25。无论哪个区,都会采用严格的 Least-Recently-Used 算法淘汰机制,最少使用的 Block 会被替换,为新加载的 Block 预留空间。

LruBlockCache 和 SlabCache 组合使用为 DoubleBlockCache。
LruBlockCache 和 BucketCache 组合使用为 CombinedBlockCache。

/**
 * LRU-based block cache, excerpted from HBase 1.2.0. Parts of the original class are elided
 * in this excerpt and marked with "...".
 *
 * <p>Blocks live in a concurrent map keyed by {@code BlockCacheKey}. Each cached block carries a
 * priority (SINGLE, MULTI or MEMORY). When the cache grows past {@code acceptableSize()}, an
 * eviction pass groups blocks by priority and frees space from each group. Blocks removed by
 * that pass are handed to an optional second-level cache ({@code victimHandler}).
 */
public class LruBlockCache implements ResizableBlockCache, HeapSize {

    // Config key: an eviction pass frees space until the cache is back down to this ratio
    // (default 0.95f per the original note; the target is computed by the elided minSize()).
    static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

    // Config key: once the cache exceeds this ratio, an eviction is triggered, provided none is
    // already in progress (default 0.99f per the original note; see the elided acceptableSize()).
    static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";

    // Config key: share of the cache for SINGLE-priority blocks (default 0.25f).
    static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
    // Config key: share of the cache for MULTI-priority blocks (default 0.50f).
    static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
    // Config key: share of the cache for MEMORY-priority blocks (default 0.25f).
    static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";

    // Config key: in-memory force mode (default false). Presumably feeds the forceInMemory
    // constructor argument, which switches evict() to the MEMORY-sparing strategy.
    static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";

    // Presumably the default mapLoadFactor / mapConcurrencyLevel for the backing map.
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    // Period of the statistics task, in seconds (300 s = 5 minutes; see the constructor).
    static final int statThreadPeriod = 60 * 5;

    // Config key: largest single block, by heap size, that will be cached (default
    // 16L * 1024L * 1024L bytes = 16 MB per the original note). This limits individual blocks,
    // NOT the total cache size; cacheBlock() silently skips blocks larger than maxBlockSize.
    private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";

    // Backing store for cached blocks; built as a ConcurrentHashMap in the constructor.
    private final Map<BlockCacheKey,LruCachedBlock> map;

    // Fair lock guaranteeing that at most one eviction pass runs at a time. evict() uses
    // tryLock(), so a concurrent caller returns immediately instead of waiting.
    private final ReentrantLock evictionLock = new ReentrantLock(true);

    // True while evict() is running; read by cacheBlock() to avoid requesting redundant evictions.
    private volatile boolean evictionInProgress = false;

    /** Background eviction thread, or null when eviction runs inline in the triggering thread. */
    private final EvictionThread evictionThread;

    /** Statistics thread schedule pool (for heavy debugging, could remove) */
    private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

    /** Current size of cache */
    private final AtomicLong size;

    /** Current number of cached elements */
    private final AtomicLong elements;

    /** Cache access count (sequential ID) */
    private final AtomicLong count;

    /** Cache statistics */
    private final CacheStats stats;

    /** Overhead of the structure itself */
    private long overhead;

    // Optional second-level (L2) cache. It is consulted on misses in getBlock(), and it receives
    // the blocks that an eviction pass removes (see evictBlock(LruCachedBlock, boolean)).
    private BlockCache victimHandler = null;

    /**
     * Creates the cache and builds the backing map. It optionally starts the background
     * eviction thread, and it schedules the statistics task every statThreadPeriod seconds.
     *
     * @param maxSize maximum cache size, in bytes
     * @param blockSize expected block size, in bytes
     * @param evictionThread whether to run eviction on a background thread (typically true);
     *     when false, eviction runs inline in the thread that triggers it
     * @param mapInitialSize initial capacity of the backing map, typically
     *     (int)Math.ceil(1.2*maxSize/blockSize)
     * @param mapLoadFactor load factor of the backing map
     * @param mapConcurrencyLevel concurrency level of the backing map
     * @param minFactor ratio that an eviction pass shrinks the cache down to
     * @param acceptableFactor ratio above which an eviction is triggered
     * @param singleFactor share of the cache for SINGLE-priority blocks
     * @param multiFactor share of the cache for MULTI-priority blocks
     * @param memoryFactor share of the cache for MEMORY-priority blocks
     * @param forceInMemory if true, evict() evicts MEMORY blocks only when SINGLE and MULTI
     *     blocks cannot free enough space
     * @param maxBlockSize largest block heap size that cacheBlock() accepts (presumably stored
     *     in the elided field-assignment section)
     */
    public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
      float minFactor, float acceptableFactor, float singleFactor,
      float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {

        // Elided in this excerpt: check that single/multi/memory factors are non-negative and sum to 1.

        // Elided in this excerpt: check that minFactor < acceptableFactor and that both are below 1.

        // Elided in this excerpt: field assignments ...

        // Build the backing concurrent map.
        map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
            mapLoadFactor, mapConcurrencyLevel);

        // Start the background eviction thread if requested.
        if(evictionThread) {
            this.evictionThread = new EvictionThread(this);
            this.evictionThread.start();
        } else {
            this.evictionThread = null;
        }

        // Schedule the statistics task every statThreadPeriod seconds (5 minutes).
        this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
            statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    }

    /**
     * Adds a block to the cache.
     *
     * <p>Blocks whose heap size exceeds maxBlockSize are silently skipped. If the key is already
     * cached, the existing entry is kept. In that case the method throws when the new contents
     * differ, and otherwise returns without changes. After inserting, an eviction is requested
     * if the cache size exceeds acceptableSize() and no eviction is already running.
     *
     * @param cacheKey key of the block
     * @param buf block contents
     * @param inMemory passed to the new LruCachedBlock (presumably selects MEMORY priority)
     * @param cacheDataInL1 not read in this excerpt
     * @throws RuntimeException if a block with the same key but different contents is cached
     */
    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
        final boolean cacheDataInL1) {

        // Skip blocks larger than the per-block limit (this is not the total cache size).
        if (buf.heapSize() > maxBlockSize) {
            return;
        }

        LruCachedBlock cb = map.get(cacheKey);
        if (cb != null) {
            // The key is already cached:
            // if the contents differ, throw;
            // if the contents are the same, return without changes.
            // This is harmless and can happen in rare cases (see HBASE-8547)
            if (compare(buf, cb.getBuffer()) != 0) {
                throw new RuntimeException("Cached block contents differ, which should not have happened."
                + "cacheKey:" + cacheKey);
            }
            return;
        }

        // NOTE(review): the get() above and the put() below are not atomic. Two concurrent callers
        // for the same key can both reach this point, and the later put() replaces the earlier
        // entry. Confirm that the elided size accounting in updateSizeMetrics tolerates this.
        cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
        // Update size metrics, then insert into the map.
        long newSize = updateSizeMetrics(cb, false);
        map.put(cacheKey, cb);
        // NOTE(review): val is never read in this excerpt.
        long val = elements.incrementAndGet();

        // Request an eviction once the new size is over the threshold and none is running.
        if (newSize > acceptableSize() && !evictionInProgress) {
            runEviction();
        }
    }

    /**
     * Looks up a block.
     *
     * <p>On a hit, records the hit (when updateCacheMetrics is true) and refreshes the block's
     * access sequence number. On a miss, records the miss unless this is a repeat lookup. Then,
     * if an L2 cache is configured and this is not a repeat lookup, it queries L2. When caching
     * is true, a block found in L2 is also copied into this cache.
     *
     * @param cacheKey key of the block
     * @param caching passed to the hit/miss statistics; also gates promotion from L2
     * @param repeat true for a repeated lookup; suppresses miss counting and the L2 lookup
     * @param updateCacheMetrics whether to update hit/miss statistics
     * @return the cached block, the block found in L2, or null if neither has it
     */
    @Override
    public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
         boolean updateCacheMetrics) {

        LruCachedBlock cb = map.get(cacheKey);
        // Miss in this (L1) cache.
        if (cb == null) {
            if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());

            // Fall back to the second-level (L2) cache, if one is configured.
            if (victimHandler != null && !repeat) {
                Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);

                // Promote the L2 hit into this (L1) cache, with inMemory = false.
                if (result != null && caching) {
                    cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
                }
                return result;
            }
            return null;
        }

        // Hit: update statistics and the block's access sequence number.
        if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
        cb.access(count.incrementAndGet());
        return cb.getBuffer();
    }

    /**
     * Evicts the block stored under cacheKey. It passes evictedByEvictionProcess = false, so the
     * block is NOT handed to the L2 cache.
     *
     * @return true if a block was found and evicted, false if the key was not cached
     */
    @Override
    public boolean evictBlock(BlockCacheKey cacheKey) {
        LruCachedBlock cb = map.get(cacheKey);
        if (cb == null) return false;
        evictBlock(cb, false);
        return true;
    }

    /**
     * Evicts every block belonging to hfileName, then asks the L2 cache (if any) to do the same.
     * It scans every key in the map, so its cost grows with the number of cached blocks.
     *
     * @return total number of blocks evicted from this cache and from the L2 cache
     */
    @Override
    public int evictBlocksByHfileName(String hfileName) {
        int numEvicted = 0;
        for (BlockCacheKey key : map.keySet()) {
            if (key.getHfileName().equals(hfileName)) {
                // The ++numEvicted line below is the (unbraced) body of this if: it counts only
                // the keys that were still present when evicted.
                if (evictBlock(key))
                ++numEvicted;
            }
        }
        if (victimHandler != null) {
            numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
        }
        return numEvicted;
    }

    /**
     * Removes block from the map. When the removal comes from an eviction pass and an L2 cache
     * exists, the block is also demoted to that L2 cache.
     *
     * @param block the cached block to remove
     * @param evictedByEvictionProcess true when called from an eviction pass (BlockBucket.free);
     *     only then is the block passed to victimHandler
     * @return the block's heap size, which callers add up as freed bytes
     */
    protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
        map.remove(block.getCacheKey());

        // ... (elided in this excerpt; presumably size/element bookkeeping — confirm)

        if (evictedByEvictionProcess && victimHandler != null) {
            if (victimHandler instanceof BucketCache) {
                // wait is true only while this cache is below its acceptable size. Presumably it
                // tells cacheBlockWithWait whether it may block for space rather than drop the
                // block — confirm against BucketCache.
                boolean wait = getCurrentSize() < acceptableSize();
                // MEMORY-priority blocks are passed to L2 as in-memory blocks.
                boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
                ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
                    inMemory, wait);
            } else {
                victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
            }
        }
        return block.heapSize();
    }

    // Runs an eviction pass inline when there is no background eviction thread. Otherwise it
    // delegates to that thread (presumably signalling it to call evict()).
    private void runEviction() {
        if(evictionThread == null) {
            evict();
        } else {
            evictionThread.evict();
        }
    }

    /**
     * Runs one eviction pass, freeing enough bytes to bring the cache down to minSize().
     *
     * <p>Only one pass runs at a time: if evictionLock is already held, this returns immediately.
     * Every cached block is sorted into a bucket by priority. Bytes are then freed in one of two
     * ways: with the MEMORY-sparing strategy (when forceInMemory is set or
     * memoryFactor > 0.999f), or by letting each over-full bucket give up part of its overflow.
     *
     * <p>Note: stats.evict() is in the finally clause, so a pass is counted even when
     * bytesToFree <= 0 and nothing is freed.
     */
    void evict() {

        // Ensure only one eviction runs at a time; give up at once if another pass is active.
        if(!evictionLock.tryLock()) return;

        try {
            evictionInProgress = true;
            long currentSize = this.size.get();
            long bytesToFree = currentSize - minSize();

            if(bytesToFree <= 0) return;

            // One bucket per priority (single, multi, memory), each with its allotted size.
            BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
                singleSize());
            BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
                multiSize());
            BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
                memorySize());

            // Scan entire map putting into appropriate buckets
            for(LruCachedBlock cachedBlock : map.values()) {
                switch(cachedBlock.getPriority()) {
                    case SINGLE: {
                        bucketSingle.add(cachedBlock);
                        break;
                    }
                    case MULTI: {
                        bucketMulti.add(cachedBlock);
                        break;
                    }
                    case MEMORY: {
                        bucketMemory.add(cachedBlock);
                        break;
                    }
                }
            }

            long bytesFreed = 0;
            if (forceInMemory || memoryFactor > 0.999f) {
                // MEMORY-sparing strategy: MEMORY blocks are touched only if SINGLE + MULTI
                // cannot cover bytesToFree.
                long s = bucketSingle.totalSize();
                long m = bucketMulti.totalSize();
                if (bytesToFree > (s + m)) {
                    // this means we need to evict blocks in memory bucket to make room,
                    // so the single and multi buckets will be emptied
                    bytesFreed = bucketSingle.free(s);
                    bytesFreed += bucketMulti.free(m);
                    bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);

                } else {
                    // this means no need to evict block in memory bucket,
                    // and we try best to make the ratio between single-bucket and
                    // multi-bucket is 1:2
                    // bytesRemain is what SINGLE + MULTI will hold afterwards; the targets are
                    // bytesRemain / 3 for single and 2 * bytesRemain / 3 for multi.
                    long bytesRemain = s + m - bytesToFree;
                    if (3 * s <= bytesRemain) {
                        // single-bucket is small enough that no eviction happens for it
                        // hence all eviction goes from multi-bucket
                        bytesFreed = bucketMulti.free(bytesToFree);
                    } else if (3 * m <= 2 * bytesRemain) {
                        // multi-bucket is small enough that no eviction happens for it
                        // hence all eviction goes from single-bucket
                        bytesFreed = bucketSingle.free(bytesToFree);
                    } else {
                        // both buckets need to evict some blocks
                        bytesFreed = bucketSingle.free(s - bytesRemain / 3);
                        if (bytesFreed < bytesToFree) {
                            bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
                        }
                    }
                }
            } else {
                // Default strategy. BlockBucket.compareTo is elided in this excerpt; presumably
                // the queue yields buckets in increasing order of overflow — confirm.
                PriorityQueue<BlockBucket> bucketQueue =
                    new PriorityQueue<BlockBucket>(3);

                bucketQueue.add(bucketSingle);
                bucketQueue.add(bucketMulti);
                bucketQueue.add(bucketMemory);

                int remainingBuckets = 3;

                BlockBucket bucket;
                while((bucket = bucketQueue.poll()) != null) {
                    // An over-full bucket frees no more than its overflow, and no more than an
                    // equal share of the bytes still needed across the buckets not yet polled.
                    long overflow = bucket.overflow();
                    if(overflow > 0) {
                        long bucketBytesToFree = Math.min(overflow,
                            (bytesToFree - bytesFreed) / remainingBuckets);
                        bytesFreed += bucket.free(bucketBytesToFree);
                    }
                    remainingBuckets--;
                }
            }

        } finally {
            stats.evict();
            evictionInProgress = false;
            evictionLock.unlock();
        }
    }

    /**
     * Collects the cached blocks of one priority during an eviction pass. The fields (totalSize,
     * bucketSize), the constructor and compareTo are elided in this excerpt.
     */
    private class BlockBucket implements Comparable<BlockBucket> {

        private LruCachedBlockQueue queue;

        // Adds a block to the bucket and accumulates its heap size into totalSize.
        public void add(LruCachedBlock block) {
            totalSize += block.heapSize();
            queue.add(block);
        }

        /**
         * Evicts blocks from the tail of the queue until at least toFree bytes are freed or the
         * bucket is empty. The tail is presumably the least recently accessed block — confirm
         * against LruCachedBlockQueue. Blocks go through evictBlock(cb, true), so they are
         * demoted to the L2 cache when one is configured.
         *
         * @return bytes actually freed; this can overshoot toFree by part of one block, or fall
         *     short if the bucket runs out
         */
        public long free(long toFree) {
            LruCachedBlock cb;
            long freedBytes = 0;
            while ((cb = queue.pollLast()) != null) {
                freedBytes += evictBlock(cb, true);
                if (freedBytes >= toFree) {
                    return freedBytes;
                }
            }

            return freedBytes;
        }

        // Bytes held beyond this bucket's allotted size; a positive value means it is over-full.
        public long overflow() {
            return totalSize - bucketSize;
        }
    }

    // ... (remainder of the class elided in this excerpt)
}


Add a Comment

电子邮件地址不会被公开。 必填项已用*标注