Hbase Region Split compaction 过程分析以及调优

Hbase Region Split compaction 过程分析以及调优

Hbase 写入流程

要想了解 hbase 的 Compact 和 Split 功能必须先从 hbase 的写入过程说起,以 hbase 的 multi RPC 调用为入口(客户端多个 PUT 实际会调用到该方法),其具体处理逻辑如下:

如上图,任何一次写请求都有可能触发 split 或者是 compaction,下面将就上图各个模块的角色做详细描述,首先看 RsRpcServices,该类实现所有 RegionServer 的 RPC 调用方法。

RSRpcServices

实现 regionserver 的 rpc 调用,在入口处会做基于用户维度的 quota 控制(主要包括读写频率,数据量控制等),这些校验通过后会调用类 HRegion 相关方法。

HRegion

写请求达到 HRegion 后,hregion 首先会加行锁,然后进行 checkResource 操作,在 checkResource操作里主要检查 memstoreSize 是否大于 blockingMemStoreSize,其中 blockingMemStoreSize 由等于 memstoreFlushSize*hbase.hregion.memstore.block.multiplier,

hbase.hregion.memstore.block.multiplier默认值是 4,而参数 memstoreFlushSize 大小由参数 hbase.hregion.memstore.flush.size 指定,该值默认值是 128M,hbase.hregion.memstore.block.multiplier 设置的太大在写入量大的时候很可能会导致机器内存耗尽而引发 OutofMem 错误,如果当前 memstoreSize 比 blockingMemStoreSize 大,那么抛出 RegionTooBusyException 异常,此时客户端会进行重试,同时并请求刷 memstore 动作,请求刷 memstore 动作本质是向 MemStoreFlusher 的队列里添加刷内存请求,如果 checkResource 通过,那么将把数据写进 MemStore 并写 WAL,在写 WAL(看用户策略是否需要)成功后更新 mvcc 版本号数据才对外可见。

MemStoreFlusher

MemStoreFlusher 的职责是接受 Hregion 的刷新请求并调度该请求,在 MemStoreFlusher 内部有一个 flushQueue 队列,消费此队列的是 FlushHandler,FlushHandler 的个数由参数hbase.hstore.flusher.count 设定(默认值是 2),FlushHandler 从队列 flushQueue 取出需要刷新的请求,从队列里取请求超时时间是参数:hbase.server.thread.wakefrequency 控制该值默认是 10 秒,FlushHandler 线程在处理的时候如果当前队列为空或者需要立刻刷新,首先会检查当前内存水位,具体示意图如下:

FlushHandler 在处理 flush 请求的时候,会查看当前 region 的 hfile 是否过多,如果过多就会暂停 flush 而进行 compact 操作并阻塞一定时间后才进行 flush 操作,阻塞文件参数由参数 hbase.hstore.blockingStoreFiles 控制该值默认为 7,阻塞时间由参数 hbase.hstore.blockingWaitTime 控制,该值默认为 90000 毫秒。如果非阻塞则会查看是否需要 split,如果需要 split 则会发送 split 请求。

Split 策略

hbase 有两种分裂策略(Hbase-1.1.3),分别是

IncreasingToUpperBoundRegionSplitPolicy

该方法中的分配策略,是根据 table 中 region 的个数平方,乘以 memstore 的大小。得出应当切分的大小。假设 memstore size 配置为 128M,则在 memstore 第一次刷入 HFile 数据时,进行第一次 split,1 1 128M = 128M。

当 region 数达到 2 个时,2 2 128M = 512M。

当 region 数达到 3 个时,3 3 128M = 1152M。

依此类推

DisabledRegionSplitPolicy

禁止 Split

hbase 的分裂策略可以通过表的属性 SPLIT_POLICY 指定,也可以通过 hbase-site.xml 全局指派,参数为:hbase.regionserver.region.split.policy,默认为IncreasingToUpperBoundRegionSplitPolicy。

Split

Hbase 的 split 主要实现在类 SplitTransactionImpl,中默认的分裂点是当前 region 的中间 key 一分为而,当一个 splitrequest 加入到 CompactSplitThread 的 splits 队列后,由 split 线程负责 split,在研究 split 之前先看下 CompactSplitThread 类的结构,其内部结构如下:

在 CompactSplitThread 内部维护了三个队列,分别是 longcompaction、shortcompact 和 splt 合并相关的下面在介绍,split 队列有一个调度线程池来消费该队列,handler 个数默认为 1,split 最终会调用到 SplitTransactionImpl,hbase 的 split 详细过程如下:

split 相关的参数主要有 hbase.regionserver.thread.split,执行 split 的线程数,默认为 1,

hbase.regionserver.regionSplitLimit 当前 regionserver 的 region 个数最大值,如果当前 regionserver 的 region 个数超过该值,那么将不会在进行 split 操作。

Compaction

通过上面的分析知道 Hbase 的 compaction 在 CompactSplitThread 中分 longCompation 和 smallCompaction,而触发 compaction 除了每次 flush 内存之后还有入口外部 rpc 调用和 regionserver 的周期性检查,regionserver 周期性检查的频率由参数 hbase.server.thread.wakefrequency 控制,默认是 10 秒,而是否决定 compaction 操作则需要根据 compaction 的策略而定,hbase 的 compaction 策略即是挑选需要合并的 hfile,挑选的原则是文件数不能太多、不能太少、文件大小不能太大等等,最理想的情况是,选取那些承载 IO 负载重、文件小的文件集,实际实现中,HBase 提供了多个文件选取算法:RatioBasedCompactionPolicy、ExploringCompactionPolicy 和 StripeCompactionPolicy 等,用户也可以通过特定接口实现自己的 Compaction 算法,Hbase 默认的策略是 ExploringCompactionPolicy,不过可以通过参数修改为适合自己的挑选策略,对应的参数为 hbase.hstore.defaultengine.compactionpolicy.class,在默认的 ExploringCompactionPolicy 挑选算法中判断是否需要合并的逻辑是当前 hfile 的个数是否大于参数 hbase.hstore.compaction.min 设定的值,该值默认为 3,当 compaction 请求到达 CompactSplitThread 后具体是放入 long 还是 small 队列,具体由以下原因决定:如果 selectNow 为 false,即系统自身引发的合并,比如 MemStore flush、compact 检查线程等,统一放入到 shortCompactions 中,即放入 small pool  , 而如果是人为触发的,比如 HBase shell,则还要看 HStore 中合并请求大小是否超过阈值,超过则放入 longCompactions,即 large pool,否则还是 small pool,综上,hbase 的 compaction 简化流程图如下:

1、Memstore Flush: 应该说 compaction 操作的源头就来自 flush 操作,memstore flush 会产生 HFile 文件,文件越来越多就需要 compact。因此在每次执行完 Flush 操作之后,都会对当前 Store 中的文件数进行判断,一旦文件数# > ,就会触发 compaction。需要说明的是,compaction 都是以 Store 为单位进行的,而在 Flush 触发条件下,整个 Region 的所有 Store 都会执行 compact,所以会在短时间内执行多次 compaction。

2、 后台线程周期性检查: 后台线程 CompactionChecker 定期触发检查是否需要执行 compaction,检查周期为:hbase.server.thread.wakefrequency*hbase.server.compactchecker.interval.multiplier。和 flush 不同的是,该线程优先检查文件数#是否大于,一旦大于就会触发 compaction。如果不满足,它会接着检查是否满足 major compaction 条件,简单来说,如果当前 store 中 hfile 的最早更新时间早于某个值 mcTime,就会触发 major compaction,HBase 预想通过这种机制定期删除过期数据。上文 mcTime 是一个浮动值,浮动区间默认为[7-7*0.2,7 7*0.2],其中 7 为 hbase.hregion.majorcompaction,0.2 为 hbase.hregion.majorcompaction.jitter,可见默认在 7 天左右就会执行一次 major compaction。用户如果想禁用 major compaction,只需要将参数 hbase.hregion.majorcompaction 设为 0。

3、手动触发:一般来讲,手动触发 compaction 通常是为了执行 major compaction,原因有三,其一是因为很多业务担心自动 major compaction 影响读写性能,因此会选择低峰期手动触发;其二也有可能是用户在执行完 alter 操作之后希望立刻生效,执行手动触发 major compaction;其三是 HBase 管理员发现硬盘容量不够的情况下手动触发 major compaction 删除大量过期数据;无论哪种触发动机,一旦手动触发,HBase 会不做很多自动化检查,直接执行合并。

选择合适 HFile 合并

选择合适的文件进行合并是整个 compaction 的核心,因为合并文件的大小以及其当前承载的 IO 数直接决定了 compaction 的效果。最理想的情况是,这些文件承载了大量 IO 请求但是大小很小,这样 compaction 本身不会消耗太多 IO,而且合并完成之后对读的性能会有显著提升。然而现实情况可能大部分都不会是这样,在 0.96 版本和 0.98 版本,分别提出了两种选择策略,在充分考虑整体情况的基础上选择最佳方案。无论哪种选择策略,都会首先对该 Store 中所有 HFile 进行一一排查,排除不满足条件的部分文件:

1、 排除当前正在执行 compact 的文件及其比这些文件更新的所有文件(SequenceId 更大)

2、排除某些过大的单个文件,如果文件大小大于 hbase.hzstore.compaction.max.size( 默认 Long 最大值 ),则被排除,否则会产生大量 IO 消耗,经过排除的文件称为候选文件,HBase 接下来会再判断是否满足 major compaction 条件,如果满足,就会选择全部文件进行合并。

判断条件有下面三条,只要满足其中一条就会执行 major compaction:

1、用户强制执行 major compaction

2、 长时间没有进行 compact(CompactionChecker 的判断条件 2)且候选文件数小于 hbase.hstore.compaction.max(默认 10)

3、Store 中含有 Reference 文件,Reference 文件是 split region 产生的临时文件,只是简单的引用文件,一般必须在 compact 过程中删除,如果不满足 major compaction 条件,就必然为 minor compaction,HBase 主要有两种 minor 策略:RatioBasedCompactionPolicy 和 ExploringCompactionPolicy,下面分别进行介绍:

 RatioBasedCompactionPolicy

 从老到新逐一扫描所有候选文件,满足其中条件之一便停止扫描:

 (1)当前文件大小 < 比它更新的所有文件大小总和 * ratio,其中 ratio 是一个可变的比例,在高峰期时 ratio 为 1.2,非高峰期为 5,也就是非高峰期允许 compact 更大的文件。那什么时候是高峰期,什么时候是非高峰期呢?用户可以配置参数 hbase.offpeak.start.hour 和 hbase.offpeak.end.hour 来设置高峰期

 (2)当前所剩候选文件数 <= hbase.store.compaction.min(默认为 3)

停止扫描后,待合并文件就选择出来了,即为当前扫描文件 比它更新的所有文件

 ExploringCompactionPolicy

 该策略思路基本和 RatioBasedCompactionPolicy 相同,不同的是,Ratio 策略在找到一个合适的文件集合之后就停止扫描了,而 Exploring 策略会记录下所有合适的文件集合,并在这些文件集合中寻找最优解。最优解可以理解为:待合并文件数最多或者待合并文件数相同的情况下文件大小较小,这样有利于减少 compaction 带来的 IO 消耗。

挑选合适的线程池

 HBase 实现中有一个专门的线程 CompactSplitThead 负责接收 compact 请求以及 split 请求,而且为了能够独立处理这些请求,这个线程内部构造了多个线程池:largeCompactions、smallCompactions 以及 splits 等,其中 splits 线程池负责处理所有的 split 请求,largeCompactions 和 smallCompaction 负责处理所有的 compaction 请求,其中前者用来处理大规模 compaction,后者处理小规模 compaction。这里需要明白三点:

 1、上述设计目的是为了能够将请求独立处理,提供系统的处理性能。

 2、哪些 compaction 应该分配给 largeCompactions 处理,哪些应该分配给 smallCompactions 处理?是不是 Major Compaction 就应该交给 largeCompactions 线程池处理?不对。这里有个分配原则:待 compact 的文件总大小如果大于值 throttlePoint(可以通过参数 hbase.hregion.majorcompaction 配置, 默认为 2.5G),分配给 largeCompactions 处理,否则分配给 smallCompactions 处理。

 3、largeCompactions 线程池和 smallCompactions 线程池默认都只有一个线程,用户可以通过参数 hbase.regionserver.thread.compaction.large 和 hbase.regionserver.thread.compaction.small 进行配置

执行 HFile 文件合并

上文一方面选出了待合并的 HFile 集合,一方面也选出来了合适的处理线程,万事俱备,只欠最后真正的合并。合并流程说起来也简单,主要分为如下几步:

 1、分别读出待合并 hfile 文件的 KV,并顺序写到位于./tmp 目录下的临时文件中

 2、将临时文件移动到对应 region 的数据目录

 3、将 compaction 的输入文件路径和输出文件路径封装为 KV 写入 WAL 日志,并打上 compaction 标记,最后强制执行 sync

 4、将对应 region 数据目录下的 compaction 输入文件全部删除

上述四个步骤看起来简单,但实际是很严谨的,具有很强的容错性和完美的幂等性:

 1、 如果 RS 在步骤 2 之前发生异常,本次 compaction 会被认为失败,如果继续进行同样的 compaction,上次异常对接下来的 compaction 不会有任何影响,也不会对读写有任何影响。唯一的影响就是多了一份多余的数据。

 2、如果 RS 在步骤 2 之后、步骤 3 之前发生异常,同样的,仅仅会多一份冗余数据。

 3、如果在步骤 3 之后、步骤 4 之前发生异常,RS 在重新打开 region 之后首先会从 WAL 中看到标有 compaction 的日志,因为此时输入文件和输出文件已经持久化到 HDFS,因此只需要根据 WAL 移除掉 compaction 输入文件即可

如上即使 HbaseFlush 内存以及 split 和合并的大致流程,希望对有用到的同学有一定的帮助。

updatedupdated2024-08-252024-08-25