2019-01-03
HBase 1.2.0源码系列:BlockCache-LruBlockCache
HBase 提供了几种 BlockCache 方案:
- LruBlockCache
- SlabCache,0.92版本提供,在1.0版本后被废弃
- BucketCache,0.95版本提供
- ExternalBlockCache,1.1.0版本提供
这里介绍下 LruBlockCache 的源码实现,可以看到 BlockCache 分为三级:
- single:block 被第一次访问,则该 Block 被放在这一优先级队列中。
- multi:如果一个 Block 被多次访问,则从 single 移到 multi 中。
- in memory:in memory 由用户指定,在内存中常驻,一般不推荐,只有系统表才使用 in memory 优先级。
分级的好处在于:
首先,通过 in memory 类型缓存,将重要的数据放到 RegionServer 内存中常驻,例如 Meta 或者 namespace 的元数据信息。
其次,通过区分 single 和 multi 类型缓存,可以防止由于 scan 操作带来的 cache 频繁更替。
默认配置下,对于整个 BlockCache,按照以下百分比分配给 single、multi 和 in memory 使用:0.25、0.5 和 0.25。无论哪个区,都会采用严格的 Least-Recently-Used(最近最少使用)淘汰算法:最久未被访问的 Block 会被替换,为新加载的 Block 预留空间。
LruBlockCache 和 SlabCache 组合使用为 DoubleBlockCache。
LruBlockCache 和 BucketCache 组合使用为 CombinedBlockCache。
public class LruBlockCache implements ResizableBlockCache, HeapSize {
// Config key for the min factor: an eviction frees blocks until the cache is back down to
// this fraction of its capacity (default 0.95f).
static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
// Config key for the acceptable factor: once the cache grows beyond this fraction an eviction
// is triggered, provided no eviction is already in progress (default 0.99f).
static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
// Config key for the share of the cache given to SINGLE-access blocks (default 0.25f).
static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
// Config key for the share of the cache given to MULTI-access blocks (default 0.50f).
static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
// Config key for the share of the cache given to in-MEMORY blocks (default 0.25f).
static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
// Config key for the in-memory force mode (default false).
static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// Period of the statistics task; the constructor schedules it with TimeUnit.SECONDS,
// i.e. every five minutes.
static final int statThreadPeriod = 60 * 5;
// Config key for the maximum size of a single block (default 16L * 1024L * 1024L bytes).
// NOTE(review): presumably this feeds maxBlockSize, the per-block limit checked by
// cacheBlock — it is not the size of the whole cache; confirm against the full source.
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
// Backing store for cached blocks; the constructor creates it as a ConcurrentHashMap.
private final Map<BlockCacheKey,LruCachedBlock> map;
// Lock that guarantees only one eviction is processed at a time (created as a fair lock).
private final ReentrantLock evictionLock = new ReentrantLock(true);
// Set to true while evict() is running and reset to false when it finishes.
private volatile boolean evictionInProgress = false;
/** Eviction thread */
private final EvictionThread evictionThread;
/** Statistics thread schedule pool (for heavy debugging, could remove) */
private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
/** Current size of cache */
private final AtomicLong size;
/** Current number of cached elements */
private final AtomicLong elements;
/** Cache access count (sequential ID) */
private final AtomicLong count;
/** Cache statistics */
private final CacheStats stats;
/** Overhead of the structure itself */
private long overhead;
// Second-level (L2) cache: consulted by getBlock on a miss and handed the blocks that the
// eviction process removes from this cache. May be null.
private BlockCache victimHandler = null;
/**
* Creates the cache, optionally starts the background eviction thread, and schedules the
* periodic statistics task.
*
* @param maxSize maximum size of the cache, in bytes
* @param blockSize block size, in bytes
* @param evictionThread whether eviction runs in a background thread (normally true)
* @param mapInitialSize initial capacity of the backing map,
*        typically (int)Math.ceil(1.2*maxSize/blockSize)
* @param mapLoadFactor load factor of the backing map
* @param mapConcurrencyLevel concurrency level of the backing map
* @param minFactor fraction of the cache size that an eviction shrinks the cache down to
* @param acceptableFactor fraction of the cache size above which an eviction is triggered
* @param singleFactor share of the cache for single-access blocks
* @param multiFactor share of the cache for multi-access blocks
* @param memoryFactor share of the cache for in-memory blocks
* @param forceInMemory in-memory force mode flag; evict() reads a field of the same name to
*        pick its eviction strategy (the assignment is omitted in this excerpt)
* @param maxBlockSize NOTE(review): presumably the per-block limit that cacheBlock compares
*        against buf.heapSize(); the assignment is omitted in this excerpt — confirm
*/
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
float minFactor, float acceptableFactor, float singleFactor,
float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {
// Validation (omitted in this excerpt): single, multi and memory factors must not be
// negative and must sum to 1.
// Validation (omitted in this excerpt): minFactor must be less than acceptableFactor, and
// both must be less than 1.
// Field assignments omitted in this excerpt ...
// Initialise the backing cache map.
map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
mapLoadFactor, mapConcurrencyLevel);
// Start the eviction thread (optional); without it, eviction runs on the caller's thread.
if(evictionThread) {
this.evictionThread = new EvictionThread(this);
this.evictionThread.start();
} else {
this.evictionThread = null;
}
// Schedule the cache statistics task; it runs every statThreadPeriod seconds (five minutes).
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
/**
 * Caches a block in this LRU cache.
 *
 * <p>Blocks whose heap size exceeds {@code maxBlockSize} are silently skipped. If the key is
 * already cached, the call is a no-op when the contents match; this can legitimately happen
 * in rare cases (see HBASE-8547). After inserting, an eviction is requested when the new
 * cache size exceeds the acceptable size and no eviction is currently in progress.
 *
 * @param cacheKey key identifying the block
 * @param buf the block contents to cache
 * @param inMemory whether the block is created with in-memory priority
 * @param cacheDataInL1 not used by this implementation
 * @throws RuntimeException if the key is already cached with different contents
 */
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    final boolean cacheDataInL1) {
  // Never cache a block that is larger than the per-block limit.
  if (buf.heapSize() > maxBlockSize) {
    return;
  }
  LruCachedBlock cb = map.get(cacheKey);
  if (cb != null) {
    // The key is already cached: differing contents indicate a serious inconsistency,
    // identical contents are harmless and simply ignored.
    if (compare(buf, cb.getBuffer()) != 0) {
      throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
    }
    return;
  }
  cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  // Account for the new block's size first, then publish it in the map.
  long newSize = updateSizeMetrics(cb, false);
  map.put(cacheKey, cb);
  // Only the side effect matters here; the returned element count was never used.
  elements.incrementAndGet();
  // Above the acceptable threshold: request an eviction unless one is already running.
  if (newSize > acceptableSize() && !evictionInProgress) {
    runEviction();
  }
}
/**
 * Looks up a block in this cache, falling back to the victim (L2) cache on a miss.
 *
 * <p>On a hit the block's access counter is refreshed and its buffer returned. On a miss that
 * is not a repeat lookup, the victim cache (if any) is queried, and a block found there is
 * promoted into this cache when {@code caching} is true.
 *
 * @param cacheKey key identifying the block
 * @param caching whether the caller wants the block cached; also passed to the hit/miss stats
 * @param repeat whether this is a repeated lookup; repeats record no miss and skip the
 *        victim cache
 * @param updateCacheMetrics whether hit/miss statistics should be recorded
 * @return the cached block, or null if it is in neither cache
 */
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
  LruCachedBlock cached = map.get(cacheKey);

  // Hit: record it, refresh the access sequence number and hand back the buffer.
  if (cached != null) {
    if (updateCacheMetrics) {
      stats.hit(caching, cacheKey.isPrimary());
    }
    cached.access(count.incrementAndGet());
    return cached.getBuffer();
  }

  // Miss on a repeated lookup: nothing is counted and the victim cache is not consulted.
  if (repeat) {
    return null;
  }
  if (updateCacheMetrics) {
    stats.miss(caching, cacheKey.isPrimary());
  }

  // No second-level cache to fall back to.
  if (victimHandler == null) {
    return null;
  }

  Cacheable fromVictim = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
  // Promote the block found in the victim cache (L2) into this cache (L1).
  if (fromVictim != null && caching) {
    cacheBlock(cacheKey, fromVictim, /* inMemory = */ false, /* cacheData = */ true);
  }
  return fromVictim;
}
/**
 * Evicts the block stored under the given key, if there is one.
 *
 * @param cacheKey key of the block to evict
 * @return true if a block was cached under the key and has been evicted, false otherwise
 */
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
  LruCachedBlock target = map.get(cacheKey);
  if (target != null) {
    // Explicit eviction, i.e. not triggered by the eviction process.
    evictBlock(target, false);
    return true;
  }
  return false;
}
/**
 * Evicts every cached block that belongs to the given HFile. This walks all keys of the
 * backing map, then asks the victim cache (if any) to do the same.
 *
 * @param hfileName name of the HFile whose blocks should be evicted
 * @return number of blocks evicted from this cache plus the count reported by the victim cache
 */
@Override
public int evictBlocksByHfileName(String hfileName) {
  int evictedCount = 0;
  for (BlockCacheKey candidate : map.keySet()) {
    // Count the key only when it belongs to the file and the eviction actually succeeded.
    if (candidate.getHfileName().equals(hfileName) && evictBlock(candidate)) {
      evictedCount++;
    }
  }
  if (victimHandler == null) {
    return evictedCount;
  }
  return evictedCount + victimHandler.evictBlocksByHfileName(hfileName);
}
/**
* Removes a block from the backing map and, when the removal was triggered by the eviction
* process and a victim cache is configured, hands the block over to that victim cache.
*
* @param block the cached block to evict
* @param evictedByEvictionProcess true when called from the eviction process; only then is
*        the block passed on to the victim cache
* @return the heap size of the evicted block
*/
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
// ... (remaining bookkeeping is omitted in this excerpt)
if (evictedByEvictionProcess && victimHandler != null) {
if (victimHandler instanceof BucketCache) {
// Ask the BucketCache to wait only while this cache is still below its acceptable size.
boolean wait = getCurrentSize() < acceptableSize();
boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
inMemory, wait);
} else {
victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
}
}
return block.heapSize();
}
/**
 * Triggers an eviction: delegates to the eviction thread when one was created, otherwise
 * runs the eviction directly on the calling thread.
 */
private void runEviction() {
  if (evictionThread != null) {
    evictionThread.evict();
    return;
  }
  evict();
}
/**
* Runs one eviction pass.
*
* <p>Returns immediately if another eviction already holds the eviction lock. Otherwise it
* computes how many bytes must be freed to get back down to minSize(), sorts every cached
* block into a single/multi/memory bucket, and frees blocks from those buckets using one of
* two strategies: the forced in-memory strategy (when forceInMemory is set or memoryFactor
* exceeds 0.999f) or the overflow-based strategy driven by a priority queue.
*
* <p>Note that stats.evict() sits in the finally block, so it is recorded whenever the lock
* was acquired — including the early return taken when there is nothing to free.
*/
void evict() {
// Take the lock without blocking; this guarantees only one eviction is processed at a time.
if(!evictionLock.tryLock()) return;
try {
evictionInProgress = true;
long currentSize = this.size.get();
long bytesToFree = currentSize - minSize();
if(bytesToFree <= 0) return;
// Instantiate one bucket for each of the three priorities.
BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
singleSize());
BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
multiSize());
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
memorySize());
// Scan entire map putting into appropriate buckets
for(LruCachedBlock cachedBlock : map.values()) {
switch(cachedBlock.getPriority()) {
case SINGLE: {
bucketSingle.add(cachedBlock);
break;
}
case MULTI: {
bucketMulti.add(cachedBlock);
break;
}
case MEMORY: {
bucketMemory.add(cachedBlock);
break;
}
}
}
long bytesFreed = 0;
if (forceInMemory || memoryFactor > 0.999f) {
long s = bucketSingle.totalSize();
long m = bucketMulti.totalSize();
if (bytesToFree > (s + m)) {
// this means we need to evict blocks in memory bucket to make room,
// so the single and multi buckets will be emptied
bytesFreed = bucketSingle.free(s);
bytesFreed += bucketMulti.free(m);
bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
} else {
// this means no need to evict block in memory bucket,
// and we try best to make the ratio between single-bucket and
// multi-bucket is 1:2
long bytesRemain = s + m - bytesToFree;
if (3 * s <= bytesRemain) {
// single-bucket is small enough that no eviction happens for it
// hence all eviction goes from multi-bucket
bytesFreed = bucketMulti.free(bytesToFree);
} else if (3 * m <= 2 * bytesRemain) {
// multi-bucket is small enough that no eviction happens for it
// hence all eviction goes from single-bucket
bytesFreed = bucketSingle.free(bytesToFree);
} else {
// both buckets need to evict some blocks
bytesFreed = bucketSingle.free(s - bytesRemain / 3);
if (bytesFreed < bytesToFree) {
bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
}
}
}
} else {
// Overflow-based strategy: buckets are polled in the order defined by BlockBucket's
// compareTo (not shown in this excerpt).
PriorityQueue<BlockBucket> bucketQueue =
new PriorityQueue<BlockBucket>(3);
bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
int remainingBuckets = 3;
BlockBucket bucket;
while((bucket = bucketQueue.poll()) != null) {
long overflow = bucket.overflow();
if(overflow > 0) {
// Free at most this bucket's overflow, capped at an equal share of the bytes still
// outstanding across the buckets not yet processed.
long bucketBytesToFree = Math.min(overflow,
(bytesToFree - bytesFreed) / remainingBuckets);
bytesFreed += bucket.free(bucketBytesToFree);
}
remainingBuckets--;
}
}
} finally {
// Always record the eviction run, clear the in-progress flag and release the lock.
stats.evict();
evictionInProgress = false;
evictionLock.unlock();
}
}
/**
 * Groups the cached blocks of one priority during an eviction pass, so that blocks can be
 * freed from that priority until a byte target is reached.
 *
 * NOTE(review): this excerpt omits the constructor, the totalSize/bucketSize members and
 * compareTo; only the members shown here are documented.
 */
private class BlockBucket implements Comparable<BlockBucket> {
  private LruCachedBlockQueue queue;

  /** Adds a block to this bucket and accounts for its heap size. */
  public void add(LruCachedBlock block) {
    totalSize += block.heapSize();
    queue.add(block);
  }

  /**
   * Evicts blocks taken from the tail of the queue until at least {@code toFree} bytes have
   * been released or the queue is exhausted.
   *
   * @param toFree number of bytes to release
   * @return number of bytes actually released
   */
  public long free(long toFree) {
    long released = 0;
    for (LruCachedBlock victim = queue.pollLast(); victim != null; victim = queue.pollLast()) {
      released += evictBlock(victim, true);
      if (released >= toFree) {
        break;
      }
    }
    return released;
  }

  /** Returns how many bytes this bucket holds beyond its allotted bucket size. */
  public long overflow() {
    return totalSize - bucketSize;
  }
}
// ...
}