Spark函数讲解:combineByKey

本文案例来自http://apachesparkbook.blogspot.hk,增删了一些内容,希望有助于理解。

combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据的类型不同的返回值。

要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。

如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。

如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

语法

def combineByKeyC ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:

createCombiner, which turns a V into a C (e.g., creates a one-element list) - mergeValue, to merge a V into a C (e.g., adds it to the end of a list) - mergeCombiners, to combine two C’s into a single one.

In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).

第一个参数: createCombiner在找到给定分区中第一次碰到的key(在RDD元素中)时被调用。此方法为这个key初始化一个累加器。
第二个参数: mergeValue当累加器已经存在的时候(也就是上面那个key的累加器)调用。
第三个参数: mergeCombiners如果哪个key跨多个分区,该参数就会被调用。

示例

让我们来计算每一项的平均值。

scala> val inputrdd = sc.parallelize(Seq(
                        ("maths", 50), ("maths", 60),
                        ("english", 65),
                        ("physics", 66), ("physics", 61), ("physics", 87)), 
                        1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> inputrdd.getNumPartitions                      
res55: Int = 1

scala> val reduced = inputrdd.combineByKey(
         (mark) => {
           println(s"Create combiner -> ${mark}")
           (mark, 1)
         },
         (acc: (Int, Int), v) => {
           println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
           (acc._1 + v, acc._2 + 1)
         },
         (acc1: (Int, Int), acc2: (Int, Int)) => {
           println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
           (acc1._1 + acc2._1, acc1._2 + acc2._2)
         }
     )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29

scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))

scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31

scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))

注意:本例中因为只有一个分区所以mergeCombiners并没有用到。

参考:Spark API 之 combineByKey(一)

打赏支持:支付宝/微信,感谢赏口饭吃