Flink 数据类型及序列化(1.13)

[toc]

Apache Flink 以一种独特的方式处理数据类型和序列化(构建了自己的类型系统,包含类型描述、泛型类型提取和序列化框架)。易于做类型检查,根据类型选取序列化方式,节省存储空间等。

Supported Data Types

Flink 现阶段(1.13)支持的类型:
– Java Tuples and Scala Case Classes
– Java POJOs
– Primitive Types
– Regular Classes
– Values
– Hadoop Writables
– Special Types

Tuples and Case Classes

Tuple 是一个组合数据类型,包含了固定数量的特定类型的字段(字段可以是任意 Flink 支持的类型,包括 Tuple)。Java API 支持 Tuple0 到 Tuple25(数字表示包含的字段数量个数),不支持 null 值。

// Java Code
// Tuple 字段的访问 tuple.f0 或 tuple.getField(0)

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(value -> value.f0);

在 Scala 中使用 Case Class

// Scala Code
// Scala 中 Tuple 是 Case Class 的一种
case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy(_.word)

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(value => (value._1, value._2))

Java POJOs

如果 Java 和 Scala 类满足以下要求,将被视为特殊的 POJO 数据类型:
– 必须用 public 修饰
– 必须要有一个 public 无参构造函数
– 所有字段要么是 public 的,要么必须要有 setter 和 getter 方法
– 类型必须是 Flink 支持的

Flink 可以比一般类型更有效地处理 Pojo。

// Java Code

public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));

wordCounts.keyBy(value -> value.word);
// Scala Code
class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy(_.word)

Primitive Types

Flink 支持所有 Java 和 Scala 类型,如 Integer/int、String 和 Double/double。

Regular Classes

Flink 支持大多数 Java 和 Scala 类)。不包含带有无法序列化的字段的类(如文件指针、I/O流等)。遵循 Java Bean 规范的类通常支持得很好。

所有未标识为 POJO 类型的类,都作为一般类型处理。Flink 将这些数据类型视为黑盒,无法访问它们的内容(例如,排序的场景)。一般类型是使用序列化框架 Kryo 序列化和反序列化。

Values

Value 类型指定了序列化和反序列化方式,没有通用的序列化框架(通过继承 org.apache.flink.types.Value 接口,实现 read() 和 write() 方法)。

当通用的序列化框架效率较低时(针对特殊的数据类型的场景),可以使用。

基础类型有对应的预定义的 Value 类型(IntValue、DoubleValue、StringValue…),可以被重写。

Hadoop Writables

实现了 org.apache.hadoop.Writable 接口的类型,实现 readFields() 和 write() 方法用于序列化。

Special Types

Scala 中的 Either、Option、Try,Java API 实现了 Either。对于错误处理或需要输出两种不同类型记录的算子,这种类型都很有用。

类型擦除(Type Erasure)与类型推断(Type Inference)

仅与 Java API 有关

Java 编译器在编译之后丢弃了许多泛型类型信息,在 Java 中称为类型擦除。也就是在运行时,DataStream 和 DataStream 类型的实例在 JVM 看来是相同的。

Flink 在调用 main 方法时,需要类型信息。Flink Java API 会尝试重构类型信息,并将其显式存储在数据集和操作符中。可以通过 DataStream.getType() 检索类型(返回 TypeInformation 对象,即 Flink 内部表示类型的方式)。

但是类型推断有局限性,在一些场景下需要程序显示提供。ResultTypeQueryable 接口可以明确地告诉 API 返回类型信息。

Flink 中的类型处理

Flink 会尽力推断有关数据类型的信息,这些类型信息可以帮助 Flink 实现一些特性:
– Flink 对数据类型了解的越多,序列化和数据布局方案就越好。
– 可以使用户在大多数情况下免于担心序列化框架以及类型注册。

通常,类型信息在应用运行之前的阶段(pre-flight phase)需要,也就是在程序对 DataStream 或者 DataSet 的操作调用之后,在 execute()、print()、count()、collect() 调用之前。

最常见的问题

用户需要与处理数据类型的最常见问题是:
– 注册子类型(Registering subtypes):如果函数签名只包含 supertypes,但实际在执行期间使用他们的子类型,则使 Flink 感知这些子类型可能会提高性能。可以为每一个子类型调用 StreamExecutionEnvironment 或者 ExecutionEnvironment 的 registerType(clazz) 方法。
– 注册自定义序列化器(Registering custom serializers):当 Flink 无法通过自身处理类型时会使用到 Kryo 进行序列化处理。 并非所有的类型都可以被 Kryo 处理。例如谷歌的 Guava 集合类型默认情况下是没办法很好处理的。 解决方案是为这些引起问题的类型注册额外的序列化器。调用 StreamExecutionEnvironment 或者 ExecutionEnvironment 的 .getConfig().addDefaultKryoSerializer(clazz, serializer) 方法注册 Kryo 序列化器。更多可以参考(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/custom_serializers/)
– 添加类型提示(Adding Type Hints):有时, Flink 无法推断出泛型类型,用户需要提供类型提示。通常只在 Java API 中需要。
– 手动创建 TypeInformation: 可能是某些 API 调用所必需的,因为 Java 的泛型类型擦除会导致 Flink 无法推断出数据类型。

Flink 的 TypeInformation 类

TypeInformation 类是所有类型描述符的基类。提示类型的一些基本属性,可以生成序列化器,还可以生成比较器。

Flink 内部对类型做了如下区分:
– 基础类型(Basic types):所有的 Java 基础类型以及他们的包装类,再加上 void、String、Date、BigDecimal 以及 BigInteger。
– 数组类型(Array):基础类型数据和对象数组
– 复合类型(Composite types):
– Tuples(Java API)
– Case class(Scala)
– Row,具有任意数量字段的元组并且支持 null 字段。。
– POJO
– 辅助类型(Auxiliary types):Option、Either、Lists、Maps 等
– 泛型(Generic types)

POJOs 是特别好用的,他们支持复杂类型的创建以及在键的定义中直接使用字段名: dataSet.join(another).where(“name”).equalTo(“personName”)。并且 Flink 运行时可以非常高效地处理。

以上类型,都继承 TypeInformation 类表示。

TypeInformation 的一个重要的功能就是创建 TypeSerializer 序列化器,为该类型的数据做序列化。每种类型都有一个对应的序列化器来进行序列化。


https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/serialization/types_serialization/

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/

https://developer.aliyun.com/article/721997?spm=a2c6h.14164896.0.0.48231f28tgVoBQ

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注