Flink 1.6.0 DataSet API

Data Source

Data Source 创建初始数据集。Flink 附带了几种内置输入格式,可以从通用文件格式创建数据集。ExecutionEnvironment 上有创建的方法。

基于文件的:

  • readTextFile(path) / TextInputFormat,按行读取文件并将其作为字符串返回。

  • readTextFileWithValue(path) / TextValueInputFormat,按行读取文件并将其作为 StringValues 返回。StringValues是可变字符串。

  • readCsvFile(path) / CsvInputFormat,按行读取文件,解析逗号(或其他字符)分隔,并返回元组或对象的数据集。

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat,解析基本数据类型(字符串或整数)的文件,以换行符(或其他字符)分隔。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat,从指定路径读取解析 SequenceFile,并返回 <key, value> 元组。

基于集合:

  • fromCollection(Seq),用对象序列创建数据集。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator),用迭代器创建数据集。指定迭代器返回的元素的数据类型。

  • fromElements(elements: _*),根据给定的对象序列创建数据集。所有对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator),并行地从迭代器创建数据集。指定迭代器返回的元素的数据类型。

  • generateSequence(from, to),并行生成给定间隔的数字序列。

通用:

  • readFile(inputFormat, path) / FileInputFormat,接受文件输入格式。

  • createInput(inputFormat) / InputFormat,接受通用输入格式。

示例代码:

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

读压缩文件

Flink 目前支持输入文件的透明解压缩,如果文件标有适当的文件扩展名。这意味着不需要进一步配置输入格式,并且任何 FileInputFormat 支持压缩,包括自定义输入格式。压缩文件可能无法并行读取,从而影响作业可伸缩性。当前支持的压缩方法:

压缩方法 文件扩展名 可并行
DEFLATE .deflate no / not
GZip .gz,.gzip no / not
Bzip2 .bz2 no / not
XZ .xz no / not

Transformation

DataSet 的基础算子与 DataStream 算子大致相同,可以互作参考:

DataSet

Map
一个数据元生成一个新的数据元

data.map { x => x.toInt }

FlatMap
一个数据元生成多个数据元(可以为0)

data.flatMap { str => str.split(" ") }

MapPartition
函数处理包含一个分区所有数据的“迭代器”,可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的算子操作。

data.mapPartition { in => in map { (_, 1) } }

Filter
每个数据元执行布尔函数,只保存函数返回 true 的数据元。

data.filter { _ > 1000 }

Distinct
对数据集中的元素除重并返回新的数据集。

val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()

// Distinct with field position keys
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)

// Distinct with KeySelector function
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}

// Distinct with key expression
case class CustomType(aName : String, aNumber : Int) { }

val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")

Reduce
作用于整个 DataSet,合并该数据集的元素。

val intNumbers = env.fromElements(1,2,3)
// 输出 6
val sum = intNumbers.reduce (_ + _)

Aggregate
对一组数据求聚合值,聚合可以应用于完整数据集或分组数据集。聚合转换只能应用于元组(Tuple)数据集,并且仅支持字段位置键进行分组。有一些常用的聚合算子,提供以下内置聚合函数(Aggregations):
– Sum
– Min
– Max

val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
)

val output: DataSet[(Int, String, Double)] = input.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2)

// 输出 (7,c,50.0)


// 简化语法
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)

MinBy / MaxBy
取元组数据集中指定一个或多个字段的值最小(最大)的元组,可以应用于完整数据集或分组数据集。用于比较的字段必须可比较的。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。

val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)

// 比较元组的第一个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0)
// 输出 (1,b,20.0)


// 比较元组的第一、三个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0,2)
// 输出 (1,a,10d)

Grouped DataSet

Grouped DataSet 方法 groupBy() 用来将数据分组,有多种数据分组方法:
– 对于 Pojo 类型,可以根据 KeyExpression 或 KeySelector 分区

class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]

// Grouped by Key Expression
val wordCounts1 = words.groupBy("word")

// Grouped by KeySelector Function
val wordCounts2 = words.groupBy { _.word } 
  • 对于元组(Tuple)类型,可以根据字段位置分组
val tuples = DataSet[(String, Int, Double)] = // [...]

// 根据元组的第一和第二个字段分组
val reducedTuples = tuples.groupBy(0, 1)
  • 分组并排序:
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING)

Reduce
作用于整个分组元素,合并该组的所有元素。

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}

ReduceGroup
通过将此数据集中的所有元素传递给函数,创建一个新的数据集。该函数可以使用收集器输出零个或多个元素。也可以作用与完整数据集,迭代器会返回完整数据集的元素。

val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "b"), (2, "a"), (3, "c"))
val output = input.groupBy(0).reduceGroup {
    (in, out: Collector[(Int, String)]) =>
        out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
}

// 输出 (3,c),(1,ab),(2,a)

Aggregate
聚合可以应用于分组数据集。

val input: DataSet[(Int, String, Double)] = env.fromElements(
      (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
    )

val output: DataSet[(Int, String, Double)] = input.groupBy(1).sum(0).max(2)

// 输出 (3,a,50.0)

MinBy / MaxBy
可以应用于分组数据集

 val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)

// 按第二个字段分组,取第一、三个字段最小的元组
val output: DataSet[(Int, String, Double)] = input.groupBy(1)
  .minBy(0, 2)

// 输出 (1,a,10.0),(1,b,20.0)

Join

将两个 DataSet 连接生成一个新的 DataSet。

Join
两个数据集指定的要连接的 key,进行 join,默认是一个 inner join。可以使用 JoinFunction 将该组连接元素转化为单个元素,也可以使用 FlatJoinFunction 将该组元素转化为任意多个元素(包括 none)。

// where("0") 表示使用input1的第一个字段连接
// equalTo("1") 表示使用input2的第二个字段,判断等于input1的第一个字段的值
val result = input1.join(input2).where(0).equalTo(1)

可以通过 JoinHint 参数来指定运行时执行连接的方式。参数描述了 join 是通过分区(partitioning)还是广播(broadcasting)发生的,以及使用算法是基于排序(sort-based)的还是基于哈希(hash-based)的。如果没有指定 JoinHint,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。

类似 Spark SQL 的 join 逻辑,会根据要连接的两个数据的大小,进行优化。如果不是非常了解那种连接方式在什么场景下更优,建议由系统选择,不要指定。

// 广播input1,并使用 hash table 的方式
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
  .where(0).equalTo(1)

// JoinHint 可选项:
// OPTIMIZER_CHOOSES,由系统判断选择
// BROADCAST_HASH_FIRST,第一个数据集构建哈希表并广播,由第二个表扫描。适用于第一个数据集较小的情况
// BROADCAST_HASH_SECOND,适用于第二个数据集较小的情况
// REPARTITION_HASH_FIRST,对两个数据同时进行分区,并从第一个输入构建哈希表。如果第一个输入小于第二个输入,则此策略很好。 
// REPARTITION_HASH_SECOND,适用于第二个输入小于第一个输入。
// REPARTITION_SORT_MERGE,对两个数据同时进行分区,并对每个输入进行排序(除非数据已经分区或排序)。输入通过已排序输入的流合并来连接。如果已经对一个或两个输入进行过分区排序的情况,则此策略很好。

// REPARTITION_HASH_FIRST 是系统使用的默认回退策略,如果不能进行大小估计,并且不能重新使用预先存在的分区和排序顺序。

为了引导优化器选择正确的执行策略,可以提示要关联的 DataSet 的大小:

val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// 表示第二个数据集 input2 特别小
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)

// 表示第二个数据集 input2 特别大
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)

对连接成功的数据有两种处理方式(类似 map 和 flatMap),以下面两个数据集连接为例:

case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
  • Join with Join Function,接收第一个数据集的一个数据元和第二个数据集的一个数据元,并返回一个数据元
    val weightedRatings = ratings
    .join(weights)
    .where("category")
    .equalTo(0) {
    (rating, weight) => (rating.name, rating.points * weight._2)
    }
  • Join with Flat-Join Function,返回零个、一个或多个数据元
    val weightedRatings = ratings
    .join(weights)
    .where("category")
    .equalTo(0) {
    (rating, weight, out: Collector[(String, Double)]) =>
    if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
    }

    ⚠️ Join 仅适用于等于连接的情况,其他连接类型需要使用 OuterJoin 或 CoGroup。

OuterJoin
多两个数据集执行左连接(leftOuterJoin)、右连接(rightOuterJoin)或全外连接(fullOuterJoin)。与 Join(inner join)的区别在于,如果在另一侧没有找到匹配的数据,则保存左侧(或右侧、两侧)的记录。

input1.leftOuterJoin(input2)
      .where(0)              
      .equalTo(1)            
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });

CoGroup
Reduce 操作的二维变体。对一个或多个字段中的每个输入进行分组,然后加入组。每对组调用转换函数。

val iVals: DataSet[(String, Int)] = env.fromElements(("a", 10), ("b", 20), ("a", 30))
val dVals: DataSet[(String, Double)] = env.fromElements(("a", 1.0), ("b", 2.0), ("c", 3.0))

// iVals 第一个字段与 dVals 第一个字段连接
val output: DataSet[Double] = iVals.coGroup(dVals).where(0).equalTo(0) {
  (iVals, dVals, out: Collector[Double]) =>
    // iVals [("a",10),("a",30)]
    val ints = iVals map {
      _._2
    } toSet

    // dVals [("a", 1.0)]
    for (dVal <- dVals) {
      for (i <- ints) {
        out.collect(dVal._2 * i)
      }
    }
}

// 输出 10.0,30.0,40.0

Union
构建两个数据集的并集。

data.union(data2)

Cross
构建两个输入数据集的笛卡尔积。可选择使用 CrossFunction 将元素对转换为单个元素。

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

Cross 是一个计算密集型操作,对大型数据集会带来挑战。建议使用 crossWithTiny()crossWithHuge() 优化。

Repartition

Rebalance
均匀地重新负载数据集的并行分区以消除数据偏差。后面只可以接类似 map 的算子操作。

val in: DataSet[String] = // [...]
val out = in.rebalance().map { ... }

Hash-Partition
根据给定的 key 对数据集做 hash 分区。可以是 position keys,expression keys 或者 key selector functions。

val in: DataSet[(String, Int)] = // [...]
val out = in.partitionByHash(0).mapPartition { ... }

Range-Partition
根据给定的 key 对一个数据集进行 Range 分区。可以是 position keys,expression keys 或者 key selector functions。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }

Custom Partitioning
手动指定数据分区。此方法仅适用于单个字段的 key。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionCustom(partitioner: Partitioner[K], key)

其他

Sort Partition
本地以指定的顺序在指定的字段上对数据集的所有分区进行排序。可以指定 field position 或 filed expression。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }

First-n
返回数据集的前n个元素。可以应用于任意数据集。

val in: DataSet[(Int, String)] = // [...]
// DataSet
val result1 = in.first(3)
// Grouped DataSet
val result2 = in.groupBy(0).first(3)
// Grouped-sorted DataSet
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

Project
Java API 支持,Scala API 不支持,作用于元组的转换,从元组中选择字段的子集。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);

Data Sink

Data Sink 从 DataSet 中取出数据保存或者返回。Flink 各种内置的输出格式,在 DataSet 上的算子操作后面调用:

  • writeAsText() / TextOutputFormat,将元素以字符串形式写入文件。字符串通过调用每个元素的 toString() 方法获得。

  • writeAsCsv(...) / CsvOutputFormat,将元组字段以逗号分隔写入文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr(),在标准输出/标准错误输出中打印每个元素的 toString() 返回值。

  • write() / FileOutputFormat,自定义文件输出的方法和基类。支持自定义对象到字节转换。

  • output() / OutputFormat,通用输出方法,用于非基于文件的数据接收器。

示例代码:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

本地排序输出

可以使用元组字段位置(field position)或字段表达式(field expression)在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。

尚不支持全局排序的输出。

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)

Iteration

迭代在 Flink 程序中实现循环。迭代运算符封装了程序的一部分并重复执行,将一次迭代的结果(部分结果)反馈到下一次迭代中。Flink 两种迭代类型:==BulkIteration== 和 ==DeltaIteration==。

批量迭代(Bulk Iteration)

调用 DataSet 的 iterate(int) 方法创建一个 BulkIteration,迭代以此为起点,返回一个 IterativeDataSet,可以用常规运算符进行转换。迭代调用的参数 int 指定最大迭代次数。

IterativeDataSet 调用 closeWith(DataSet) 方法来指定哪个转换应该反馈到下一个迭代,可以选择使用 closeWith(DataSet,DataSet) 指定终止条件。如果该 DataSet 为空,则它将评估第二个 DataSet 并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。

以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以4。

val env = ExecutionEnvironment.getExecutionEnvironment()

val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example")

可以查看 K-Means示例,该示例使用 BulkIteration 来聚类一组未标记的点。

增量迭代(Delta Iteration)

DeltaIteration 利用了某些算法在每次迭代中不会更改解的每个数据点的特点。

除了在每次迭代中反馈的部分解决方案之外,还在迭代中维护状态,可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。参考 迭代的基本原理

定义 DeltaIteration 类似于定义 BulkIteration。两个数据集构成每次迭代的输入(工作集和解集),并且在每次迭代中生成两个数据集作为结果(新工作集,增量解集)。

调用初始解决方案集的 iterateDelta(initialWorkset, maxIterations, key) 方法创建一个 DeltaIteration:

val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

4 − 1 =