Spark 编程模型 共享变量

一般来说,当一个被传递给 Spark 操作的函数在一个远程节点上运行时,该函数实际上操作的是它用到的所有变量的独立副本。

这些变量默认会被复制到每一台机器,并且在远程机器上对变量的修改不会回传给 Driver 程序。当一台机器上有多个 Task 在不同的 Worker 上并发运行一个函数时,Worker 会为每一个 Task 缓存一份变量的副本。

当需要在任务中共享变量,Spark 提供了两种模式的共享变量:广播变量和累加器。

广播变量

广播变量允许程序保留一个只读的变量,缓存在每一台 Worker 节点,而不是每个 Task 发送一份副本。

广播变量通过调用 SparkContext.broadcast(v) 从变量v创建,可以通过调用 value 方法获得广播变量的值

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

广播变量被创建后,可以在集群运行的任何函数中用v值调用,v值在第一次调用后缓存到任务节点,重复调用时,不需要被再次传递到这些节点上。

v值不能在广播后修改,保证所有节点收到相同的广播值

累加器

累加器是一种只能进行“加”操作的变量,可以在并行计算中得到高效的支持。类似 MapReduce 中的 counter,实现计数和求和等功能。Spark 原生支持 Int 和 Double 类型的累加器。

累加器可以通过调用 SparkContext.accumulator(x) 从一个初始值v中创建。运行在集群上的任务,可以通过 += 进行累加,但不能读取。只有主程序可以使用 value 方法读取累加值。

Action 操作会触发一次计算

val accum = sc.accumulator(0, "Accumulator")
val data = sc.parallelize(1 to 10)

// 统计偶数出现的次数,偶数返回0,奇数返回1
val newData = data.map{ x => {
    if(x % 2 == 0) {
        accum += 1
        0
    } else 1
}}

// Action 操作触发执行
newData.count

// 此时 Accum 的值为5
accum.value

// foreach 也是 Action 操作
newData.foreach(println)

// 上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
accum.value

References:
《Spark 核心技术与高级应用》

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

17 + 3 =