优化你的Spark程序(一)[译]

当您通过API编写Apache Spark代码,您会遇到transformation,action和RDD等术语。了解这些术语的概念对于编写Spark程序至关重要。同样的,当问题出现或者查看Web UI试图了解您的应用程序为什么要花费这么长时间的时候,job、stage、和task这些术语也需要理解.要编写能够高效执行的Spark程序,了解Spark的底层执行模型非常有帮助。

在这篇文章中,您将了解如何在集群上实际执行Spark程序的基础知识。然后,您将获得一些实用的建议,了解Spark的执行模式是如何用于编写高效的程序。

Spark如何执行你的程序

Spark应用程序由单个driver进程和分散在群集节点上的一系列executor进程组成。

driver负责控制整个工作流。executor进程负责执行这些工作,以及用于存储用户选择缓存的任何数据。driver和executor通常都会在应用程序运行的整个时间内保持一致,动态资源分配可以对后者进行改变。一个executor具有多个用于执行task的插槽,并且会并行执行。在集群上部署这些进程取决于正在使用的集群管理器(YARN,Mesos或Spark Standalone),但driver和executor本身存在于每个Spark应用程序中。

spark-tuning-f1

在上图执行层次结构的顶部是作业(jobs)。在Spark应用程序中调用一个action触发作业以实现它。为了确定这个作业是什么样的,Spark会检查该action所依赖的RDD并制定一个执行计划。该计划从第一个RDD开始(这个RDD即不依赖于其他RDD或引用已缓存的数据的RDD)并最终达到产生action结果所需的那个RDD。

执行计划包括将job的转换(transformation)分解到各个阶段(stage)。stage是所有执行相同代码的task的集合(每个task处理不同的数据子集)。每个stage都包含一系列的无需shuffle的转换。

什么情况下数据需要shuffle?回想一下,RDD包括固定数量的分区,每个分区包括一些数据。对于窄依赖(narrow transformations)的RDD,如map或filter,计算单个分区中的计算所需的数据都在父RDD中的单个分区中。每一个父RDD的分区最多只被子RDD的一个分区所使用。虽然coalesce这类操作会通过多个分区作为输入源,但这个transformation仍然是窄依赖,因为用于计算数据只在是有限子集中。

当然,Spark还支持宽赖性的转换,例如groupByKey和reduceByKey。在这些依赖关系中,计算单个分区中的记录所需的数据可能在父RDD的许多分区中。所有具有相同key的元组必须在同一个分区中完成,并由相同的任务进行处理。为了满足这些操作,Spark必须执行一个shuffle,它会在群集中传输数据,并在一个新的stage创建一组新的分区。

例子,请查看以下代码

1
2
3
4
5
sc.textFile("someFile.txt").
map(mapFunc).
flatMap(flatMapFunc).
filter(filterFunc).
count()

它执行单个action,该代码将在一个stage中完成执行,因为代码中三个transformation操作的输出都依赖于单个父分区,而不需要来自不同分区的数据输入。

以下代码用来统计在文本当中出现超过1000次的单词的每个字母的数量。

1
2
3
4
5
6
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
reduceByKey(_ + _)
charCounts.collect()

整个过程将分为三个stage。 reduceByKey操作为stage边界,因为计算其输出需要按key重新分配数据。

下面是一个更复杂的转换图,包括具有多个依赖关系的连接转换。

spark-tuning-f2

粉色框显示执行的各个阶段。

spark-tuning-f3

在每个阶段的边界,数据通过父阶段的任务写入磁盘,然后子阶段的任务通过网络获取。因为它们引起重度磁盘和网络I/O操作,所以阶段边界昂贵的,并且应在可能的时候尽量避免。父级中的数据分区数可能不同于子阶段的分区数。所以转化操作通常可以接受一个numPartitions参数,该参数决定了在子阶段将数据分割到多少个分区。

正如reducer的数量是调整MapReduce作业的重要参数一样,在stage边界调整分区数也会影响应用程序的性能。我们将在稍后部分中调深入研究如何整该数字。

选择正确的操作

当试图用Spark完成某些事情时,开发人员通常可以选择许多action和transformation,从而产生相同的结果。然而,他们的性能并不是一样的(避免常见的陷阱并挑选正确的操作,程序的性能也是不一样的)一些准则和意见有助于你做出这些选择。

最近的SPARK-5097让SchemaRDD趋于稳定,这将开启Spark的Catalyst优化器,从而使Spark能够对一些操作进行选择。当SchemaRDD成为稳定的组件时,用户就不必那么费劲的自己判断了。

选择正确操作的主要目的是减少shuffle次数和数量。这是因为shuffle是相当昂贵的操作;所有shuffle的数据必须写入磁盘,然后通过网络传输。repartition , join, cogroup以及任何*By*ByKey转换都可能导致shuffle。然而,并不是所有这些操作都是相同的,而对于新手Spark开发人员来说,最常见的性能缺陷之一就是选择了错误的操作.

  • 聚合操作时避免使用groupByKey例如rdd.groupByKey().mapValues(_.sum)将产生与rdd.reduceByKey(_ + _)相同的结果。然而,前者将通过网络传输整个数据集,而后者将计算每个分区中每个key,然后在本地求和,并且在混洗后将这些局部和组合为更大的和。
  • 当输入输出值不同的时候避免使用reduceByKey。例如找到与每个键对应的所有唯一字符串。一种方法是使用map将每个元素转换为一个Set,然后将SetreduceByKey组合:
1
2
rdd.map(kv => (kv._1, new Set[String]() + kv._2))
  .reduceByKey(_ ++ _)

此代码导致大量不必要的对象创建,因为必须为每个记录分配一个新的集合。最好使用aggregateByKey,它执行map的聚合操作更有效:

1
2
3
4
val zero = new collection.mutable.Set[String]()
rdd.aggregateByKey(zero)(
(set, v) => set += v,
(set1, set2) => set1 ++= set2)
  • 避免使用flatMap-join-groupBy的模式。当两个数据集已经按key分组并且想要join起来继续按key分组时,可以使用cogroup。这避免了与拆包和重包(unpacking and repacking)相关的所有开销。

不会发生shuffle的情况

了解上述transformation不会导致shuffle的情况也很有用。 如果前一个transformation的分区与现在的相同,那么Spark也会避免shuffle。考虑以下代码:

1
2
3
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

因为没有partitioner传递给reduceByKey,则使用默认的partitioner,rdd1和rdd2都进行哈希分区。这两个reduceByKey将导致两次shuffle。如果RDD具有相同数量的分区,则连接将不需要额外的shuffle。因为RDD分区相等,所以rdd1的任何一个分区中的key集只能与rdd2的单个分区关联。因此rdd3的单个分区就是rdd1中的单个分区的内容和rdd2中的单个分区内容,并不需要第三次shuffle。

举例来说,如果someRdd有四个分区,someOtherRdd有两个分区,然后每个reduceByKey使用三个分区,则执行起来看起来是这样的:

spark-tuning-f4

如果rdd1和rdd2使用不同的分区数或者使用默认的哈希分区,会怎么样?在这种情况下,分区数较少的那个rdd需要shuffle。
看这个转换,同样的输入,不同数量的分区:

spark-tuning-f5

采用广播变量的方法可以在两个数据集join的时候避免shuffle。只要该数据集较小并可放在executor里面即可。把它加载到driver里面的一个哈希表,然后广播到每个executor。map转换可以引用该哈希表并匹配。

什么时候Shuffle会带来好处

一些例外情况情,shuffle增加并行度以后可以增加性能。例如, 你有一些很大的不可分割的文件,因为分区数是根据InputFormat决定的。这种情况无法充分利用群集的计算资源(core),此时对其重新分区(增加分区量,会触发shuffle),之后的操作就会利用更多的群集的CPU。

另一种例外情况就是使用reduce或者aggregate操作的时候把数据聚合并传送到driver。在driver上进行聚合因为单线程的关系会造成瓶颈。要释放driver压力, 可以首先用 reduceByKeyaggregateByKey进行一轮分布式聚合。然后将结果发送给driver进行最后一次聚合。 可以看下treeReducetreeAggregate 如何实现的。(请注意,在本文撰写的时候Spark最新版为1.2,这些被标记为开发人员API,但SPARK-5430提到会将稳定版本添加到内核中。)

当聚合按key分组时,这个技巧特别有用。例如,考虑一个想要计算语料库中每个单词的出现的应用程序,并将结果作为map传送到driver中。一个方法是在每个分区计算其map,然后输出到driver进行map的合并。另外一个方法就是通过aggregateByKey以完全分布的方式执行计算,然后简单的使用collectAsMap把结果放到driver中。

二次排序

有一个叫做repartitionAndSortWithinPartitions的转换需要注意,这是一个听起来很神秘的转换,但似乎在各种奇怪的情况下出现。这种转换在shuffle的机器上进行排序,大量数据有效的切分并排序,可以与其他操作相结合。

例如,Apache Hive on Spark使用此转换进行join的实现。它也作为二次排序(secondary sort)模式中的重要的组成,比方说你希望通过key对记录进行分组。然后再对与key相对应的值进行迭代时计算时以特定顺序显示它们。这个问题出现在需要用户分组事件的算法中,然后根据它们在时间上发生的顺序来分析每个用户的事件。利用repartitionAndSortWithinPartition进行二次排序,目前需要用户多做一些操作,但SPARK-3655将大大简化。

结论

您现在应该很好地了解创建高性能Spark程序的基本要素!在第2部分中,我们将介绍资源请求调优,并行性和数据结构。

Sandy Ryza是Cloudera的数据科学家,Apache Spark提交者和Apache Hadoop PMC成员。他也是O’Reilly Advanced Analytics with Spark一书的合写者。

原文地址:How-to: Tune Your Apache Spark Jobs (Part 1)

打赏支持:支付宝/微信,感谢赏口饭吃