流式处理的一些概念 三:会话窗口(译)

接着上一篇进行讨论,窗口化有多个纬度,重点看下其中的两个维度:处理时间中的固定窗口(processing-time windows)和事件时间中的会话窗口(session windows)。

Processing-time windows

处理时间窗口之所以重要,有两个原因:

  • 对于某些场景,如使用情况的监视(例如,Web服务流量QPS),希望分析所观察到的传入数据流,处理时间窗口绝对是应采取的适当方法。

  • 对于事件发生的时间很重要的用例(例如,分析用户行为趋势、计费、评分等),处理时间窗口绝对是错误的方法,并且能够识别这些情况是至关重要的。

因此,值得深入了解处理时间窗口和事件时间窗口之间的差异,特别是考虑到当今大多数流式系统中处理时间窗口普遍存在。

当作为第一类概念的窗口是严格基于事件时间的,可以使用两种方法来实现处理时间窗口:

  • 触发器(Trigger):忽略事件时间(使用跨越所有事件时间的全局窗口,global window),并使用触发器来提供处理时间轴中该窗口的快照。

  • 进入时间(Ingress Time):为数据到达时分配进入时间作为事件时间,然后使用从那里开始的事件时间进行窗口化。这基本上就是像 Spark streaming 现在所做的。

这两种方法或多或少是等价的,尽管在多级 pipeline 的情况下略有不同:在触发器版本中,每个阶段独立地切分处理时间窗口(例如,一个阶段的窗口X中的数据在下一阶段可能最终在窗口X-1或X+1中结束);在进入时间版本中,一旦数据被合并到窗口X中,数据将在 pipeline 期间保留在窗口X中,这是由于阶段间的进度同步(通过任何协调方式)。

处理时间窗口最大的缺点就是当输入的观察顺序改变时,窗口的内容会发生变化。为了更加具体地说明这一点,我们将看看这三个用例:

Event-time windowing
Processing-time windowing via triggers
Processing-time windowing via ingress time

我们将分别应用两个不同的输入集(总共有六个变体)。这两个输入集会有完全相同的事件(相同的值,相同的事件时间),但是具有不同的观察顺序,如下图。第一组是观察顺序,颜色是白色;第二组是在处理时间轴中移动后的值,颜色是紫色。

Event-time windowing

我们首先比较事件时间中的固定窗口和这两个观察顺序上的启发式水印。左边是是之前看到的结果;右边是第二个观察顺序的结果。即使输出的总体形状不同(由于处理时间中的观察顺序不同),四个窗口的最终结果仍然相同

Processing-time windowing via triggers

现在尝试触发器方法。有三个方面使处理时间“窗口化”以这种方式工作:

  • Windowing:使用全局事件时间窗口,因为我们基本上是用事件时间窗格模拟处理时间窗口。

  • Triggering:在处理时域中定期触发,基于处理时间窗口的期望大小。

  • Accumulation:使用丢弃模式来保持窗格彼此独立,从而让每个窗格都像一个独立的处理时间“窗口”。

伪代码如下

PCollection<KV<String, Integer>> scores = input
    .apply(Window.triggering(
            Repeatedly(AtPeriod(Duration.standardMinutes(2))))
        .discardingFiredPanes())
    .apply(Sum.integersPerKey());

处理结果如下面的所示,

  • 因为我们通过事件时间窗格模拟处理时间窗口,所以在处理时间轴中描绘“窗口”,这意味着它们的宽度是在Y轴而不是X轴上测量的。

  • 由于处理时间窗口对遇到输入数据的顺序很敏感,所以对于两个观察顺序中的每个“窗口”的结果都不同。

Processing-time windowing via ingress time

再来看一下通过将输入数据的事件时间映射为它们的进入时间来实现的处理时间窗口。这里有四个方面值得一提:

  • Time-shifting:当元素到达时,事件时间被进入时间。

  • Windowing:使用标准固定事件时间窗口。

  • Triggering:由于进入时间提供了计算完美水印的能力,所以可以使用默认触发器,在这种情况下,当水印通过窗口末尾时,默认触发器隐式触发一次。

  • Accumulation:因为我们每个窗口只有一个输出,所以累积模式是不相关的。

伪代码如下

PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
    .apply(Sum.integersPerKey());

处理结果如下面的所示,当数据到达时,它们的事件时间被更新以匹配它们的进入时间,导致向右水平移动到理想水印线上

  • 即使输入的值和事件时间保持不变,当输入顺序改变时,会得到不同的结果。

  • 窗口在事件时间域中(沿着X轴划分)。将处理时间映射到事件时间域上,擦除每个输入的原始发生记录,并用新的记录替换,该记录表示管道首次观察数据的时间。

由此我们可以得出一个结论,事件时间窗口是顺序无关的,至少是有限的(在输入完成之前,实际的窗格可能不同);而处理时间窗口不是。如果关心事件实际发生的时间,则必须使用事件时间窗口,否则结果将毫无意义。

Session windows

会话是一种特殊类型的窗口,它捕获数据中的活动周期,周期之间由不活动间隙(gap of inactivity)分割。在数据分析中特别有用,可以为特定用户提供他们在特定时间段内从事某种活动的视图。

  • Sessions 是数据驱动窗口的一种:窗口的位置和大小是输入数据本身来决定,而不是基于某个预定义的时间模式,比如固定窗口和滑动窗口。

  • Sessions 也是未对齐窗口的一种,不是均匀地应用于数据,而是仅应用于数据的特定子集(例如,每个用户)的窗口。

对于某些用例,可以用公共标识符在单个会话内标记数据,以确定哪些数据属于一个会话。在这种情况下,会话更容易构造,因为它基本上只是一种按键分组的形式。

然而,在更一般的情况下(在实际会话本身事先未知的情况下),会话必须由数据在时间内的位置独自构建。当处理无序数据时,这变得特别棘手。

在提供一般会话支持时,完整的会话窗口是由一组较小的重叠窗口组成的,每个窗口都包含单个记录,窗口序列中的每个记录与下一个记录之间的时间间隔不大于预定义的超时时间。因此,即使我们观察到会话中的数据处于无序状态,我们也可以简单地通过合并数据到达时的任何重叠窗口来构建最终会话。

看下面一段伪代码的,使用固定Gap分割Session,提前的数据固定周期更新结果,延迟的数据统计到就更新结果,累积结果并收缩前一步结果

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
        .triggering(
            AtWatermark()
                .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                .withLateFirings(AtCount(1)))
                .accumulatingAndRetractingFiredPanes())
    .apply(Sum.integersPerKey());

模拟执行结果如下图,并参考执行过程描述

  • 当遇到第一条记录5进入时,它将被放入一个原始会话窗口中,该窗口从这条记录的事件时间开始,并跨越会话间隙持续时间(session gap duration)的宽度(例如超出该数据发生点的一分钟)。将来的任何与此窗口重叠的窗口都应该是同一会话的一部分,并将合并到该会话中。

  • 到达的第二条记录是7,它同样被放入自己的原始会话窗口中,因为它与5所在的窗口没有重叠。

  • 与此同时,水印已经通过了第一个窗口的末尾,所以5值在 12:06 之前被具体化为准时结果(on-time result)。此后不久,当处理时间达到 12:06 时,第二窗口也被实现为具有值7的推测结果。

  • 接下来,我们观察一系列记录:3、4、3,它们都是相互重叠的原始会话。结果就是,它们全部合并在一起,并且当 12:07 的早期触发器启动时,发出一个值为10的窗口。

  • 当8此后不久到达时,它与值7的原始会话以及与值10的会话重叠。所有这三个回话合并在一起,形成一个新的会话与最新值25。

  • 当水印通过8这个会话的结尾时,它提交了新的会话与值25,以及对先前提交的两个窗口的缩回,-7和-10。

  • 当9晚到达时,会出现类似的现象,将具有值5的原始会话和值25的会话加入到新的值为39的单个较大会话中。对于5和25窗口,39和回缩都由后期数据触发器立即提交。

实际应用中可以将重点更多地放在手头有趣的业务逻辑上,而更少地放在将数据形成某种可用形式的细节上。


回顾一下之前的内容:

  • 事件时间与处理时间(event-time and processing-time):事件发生时间和数据处理系统观察到事件时间之间的最重要的区别。

  • 窗口化(windowing):通过沿着时间边界对无界数据进行切片来管理该数据的常用方法。

  • 水印(watermarks):表达事件时间中进度的强有力概念,它为无序处理系统操作无界数据时的完整性提供了一种推演方法。

  • 触发器(triggers):用于精确指定输出具体化的声明机制。

  • 累积(accumulation):在单个窗口随着它的发展而发生多次具体化的情况下,细化结果之间的关系。


原文:https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

5 × 2 =