Flink DataStream 状态和容错 三:Savepoint 和 Restart

Savepoint

Savepoint 和 Checkpoint 的区别

Savepoint 是命令触发的 Checkpoint,对流式程序做一次完整的快照并将结果写到 State backend,可用于停止、恢复或更新 Flink 程序。整个过程依赖于 Checkpoint 机制。另一个不同之处是,Savepoint 不会自动清除。

分配 Operator IDs

Savepoint 中会以 Operator ID 作为 key 保存每个有状态算子的状态:

Operator ID State
source-id State of StatefulSource
mapper-id State of StatefulMapper

Operator ID 用于确定每个算子的状态,只要ID不变,就可以从 Savepoint 中恢复,Operator ID 如果不显示指定会自动生成,生成的ID取决于程序的结构,并且对程序更改很敏感。因此,建议手动分配这些ID:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

Savepoint 操作

触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory:

state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目录,Savepoint 将失败。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

生成 Savepoint(以 jobId 作为唯一ID),并返回创建的 Savepoint 的路径,恢复时需要使用。

在 Yarn 集群触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

要指定 jobId 和 yarnAppId(YARN应用程序ID),并返回创建的 Savepoint 的路径。

取消作业时生成 Savepoint

$ bin/flink cancel -s [:targetDirectory] :jobId

以原子方式触发具有 jobId 的 Savepoint,并取消作业。

恢复 Savepoint

$ bin/flink run -s :savepointPath [:runArgs]

提交作业,并指定要恢复的 Savepoint路径。

允许启动有未恢复 State

$ bin/flink run -s :savepointPath -n [:runArgs]

默认情况下,恢复操作将尝试将 Savepoint 的所有 State 恢复。如果删除了运算符,则可以通过 –allowNonRestoredState(简写为 -n) 选项跳过无法映射到新程序的状态。

删除 Savepoint

$ bin/flink savepoint -d :savepointPath

通过指定路径删除 Savepoint,也可以通过文件系统手动删除 Savepoint 数据,而不会影响其他 Savepoint 或 Checkpoint。

常见问题

应该为所有算子分配ID吗?
根据经验,是的。严格地说,只需要通过该uid()方法将ID分配给作业中的有状态 算子。Savepoint 仅包含这些算子的 State,无状态算子不是保存点的一部分。

如果在作业中新添加一个有状态算子,会发生什么?
新算子将在没有任何状态的情况下进行初始化,类似于无状态算子。

如果在作业删除一个有状态的算子,会发生什么?
如果没有指定允许启动有未恢复 State(–allowNonRestoredState / -n),启动会失败。

如果在作业中重新排列有状态算子,会发生什么?
如果手动这些算子分配了ID,作业将照常恢复。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

如果在作业中添加,删除或重新排序没有状态的算子,会发生什么?
如果为有状态算子手动分配了ID,作业将照常恢复,则无状态算子的改变不会影响。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

如果作业的并行性发生改变,会发生什么?
如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且没有使用弃用状态API,可以正常恢复作业。

如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用弃用状态API,则首先必须将作业和 Savepoint 升级到1.2.0以及之后的版本,然后才能更改并行度。请参考官方 升级指南

Restart

Flink 支持多种不同的重启策略,控制着作业失败后如何重启。集群可以设置默认的重启策略,作业提交的时候也可以指定重启策略,覆盖默认的重启策略。

默认的重启策略配置在 conf/flink-conf.yaml,参数 restart-strategy 定义了采用什么策略。如果 checkpoint 未启用,就会采用 “no restart” 策略,如果启用了 checkpoint 机制,但是未指定重启策略的话,就会采用 “fixed-delay” 策略。每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

以下是支持的三种重启策略的可配置项

重启策略 重启策略值
Fixed delay fixed-delay
Failure rate failure-rate
No restart None

除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,这种方式同样适用于StreamExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

固定延迟重启策略(Fixed Delay Restart Strategy)

尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

参数配置 描述 默认值
restart-strategy.fixed-delay.attempts Flink尝试执行的次数 1,如果启用checkpoint的话是Integer.MAX_VALUE
restart-strategy.fixed-delay.delay 两次重启之间等待的时间 akka.ask.timeout,如果启用checkpoint的话是1s

flink-conf.yaml 参数配置:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

失败率重启策略(Failure Rate Restart Strategy)

Job失败后会重启次数如果超过失败率,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

配置参数 描述 默认值
restart-strategy.failure-rate.max-failures-per-interval Flink尝试执行的次数 1
restart-strategy.failure-rate.failure-rate-interval 计算失败率的时间间隔 1 min
restart-strategy.failure-rate.delay 两次重启之间等待的时间 akka.ask.timeout

flink-conf.yaml 参数配置:

restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3 
restart-strategy.failure-rate.failure-rate-interval: 5 min 
restart-strategy.failure-rate.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment() 
env.setRestartStrategy(RestartStrategies.failureRateRestart( 
  3, // 每个测量时间间隔最大失败次数 
  Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔 
  Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔 
))

无重启策略(No Restart Strategy)

Job直接失败,不会尝试进行重启

flink-conf.yaml 参数配置:

restart-strategy: none
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注