Flink 1.13 State 和 Fault Tolerance(一)

[toc]

Overview

有状态函数(Stateful function)和算子(Operator)在处理独立的数据或事件时存储数据,使得状态(State)成为任何复杂算子中的关键部分,例如:

  • 当应用检索特定的事件模式,State 将存储到接收到的事件的序列
  • 当按照分钟/小时/天的唯独聚合事件时,状态将保留待处理的聚合状态
  • 当在流数据上进行机器学习的模型训练时,状态保存模型当前版本的参数
  • 当需要管理历史数据时,状态允许有效地访问过去发生的事件

Flink 需要知道 State,以便使程序的 checkpoint 和 savepoint 可用。

有关状态的知识还允许重新缩放Flink应用程序,这意味着Flink负责在并行实例之间重新分配状态。

Work with State

State 分类

Flink 中的状态分为两种:Keyed state 和 Operator state

Operator State

每个 Operator state 都绑定到一个并行的算子的实例上。可以参考 Kafka connector 的例子,Kafka 消费者的每个并行实例都维护一个 topic 分区和 offset 的对应关系作为 Operator state。Operator state 支持当并行度更改时,在并行的算子实例间重新分配 State,进行这种重新分配有多种不同的方案(后面会介绍)。

Operator state 在 Python DataStream API 中还不支持。

Keyed State

Keyed state 与 key 强相关,只能在 KeyedStream 上应用的函数和算子内使用。

可以将 Keyed state 认为是分区后的 Operator state,每个 key 有一个 State 的分区。逻辑上,每个 Keyed state 绑定唯一的 <parallel-operator-instance, key>(算子并行实例和 key 的一对元组),可以将其简单地视为 <operator, key>(因为每个 key 属于算子的唯一一个并行实例)。

Keyed state 进一步被组织为 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子单位,Key groups 的数量与最大并行度相同。在程序执行期间 keyed operator 的每个并行实例都使用一个或多个 Key groups 的 keys。

Broadcast State

Broadcast State 是一种特殊的 Operator State。Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。

Broadcast state 的特点是:

  • 使用 Map 类型的数据结构
  • 仅适用于同时具有广播流和非广播流作为数据输入的特定算子
  • 可以具有多个不同名称的 Broadcast state

Broadcast state 在 Python DataStream API 中还不支持。

Raw State 和 Managed State

State 有两种状态:托管状态(managed)和原生状态(raw)。

托管状态(managed state) 由 Flink runtime 管理的数据结构表示,例如内部哈希表或 RocksDB。Flink runtime 对 State 进行编码并写入 checkpoint。

原生状态(raw state) 是在算子内部的数据结构中的保存。Checkpoint 只会保存 State 内容的字节序列,State 的真实数据结构对 Flink 是透明的。

所有数据流函数都可以使用托管状态(managed state),而原生状态(raw state)只能在具体实现算子时使用。建议使用托管状态,因为在托管状态下,Flink 能够在并行度改变时自适应地重新分配 State,并且在内存管理方面可以做的更好。

Keyed DataStream

如果要使用 keyed state,首先需要在 DataStream 上指定 key(可以使用方法 keyBy(KeySelector) ),用于对状态以及流中的记录进行分区,这将产生 KeyedStream,可以使用 Keyed state。

KeySelector 函数将单个记录作为输入并返回该记录的 key 值,可以是任何类型,并且必须从确定性计算中派生。

Flink 的数据模型不是基于 key-value 的。因此,不需要将数据集类型物理地加入到 key 和 value 中。key 被定义用于指导分组算子。

public class WC {
  public String word;
  // ...

  public String getWord() { return word; }
}

DataStream<WC> words = ...
KeyedStream<WC> keyed = words.keyBy(WC::getWord);

Tuple Keys and Expression Keys

Flink 还有两种定义 key 的方法:Java/scala api 中的 tuple keys 和 expression keys(python api中仍然不支持)。

可以使用元组(Tuple)字段索引或表达式来选择对象的字段来指定 key。现在不推荐使用这些,使用 KeySelector 函数是绝对优越的:使用 lambda,易于使用,在运行时的开销可能更小。

Keyed State 和 Operator State

Use Keyed State

Keys state 提供多种不同类型 State,作用域都是当前输入数据的键,只能用于 KeyedStream,可以通过 stream.keyBy(…) 创建。

首先看有哪些不同类型的状态,以及如何在程序中使用:

  • ValueState<T>:保存了一个值,可以更新和读取(算子操作的每个 key 可能有一个 value)

    update(T) 更新
    T value() 取值

  • ListState<T>:保存了一个列表,可以追加元素,可以获取到一个包含所有当前存储的元素的迭代器
    add(T)addAll(List<T>) 添加到列表
    Iterable<T> get() 获取迭代器
    update(List<T>) 使用新的列表覆盖现有列表

  • ReducingState<T>:保存一个值,表示添加到 State 的所有值的聚合结果。提供的接口类似于 ListState
    add(T) 函数会使用指定的函数(ReduceFunction)对添加的值进行聚合

  • AggregatingState<IN, OUT>:保存一个值,表示添加到 State 的所有值的聚合结果。与 ReducingState 不同的是,聚合结果的数据类型可以与添加到 State 的元素的数据类型不同。接口同样类似于 ListState
    add(IN) 函数会使用指定的函数(AggregateFunction)对添加的值进行聚合

  • MapState<UK, UV>:保存一个 Map。可以将 key/value 存入 State,也可以获取到一个包含所有当前存储的元素的迭代器
    put(UK, UV)putAll(Map<UK, UV>) 添加 key/value 到 Map
    get(UK) 获取与指定 key 的 value
    entries()keys()values() 对 Map 的元素/键/值遍历访问

所有类型的 State 都有 clear() 方法来清除当前状态。

首先要记住这些 State 对象仅用于有状态接口,State 不一定存储在内存,也可能存储在磁盘或其他位置。其次要记住的是,从 State 获得的值取决于输入数据的 key,如果处理的 keys 不同,定义的函数的调用结果会不同。

要想操作 State 对象,首先必须创建一个 StateDescriptor,该对象拥有 State 名称(可以创建多个 State,必须具有唯一的名称来引用 State),State 所持有的值的类型,可能还有用户指定的函数(例如 ReduceFunction)。对应不同的 State 类型,有如下类对象:ValueStateDescriptorListStateDescriptorAggregatingStateDescriptor, ReducingStateDescriptorMapStateDescriptor

然后使用 RuntimeContext 可以才访问到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 访问各种类型 State 的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面是在 FlatMapFunction 中的使用示例:

实现了一个简单的计数窗口,通过输入元组的第一个参数分组,在分组的流中,每接收到两个元组,返回两那个元组的第二个参数的平均值

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

    // 使用 ValueState 保存元素求和值
    // 元组第一个参数为求和个数 count,第二个参数为求和值 sum
    private var sum: ValueState[(Long, Long)] = _

    override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
        // 访问 State 的值
        val tmpCurrentSum = sum.value

        // 初始值 (0, 0)
        val currentSum = if (tmpCurrentSum != null) {
            tmpCurrentSum
        } else {
            (0L, 0L)
        }

        // 求和并更新 State
        val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
        sum.update(newSum)

        // 如果 state value 达到 2, 发送统计的平均值并清空 state
        if (newSum._1 >= 2) {
            out.collect((input._1, newSum._2 / newSum._1))
            sum.clear()
        }
    }

    override def open(parameters: Configuration): Unit = {

        // 通过 RuntimeContext 和 ValueStateDescriptor 获取 ValueState
        sum = getRuntimeContext.getState(
            new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        )
    }
}

object ExampleCountWindowAverage extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create keyed stream and call CountWindowAverage
    env.fromCollection(List(
        (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
    ))
        .keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()

    // 输出 (1,4) (1,5)
    env.execute("ExampleManagedState")
}

状态生存周期 (TTL)

状态生存周期(TTL)可以被指定给任何类型的 Keyed state,如果配置了 TTL 并且 State value 已过期,State value 会被清除。所有集合类型 State(ListState 和 MapState)都支持为每个条目设置 TTL。

为了启用 State TTL,首先需要构建 StateTtlConfig 对象,然后通过在 ValueStateDescriptor(其他类型同理)构造中传入该对象来启用 TTL,参考下面的例子:

// Time.seconds(1) 生存时间,必填项
// 
// StateTtlConfig.UpdateType 更新类型
//   - UpdateType.OnCreateAndWrite - 创建和写入时更新(默认)
//   - UpdateType.OnReadAndWrite - 读取和写入时更新
// 
// StateTtlConfig.StateVisibility 状态可见性,访问时是否返回已经过期的值
//   - StateVisibility.NeverReturnExpired - 永远不会返回过期的值(默认),对于不能访问过期数据的场景有用
//   - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以读到会返回
// 
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
// 构建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 启用 TTL
stateDescriptor.enableTimeToLive(ttlConfig)

补充:

  • 状态上次的修改时间会和数据一起保存在 state backend 中,因此开启 TTL 特性会增加状态数据的存储。
  • 目前TTL仅支持处理时间(processing time)。
  • 如果之前没有配置TTL,而状态恢复时启用TTL(相反的情况同样),会引起兼容性错误和 StateMigrationException 异常。
  • TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 运行时做处理。
  • 对于 Map 类型,只有序列化方法支持空值时,TTL的设置才支持空值,否则需要使用 NullableSerializer 进行封装(序列化会占用额外的字节)。
  • State TTL 当前在 PyFlink DataStream API 中还不支持。

过期数据清理

默认情况下,过期值在读取时被显式删除(如调用 ValueState.value()),如果配置的 state 后端支持,则定期在后台进行垃圾收集。可以在 StateTtlConfig 中禁用后台清理:

某些老版本,如果未读取过期状态的数据,则不会将其删除,这可能会导致状态不断增长

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

为了对后台的某些特殊清理进行更细粒度的控制,可以按如下所述单独配置。目前,heap state backend 依赖于增量清理,RocksD backend 使用压缩过滤器进行后台清理。

全量快照时进行清理

可以指定在获取完整的 State 快照时激活清理方法,以减小快照的大小。本地状态在当前实现下未被清除,但当从上一个快照恢复时,不会包括已删除的过期 State。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

注意:

  • 这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
  • 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 Savepoint 恢复时。
增量数据清理

可以选择增量方式清理状态数据,在状态访问或处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();

该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期,可以通过 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。

时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

可以通过配置开启 RocksDB 过滤器的 debug 日志查看清理操作: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 Java 序列化器,从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

Use Operator State

使用托管的 Operator State,有状态函数需要实现 CheckpointedFunction 接口。

CheckpointedFunction

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

// 执行 Checkpoint 时调用
void snapshotState(FunctionSnapshotContext context) throws Exception;

// 初始化函数时调用
void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑

Operator state 的数据结构不像 Keyed state 丰富,只支持 List,可以认为是可序列化对象的列表,彼此独立。这些对象在动态扩展时是可以重新分配 non-keyed state 的最小单元。目前支持几种动态扩展方式:

  • Even-split redistribution:算子并发度发生改变的时候,并发的每个实例取出 State 列表,合并到一个新的列表上,形成逻辑上完整的 State。然后根据列表元素的个数,均匀分配给新的并发实例(Task)。例如,如果并行度为1,算子的 State checkpoint 包含数据元 element1 和 element2,当并行度增加到2时,element1 会在 算子实例0中,而element2在算子实例1中。
  • Union redistribution:相比于平均分配更加灵活,把完整 State 划分的方式交给用户去做。并发度发生改变的时候,按同样的方式取到完整的 State 列表,然后直接交给每个实例。

下面的例子是有状态的 SinkFunction,利用 CheckpointedFunction 在将数据元发送之前进行缓存,checkpoint 时缓存写入 State,启动时判断是否使用 重发的 State 恢复缓存:

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

    @transient
    private var checkpointedState: ListState[(String, Int)] = _

    // 输入数据缓存
    private val bufferedElements = ListBuffer[(String, Int)]()

    override def invoke(value: (String, Int)): Unit = {
        // 输入数据存入 buffer
        bufferedElements += value

        // 当缓存数量达到阈值,发送数据,并清理 buffer
        if (bufferedElements.size == threshold) {
            for (element <- bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear()
        }
    }

    // 执行 Checkpoint 时调用
    // 清除前一个检查点包含的所有对象
    // 缓存数据添加到 State
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (element <- bufferedElements) {
            checkpointedState.add(element)
        }
    }

    // 用于首次初始化或从 checkpoint 恢复
    override def initializeState(context: FunctionInitializationContext): Unit = {

        val descriptor = new ListStateDescriptor[(String, Int)](
            "buffered-elements",
            TypeInformation.of(new TypeHint[(String, Int)]() {})
        )

        // 使用 ctx 和 ListStateDescriptor 访问 ListState
        // 注意调用方法 getListState(descriptor)
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)

        // 根据 isRestored() 方法来检查当前是否是 checkpoint 恢复的情况
        if(context.isRestored) {
            // 如果 true,表示是恢复失败的情况,应用恢复数据的逻辑:
            //   恢复数据添加到 buffer 中
            for(element <- checkpointedState.get()) {
                bufferedElements += element
            }
        }
    }

}

initializeState 方法接收一个 FunctionInitializationContext 参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类似,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用。 然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

有状态的 Source Functions

带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。

class CounterSource extends RichParallelSourceFunction[Long]
       with ListCheckpointed[Long] {

    @volatile
    private var isRunning = true

    private var offset = 0L

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

        // 获取锁
        val lock = ctx.getCheckpointLock

        while (isRunning) {
            // 输出和更新在一个原子操作中
            lock.synchronized({
                ctx.collect(offset)
                offset += 1
            })
        }
    }

    override def cancel(): Unit = isRunning = false

    override def restoreState(state: util.List[Long]): Unit =
        for (s <- state) {
            offset = s
        }

    override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
        Collections.singletonList(offset)
}

Broadcast State Pattern

Broadcast State API

下面是一个例子来展示 Broadcast State API 的使用。在这个示例中,要处理的数据流中是有不同颜色(Color)和形状(Shape)属性的对象,希望在流中找到一对具有相同颜色的,并且遵循一个特定的形状模式的对象组(例如,一个红色长方形后面紧跟着一个红色三角形的组合)。 同时希望寻找的模式也会随着时间而改变。

第一个数据流是要处理的数据源,流中的对象是 Item,具有 ColorShape 属性

// 使用 Color 作为键来分组,保证相同 Color 的数据进入到同一个子任务中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

第二个数据流是要广播的数据,流中的对象是匹配规则(Rules)

// MapState 中保存 (RuleName,Rule) ,在描述类中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Rule>() {}));

// ruleStream 使用 MapStateDescriptor 作为参数广播,得到广播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

然后需要做的是,连接两个流并且指定两个连接后的处理逻辑(使用广播的 Rules 解析流中匹配的数据组,并返回)

DataStream<Match> output = colorPartitionedStream
                .connect(ruleBroadcastStream)
                .process(
                                    /**
                                     * 各个参数含义:   
                                         * Color,非广播流 keyed stream 的键的类型 
                                         * Item,非广播流的对象类型
                                         * Rule,广播流的对象类型  
                                         * String,返回结果的类型
                                     **/
                    new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                        // matching logic
                    }
                )

如何连接两个流:非广播流调用 connect(BroadcastStream) 方法用来连接广播流和非广播流,BroadcastStream 作为参数,返回一个 BroadcastConnectedStream 对象。BroadcastConnectedStream 调用 process() 方法执行处理逻辑,需要指定一个逻辑实现类作为参数,而具体的需要实现的抽象类取决于非广播流的类型:

  • 如果非广播流是 keyed stream,需要实现 KeyedBroadcastProcessFunction
  • 如果非广播流是 non-keyed stream,需要实现 BroadcastProcessFunction

BroadcastProcessFunction 和 KeyedBroadcastProcessFunction

这两个抽象函数有两个相同的需要实现的接口:

  • processBroadcastElement() 处理广播流数据
  • processElement() 处理非广播流数据

用于处理非广播流是 non-keyed stream 的情况

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

用于处理非广播流是 keyed stream 的情况

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

可以看到这两个接口提供的上下文对象有所不同。处理非广播流(processElement())使用 ReadOnlyContext,处理广播流(processBroadcastElement())使用 Context

这两个上下文对象(简称 ctx)提供的方法接口:

  1. 访问 Broadcast state:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
    1. getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
  2. 查询数据的时间戳:ctx.timestamp()
  3. 获取当前水印:ctx.currentWatermark()
  4. 获取当前处理时间:ctx.currentProcessingTime()
  5. 向旁路输出(side-outputs)发送数据:ctx.output(OutputTag<X> outputTag, X value)

这两个上下文对象不同之处在于对 Broadcast state 的访问限制:处理广播流元素,具有读和写的权限(read-write),处理非广播流元素只有读的权限(read-only)

这么设计的原因是,保证 Broadcast state 在算子的所有并行实例中是相同的。由于 Flink 中没有跨 Task 的通信机制,在一个任务实例中的修改不能在并行 Task 间传递。而广播端在所有并行任务中都能看到相同的数据元素,只对广播端提供可写的权限。同时要求在广播端的每个并行任务中,对接收数据的处理是相同的。如果忽略此规则会破坏 State 的一致性保证,从而导致不一致且难以诊断的结果。也就是说,processBroadcast() 的实现逻辑必须在所有并行实例中具有相同的确定性行为。

KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,提供了一些 BroadcastProcessFunction 没有的功能:

  1. processElement() 的参数 ReadOnlyContext 提供了方法能够访问 Flink 的定时器服务,可以注册事件定时器(event-time timer)或者处理时间的定时器(processing-time timer)。当定时器触发时,会调用 onTimer() 方法, 提供了 OnTimerContext,具有 ReadOnlyContext 的全部功能,并且提供:
    • 查询当前触发的是一个事件还是处理时间的定时器
    • 查询定时器关联的key
  2. processBroadcastElement() 方法中的参数 Context 会提供方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。 这个方法使用一个 KeyedStateFunction 能够对 stateDescriptor 对应的 state 中所有 key 的存储状态进行某些操作。

注册一个定时器只能在 KeyedBroadcastProcessFunctionprocessElement() 方法中进行。在 processBroadcastElement() 方法中不能注册定时器,因为广播的元素中并没有关联的 key。

回到前面的例子,KeyedBroadcastProcessFunction 的实现可能看起来如下:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
    // 我们用一个数组来存储,因为同时可能有很多第一个元素正在等待
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // 与之前的 ruleStateDescriptor 相同
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();

        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();

            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }

            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }

            // 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况
            if (shape.equals(rule.first)) {
                stored.add(value);
            }

            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考虑因素

使用 Broadcast state 时要注意的是:

  • 没有跨 Task 的通信,这就是为什么只有处理广播流元素可以修改 Broadcast state 的原因。用户需要保证所有 Task 对于 Broadcast state 的处理方式是一致的,否则会造成不同 Task 读取 Broadcast state 时内容不一致的情况,最终导致结果不一致。
  • 跨 Task 的 Broadcast state 中的事件顺序可能不同,尽管广播流的元素可以保证都将转到所有下游 Task,但元素可能以不同的顺序到达下游 Task。因此,Broadcast state 更新不能依赖传入事件的顺序。
  • 所有 Task 都会把 Broadcast state 存入 checkpoint,而不仅仅是其中一,虽然 checkpoint 发生时所有任务在具有相同的 broadcast state。这是为了避免在恢复期间所有任务从同一文件中进行恢复(避免热点),代价是 state checkpoint 的大小增加了并行度数量的倍数。Flink 会保证在恢复状态/改变并发的时候数据没有重复没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 Task 读取之前已经在 checkpoint 中的 state。在增大并发的情况下,多出来的并发会使用轮询调度算法读取之前 Task 的 state。
  • 不支持 RocksDB state backend,broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。

主要引用:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/broadcast_state/

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注