Flink DataStream 状态和容错 二:Checkpoint 和 StateBackends

Checkpoint

Flink 中的 State 在上一篇中介绍过,为了使 State 容错,需要有 State checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。Checkpoint 机制在 Flink 容错机制 中有更详细介绍。

Checkpoint 使用的先决条件:

  1. 一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph…
  2. State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph…

启用和配置

Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

Checkpoint 的配置项包括:

  • 恰好一次(exactly-once)或至少一次(at-least-once):Checkpoint 支持这两种模式。对于大多数应用来说,恰好一次是优选的。至少一次可能在某些要求超低延迟(几毫秒)的应用程序使用。

  • Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint。

  • Checkpoint 最小间隔时间(毫秒):如果设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint,不论 checkpoint 的持续时间和间隔是多少。即使 checkpoint 间隔永远不会小于此参数。是为了保证 checkpoint 之间能够完成一定量的数据处理工作。

    配置 time between checkpoint 相比配置 checkpoint interval 通常更容易。因为 checkpoint 耗时有时会明显比平时更长,time between checkpoint 更不容易收到影响(例如,目标存储系统临时性的响应缓慢)

    这个值还意味着并发 checkpoint 的数量是一个

  • Checkpoint 并发数:默认情况下,当一个 checkpoint 处于运行状态时,系统不会触发另一个 checkpoint。确保整个拓扑结构不会花费太多时间用于 checkpoint。该设置可以设置多个重叠的 checkpoint,特点的场景可能会需要。

    当设置 time between checkpoint 时,不能使用此配置。

  • 外部 checkpoint:可以配置在系统外部持久化 checkpoint。Checkpoint 信息写入外部持久存储,在作业失败时不会自动清除,因此作业失败时可以用来恢复。

  • Checkpoint 出错时,任务状态:决定了如果在 checkpoint 过程中发生错误,当前任务是否将失败或继续执行。默认会任务失败。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 启用 checkpoint 间隔 1000 ms
env.enableCheckpointing(1000)

// 高级选项:

// 设置 exactly-once 模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 设置 checkpoint 最小间隔 500 ms 
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// 设置 checkpoint 必须在1分钟内完成,否则会被丢弃
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 设置 checkpoint 失败时,任务不会 fail,该 checkpoint 会被丢弃
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// 设置 checkpoint 的并发度为 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

相关配置

更多相关参数可以通过 conf/flink-conf.yaml 全局配置

配置项 默认值 描述
state.backend (none) 选择 state backend 实现
state.backend.async true state backend 使用异步方法。有些不支持异步,或者仅支持异步的可并忽略此选项
state.backend.fs.memory-threshold 1024 存储 state 数据文件的最小规模,如果小于该值则会存储在 root checkpoint metadata file
state.backend.incremental false 是否采用增量 checkpoint,有些不支持增量的可并忽略此选项
state.backend.local-recovery false
state.checkpoints.dir (none) 用于指定 checkpoint 数据存储目录,目录必须对所有参与的 TaskManagers 和 JobManagers 可见
state.checkpoints.num-retained 1 指定保留已完成的 checkpoint 数量
state.savepoints.dir (none) 用于指定 savepoint 数据存储目录
taskmanager.state.local.root-dirs (none)

选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库…)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…),下面有更详细的介绍。

迭代任务中使用

Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)

在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。

State backend

Flink 提供了不同的 State backend,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml 指定的选项,也可以在每个作业中设置来覆盖默认选项:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

Flink 自带了以下几种开箱即用的 state backend:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

在没有配置的情况下,系统默认使用 MemoryStateBackend

三种 State backend 介绍

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

MemoryStateBackend 的限制:

  • 单个 State 的大小默认限制为5MB,可以在 MemoryStateBackend 的构造函数中增加。
  • 不论如何配置,State 大小都无法大于 akka.framesize(JobManager 和 TaskManager 之间发送的最大消息的大小)
  • JobManager 必须有足够的内存大小

MemoryStateBackend 适用以下场景:

  • 本地开发和调试
  • 只持有很小的状态,如方法:Map、FlatMap、Filter… 或 Kafka Consumer

FsStateBackend

FsStateBackend 需要配置一个文件系统的URL来,如 “hdfs://namenode:40010/flink/checkpoint” 或 “file:///data/flink/checkpoints”。

FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置:

new FsStateBackend(path, false);

FsStateBackend 适用以下场景:

  • State 较大,窗口时间较长和 key/value 较大的 State
  • 所有高可用性的情况

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的URL来,如 “hdfs://namenode:40010/flink/checkpoint” 或 “file:///data/flink/checkpoints”。

RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

RocksDBStateBackend 通常也是异步的。

RocksDBStateBackend 的限制:
RocksDB JNI API 是基于 byte[],因此 key 和 value 最大支持大小为2^31 个字节。RocksDB 自身在支持较大 value 时候有一些问题。

RocksDBStateBackendFsStateBackend 同样适用以下场景:
– State 较大,窗口时间较长和 key/value 较大的 State
– 所有高可用性的情况
目前唯一支持增量 checkpoint

与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

性能比较

Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark

Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。

不同 State backend 吞吐量对比

Throughput

  • 使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
  • Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。

不同 State backend 延迟对比

Latency

  • 使用 FileSystem 和 Memory 时延迟基本一致且较低。
  • 使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。

State backend 的选择

StateBackend in-flight checkpoint 吞吐 推荐使用场景
MemoryStateBackend TM Memory JM Memory 调试、无状态或对数据丢失或重复无要求
FsStateBackend TM Memory FS/HDFS 普通状态、窗口、KV 结构
RocksDBStateBackend RocksDB on TM FS/HDFS 超大状态、超长窗口、大型 KV 结构

Reference:
https://flink.xskoo.com/dev/stream/state/checkpointing.html
https://tech.meituan.com/Flink_Benchmark.html

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注