Flink 1.6.0 EventTime 和 Watermark

Flink 中提供了3种时间模型:EventTime、ProcessingTime、IngestionTime。
底层实现上分为2种:Processing Time 与 Event Time,Ingestion Time 本质上也是一种 Processing Time,官方文档 上对于3者的描述(参考下图):

  • EventTime 是事件创建的时间,即数据产生时自带时间戳。
  • IngestionTime 是事件进入 Flink 的时间,即进入 source operator 是给定的时间戳。
  • ProcessingTime 是每一个执行 window 操作的本地时间。

FlinkTimeModel

可以参考以下两篇 Blog 和 Paper 帮助对时间域的理解,也是官方推荐的
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/43864.pdf
附笔者 翻译1 翻译2 翻译3

Flink 如何设置时间域?

调用 setStreamTimeCharacteristic 设置时间域,枚举类 TimeCharacteristic 预设了三种时间域,不显式设置的情况下,默认使用 TimeCharacteristic.ProcessTime。这也是 Flink 程序一般最开始的工作。

# Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// 可选的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

EventTime 与 WaterMarks

为什么必须处理事件时间?

在大多数情况下,消息进入系统中是无序的(网络、硬件、分布式逻辑都可能影响),并且会有消息延迟到达(例如移动场景中,由于手机无信号,导致一系列的操作消息在手机重新连接信号后发送),如果按照消息进入系统的时间计算,结果会与实时严重不符合。理想情况是 event time 和 processing time 是一致的(发生时间即处理时间),但是现实情况是不一致的,两者存在歪斜(skew)。

因此,支持事件时间的流式处理程序需要一种方法来测量事件时间的进度。例如,有一个按小时构建的窗口,当事件时间超过了一小时的时间范围,需要通知该窗口,以便关闭正在进行的窗口。

什么是水印(watermarks)

Flink 中检测事件时间处理进度的机制就是水印,Watermark 作为数据处理流中的一部分进行传输,并且携带一个时间戳t。一个 Watermark(t) 表示流中应该不再有事件时间比t小的元素(某个事件的时间戳比 Watermark 时间大)。


Watermark 有助于解决乱序问题

下图表示一个顺序的事件流中的 Watermark, Watermark 只代表一个简单的标记,
stream_watermark_in_order

下图表示一个乱序的事件流中的 Watermark,表示所有事件时间戳小于 Watermark 时间戳的数据都已经处理完了,任何事件大于 Watermark 的元素都不应该再出现,当然这只是一种推测性的结果(基于多种信息的推测),
stream_watermark_out_of_order

并行流中的水印

水印是在 Source function(源函数)处或之后生成的。源函数的每个并行子任务通常独立地生成水印。这些水印定义了该特定并行源的事件时间。

当水印经过流处理程序时,会将该算子的事件时间向前推进。当算子提前其事件时间时,会为后续的算子生成新水印。

一些算子使用多个输入流,例如,使用 union 或者 keyBy/partition 函数的算子。此类算子的当前事件时间是其输入流事件时间的最小值。
当它的输入流更新它们的事件时间时,算子也会更新。

下图显示了事件和水印经过并行流的示例,以及跟踪事件时间的运算符。

parallels_stream_watermark

延迟记录(Late Elements)

某些记录可能会违反水印的条件,事件时间小于t但是晚于水印t到达。实际运行过程中,事件可能被延迟任意的时间,所以不可能指定一个时间,保证该时间之前的所有事件都被处理了。而且,即使延时时间是有界限的,过多的延迟水印的时间也是不理想的,会造成时间窗口处理的太多延时。

生成时间戳和水印

  1. 首先设置时间域
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  1. 分配时间戳
    处理事件事件需要知道事件发生时间的时间戳,通常从流中数据元的某个字段提取时间戳。时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展。

有两种方法可以分配时间戳并生成水印:
– 直接在数据流 source 中
– 通过时间戳分配器/水印生成器:在 Flink 中,时间戳分配器也会定义要发出的水印

带时间戳和水印的 Source Functions

Stream source 可以直接为生成的数据元分配时间戳,也可以发出水印。完成此 算子操作后,不需要时间戳分配器。如果使用了时间戳分配器,则 source 函数提供的任何时间戳和水印都将被覆盖。

要直接为源中的数据元分配时间戳,源必须使用 collectWithTimestamp(...) 方法作用域 SourceContext。要生成水印,源必须调用 emitWatermark(Watermark) 函数。

下面是一个分配时间戳并生成水印的简单示例:

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

时间戳分配器/水印生成器

时间戳分配器(Timestamp assigners)获取流并生成带有带时间戳数据元和水印的新流。如果原始流已经有时间戳或水印,时间戳分配器会覆盖它们。

时间戳分配器通常在数据源生成之后立即指定,但并非被严格要求这样做。常见的模式是在时间戳分配器之前执行解析(MapFunction)和过滤(FilterFunction)。在任何情况下,需要在第一个操作事件时间的算子执行之前指定时间戳分配器(例如第一个窗口算子操作)。
作为一种特殊情况,当使用 Kafka 作为流式作业的数据源时 ,Flink 允许在源(或消费者)本身内部指定时间戳分配器/水印发射器。更多信息相关信息请参考 Kafka Connector 文档

下面是一个时间戳分配器/水印生成器的简单示例(只介绍了必须实现的主要接口):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// create stream source
val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

// assign timestamp and watermark assigner after filter function
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

// window function and sink 
withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
使用周期性(periodically)水印

AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流数据元,或纯粹基于处理时间)。

生成水印的间隔(每n毫秒)使用 ExecutionConfig.setAutoWatermarkInterval(...)。每次调用分配器的方法 getCurrentWatermark(),如果返回的水印非空并且大于先前的水印,则将发出新的水印。

下面有两个例子,时间戳分配器使用周期性水印:

该例子假定元素到达时在一定程度上是无序的,某个时间戳t的最后达到元素相比时间戳t的最早到达元素,最大延迟n毫秒。

/**
The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

该例子假设元素在有界延迟后到达,生成器生成的水印比处理时间滞后固定时间长度。

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

第二个例子比较容易理解,使用系统时间减去允许的延时时间作为 watermark 的时间。只跟当前系统时间有关系,如果大批事件出现延时的情况,可能很多在 watermark 的时间之后出现了,会被被丢弃。

第一个例子,在当前事件的事件时间和当前最大时间(记录最大的事件时间)中取最大值,得到最大的事件时间。用这个最大值减去一个允许的延时时间作为 watermark 时间。同样的如果大批事件发生延时,那么对应的 watermark 的时间就会向后推。

带标记(Punctuated)水印

使用 AssignerWithPunctuatedWatermarks 在某个事件指定生成新的水印的时候生成水印。这种情况下,Flink 首先会调用 extractTimestamp(...) 方法为数据分配时间戳,然后立即调用 checkAndGetNextWatermark(...)

checkAndGetNextWatermark(...) 方法传递在 extractTimestamp(...) 生成的时间戳,并且界定是否要生成水印。每当 checkAndGetNextWatermark(...) 方法返回非空水印,并且该水印大于先一个水印时,将向后发出新水印。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

每个事件都可以生成水印。但是,由于水印会导致一些后续的计算,因此过多的水印会降低性能。

每个 Kafka 分区一个时间戳

当使用 Kafka 作为数据源的时候,每个分区可能有一个简单的事件时间模式(按时间戳升序或其他)。当消费来自 Kafka 的流时,多个分区一般会并行消费,分区中的事件交替消费,会破坏分区中的模式(Kafka 的消费者客户端工作方式)。

在这种情况下,可以使用 Flink 的 Kafka-partition-aware(分区感知)水印生成器。使用这个特性的时候,水印会在 Kafka 消费者内部为每个分区生成,并且每个分区水印的合并方式与在流shuffle时合并水印的方式相同。

例如,如果事件时间戳严格按每个 Kafka 分区升序排列,那么使用升序时间戳水印生成器,为每分区生成水印 将产生完美的总体水印。
下图显示了如何为每个 Kafka 分区生成水印,以及在这种情况下水印如何通过流式数据流传播。

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)

// kafka source set timestamp assigner
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Kafka consumer watermark

预定义的 Timestamp Extractors / Watermark Emitters

Flink 提供抽象方式允许程序指定分配的时间戳并发出水印。可以通过实现 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks 接口来实现,具体取决于场景。第一个会定期发出水印,第二个基于传入记录的某些属性发出水印(例如,在流中遇到指定数据时)。

为了进一步简化此类任务的编程工作,Flink 附带了一些预先实现的时间戳分配器。除了开箱即用外,还可以作为自定义实现的示例。

具有递增时间戳的 Assigner

定期生成水印的最简单的特殊情况是,给定的源任务看到的时间戳按升序出现的情况。在这种情况下,当前时间戳始终可以充当水印。

时间戳只需要在每个并行数据源任务中是升序的。例如,如果在特定设置中,每个并发的源实例读取一个 Kafka 分区,则只需要在每个 Kafka 分区中时间戳是递增。水印合并机制将生成正确的水印,当并行流被shuffle,union,connect 或 merge 时。

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允许固定时间延迟的 Assigner

另一个定期水印的例子是,当水印滞后于流中看到的最大时间戳(事件时间)一段固定的时间。包括,预先知道流中可能遇到的最大延迟的情况。Flink 提供了 BoundedOutOfOrdernessTimestampExtractor,使用参数 maxOutOfOrderness,计算给定窗口的最终结果时,允许元素延迟的最长时间,超过的会被忽略。延迟为 t - t_wt是数据的事件时间时间戳,t_w是前一个水印的时间戳),如果延迟大于0,数据被认为是迟到的,默认会在计算窗口的作业结果时被忽略。

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
http://vishnuviswanath.com/flink_eventtime.html

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注