Spark性能调优
1、常规性能调优
1)常规性能调优一:最优资源配置
- 增加Executor个数
- 增加每个Executor的CPU core个数
- 增加每个Executor的内存量
- 在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:
-
- 可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
- 可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少 了可能的磁盘IO;
- 可以为task的执行提供更多内存,在task的执行过程中可能创建 很多对象,内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整 体性能。
-
- 在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:
bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn\
--deploy-mode cluster\
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar
运行资源优化配置 –num-executors
参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。
Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
参数调优建议:每个Spark作业的运行一般设置50~100个
左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
运行资源优化配置 -executor-memory
参数说明:该参数用于设置每个Executor进程的内存。
Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G~8G
较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors * executor-memory
,是不能超过队列的最大内存量的。
此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2
,避免你自己的Spark作业占用了队列所有的资源,导致别的同事的作业无法运行。
运行资源优化配置 -executor-cores
参数说明:该参数用于设置每个Executor进程的CPU core数量。
这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
参数调优建议:Executor的CPU core数量设置为2~4
个较为合适。
同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。
同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores
不要超过队列总CPU core的1/3~1/2
左右比较合适,也是避免影响其他同事的作业运行。
运行资源优化配置 -driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。
唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理(或者是用map side join
操作),那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
运行资源优化配置 -spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量,也可以认为是分区数。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000
个较为合适。
很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。
试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!
因此Spark官网建议的设置原则是,设置该参数为num-executors*executor-cores的2~3倍
较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
下面两个关于内存分配的配置其实在Spark 1.6之后就无需配置了,因为此时关于内存采用的是——统一内存管理,对于Storage和Execution占用的空间是可以两者之间动态调整的。
运行资源优化配置 -spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6
。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。
但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。
此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
运行资源优化配置 -spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2
。
也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比,避免shuffle过程中数据过多时内存不够用,溢写到磁盘上,降低了性能。
此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
2)常规性能调优二:RDD优化
RDD复用
RDD持久化
必须对多次使用 的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从 内存/磁盘中直接获取RDD数据。
RDD尽可能早的filter操作
3)常规性能调优三:并行度调节
task数量应该设置为Spark作业总CPU core数量的2~3倍
Spark能够有效地支持短至200毫秒的任务,因为它可以在一个任务中重复使用一个执行器JVM,并且任务启动成本较低,因此可以安全地将并行级别提高到超过集群内核的数量。
4)常规性能调优四:广播大变量
默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响 Spark性能。
广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。
5)常规性能调优五:Kryo序列化
默认情况下,Spark使用Java的序列化机制,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后 的数据所占用的空间依然较大。
Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库, 是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuling RDDs已经默认使用Kryo序列化方式了 。
//创建SparkConf对象
val conf = new SparkConf().setMaster(...).setAppName(...)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "ag.com.MyKryoRegistrator");
6)常规性能调优六:调节本地化等待时长
Spark的本地化等级如表所示:
名称 | 解析 |
---|---|
PROCESS_LOCAL | 进程本地化,task和数据在同一个Executor中,性能最好。 |
NODE_LOCAL | 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个 Executor中,数据需要在进程间进行传输。 |
RACK_LOCAL | 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节 点之间进行传输。 |
NO_PREF | 对于task来说,从哪里获取都一样,没有好坏之分 |
ANY | task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。 |
在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。
可以通过调整spark.locality.wait来设置等待时间,默认值是3s。
属性名 | 默认值 | 含义 |
---|---|---|
spark.locality.wait | 3s | 为了数据本地性最长等待时间(spark会根据数据所在位置,尽量让任务也启动于相同的节点,然而可能因为该节点上资源不足等原因,无法满足这个任务分配,spark最多等待这么多时间,然后放弃数据本地性)。数据本地性有多个级别,每一级别都是等待这么多时间(同一进程、同一节点、同一机架、任意)。你也可以为每个级别定义不同的等待时间,需要设置spark.locality.wait.node等。如果你发现任务数据本地性不佳,可以增加这个值,但通常默认值是ok的。 |
spark.locality.wait.node | spark.locality.wait | 单独定义同一节点数据本地性任务等待时间。你可以设为0,表示忽略节点本地性,直接跳到下一级别,即机架本地性(如果你的集群有机架信息)。 |
spark.locality.wait.process | spark.locality.wait | 单独定义同一进程数据本地性任务等待时间。这个参数影响试图访问特定执行器上的缓存数据的任务。 |
spark.locality.wait.rack | spark.locality.wait | 单独定义同一机架数据本地性等待时间。 |
Spark本地化等待时长的设置如代码所示:
val conf = new SparkConf().set("spark.locality.wait", "6")
2**、算子调优**
1)算子调优一:mapPartitions和map
当要把RDD中的所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。
mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。
应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map
2)算子调优二:foreachPartition优化数据库操作
如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子
3)算子调优三:filter与coalesce的配合使用
在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能
4)算子调优四:repartition解决SparkSQL低并行度问题
SparkSQL的并行度不允许用户自己指定,SparkSQL自己会默认根据hive表对应的HDFS文件的split个数自动设置SparkSQL所在的那个stage的并行度,用户自己通spark.default.parallelism参数指定的并行度,只会在没SparkSQL的stage中生效。
SparkSQL这一步的并行度和task数量肯定是没有办法去改变了,但是,对于SparkSQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计SparkSQL,因此stage的并行度就会等于你手动设置的值,这样就避免了SparkSQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。
5)算子调优五:reduceByKey代替groupByKey以达到预聚合
reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数
3、Shuffle调优
Shuffle Writer 阶段优化
Shuffle调优一:调节map端缓冲区大小
如果Shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
Shuffle调优六:通过压缩map任务输出文件,来降低io成本
spark.shuffle.compress
是 Apache Spark 中的一个配置参数,它控制着在 Shuffle 过程中是否对数据进行压缩。这个参数的设置可以对性能和资源使用产生显著影响。
调优效果:
- 优点:
- 减少网络传输:压缩数据可以显著减少在 Shuffle 过程中网络传输的数据量,这对于网络带宽有限的环境尤其有益。
- 减少磁盘I/O:在数据需要 spill 到磁盘时,压缩可以减少磁盘空间的使用,提高磁盘I/O效率。
- 提高内存效率:在内存中压缩数据可以让更多的数据保留在内存中,减少数据 spill 到磁盘的次数。
- 缺点:
- 增加CPU负载:数据压缩和解压缩需要CPU资源,这可能会在CPU资源紧张的情况下增加额外的负载。
- 延迟增加:压缩和解压缩操作会增加数据处理的延迟,尤其是在数据量大的情况下。
使用情况:
- 在网络带宽有限或需要跨网络传输大量数据的场景中,启用压缩可以提高效率。
- 在内存资源紧张,但CPU资源相对充足的环境中,压缩可以帮助更多数据保留在内存中,减少磁盘I/O。
设置建议:
- 在设置
spark.shuffle.compress
参数时,需要权衡压缩带来的网络和磁盘效率提升与额外的CPU负载。如果集群的网络带宽是瓶颈,或者磁盘I/O是限制因素,那么启用压缩通常是有益的。 - 如果CPU资源紧张,或者数据量不大,那么可能不需要启用压缩,以避免额外的CPU开销。
- 在某些情况下,可以根据数据的压缩率(即压缩前后大小的比例)来决定是否启用压缩。如果数据可以被高效压缩(例如,文本数据通常比二进制数据更容易压缩),那么启用压缩可能更有意义。
调整 spill 阈值避免频繁的磁盘溢出操作
调整 Shuffle 阶段的溢出阈值 spark.shuffle.spill.threshold
可以根据具体的场景和需求来优化,但是要注意需要进行一定的实验和测试以确定最佳值。
- 优势:
- 减少磁盘溢出操作: 通过增加溢出阈值,可以减少在 Shuffle 阶段发生的频繁磁盘溢出操作,提高作业的执行效率。
- 降低内存压力: 合理调节溢出阈值可以降低内存中数据的存储压力,避免内存溢出导致的作业失败或性能下降。
- 局限性:
- 内存占用增加: 增加溢出阈值可能会导致内存中存储的数据量增加,进而增加内存占用,需要确保集群有足够的内存资源。
- 性能影响: 过大的溢出阈值可能会导致内存压力过大,反而影响了作业的性能,需要根据实际情况进行调整。
- 适用场景:
- 数据量较大的作业: 在数据量较大的作业中,适当增加溢出阈值可以有效减少磁盘溢出操作,提高作业的执行效率。
- 内存资源充足的环境: 在内存资源充足的集群环境中,可以适当增加溢出阈值,以提高内存利用率和作业执行速度。
- 需要降低磁盘 I/O 的场景: 对于需要减少磁盘 I/O 开销的作业,调节溢出阈值可以降低磁盘读写次数,提高作业的整体性能。
Shuffle Read 阶段优化
Shuffle调优二:调节reduce端拉取数据缓冲区大小
Spark Shuffle过程中,Shuffle reduce task的buer缓冲区大小决定了reduce task每次能够缓冲的数据量, 也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉 取数据的次数,也就可以减少网络传输的次数,进而提升性能。
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
Shuffle调优三:调节reduce端拉取数据重试次数
Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进 行重试。对于那些包含了特别耗时的Shuffle操作的作业,建议增加重试最大次数(比如6次),以避免 由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数 十亿~上百亿)的Shuffle过程,调节该参数可以大幅度提升稳定性
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")
Shuffle调优四:调节reduce端拉取数据等待间隔
Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进 行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以 增加Shuffle操作的稳定性
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
Shuffle调优五:调节SortShuffle排序操作阈值
对于SortShuffleManager,如果Shuffle reduce task的数量小于某一阈值则Shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于Shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依 然会产生大量的磁盘文件,因此Shuffle write性能有待提高。
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")
4**、JVM调优**
ull gc/minor gc,都会导致JVM的工作线程停止工作,即stop the world
1**)JVM调优一:降低cache操作的内存占比**
(1)静态内存管理机制
在一般情况下,Storage的内存都提供给了cache操作,但是如果在某些情况下cache操作内存不是很紧张,而task的算子中创建的对象很多,Execution内存又相对较小,这回导致频繁的minor gc,甚至于频繁的full gc,进而导致Spark频繁的停止工作,性能影响会很大。
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
(2)统一内存管理机制
无需手动进行调节
2**)JVM调优二:调节Executor堆外内存**
默认情况下,Executor堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G, 甚至于2G、4G
Executor堆外内存的配置需要在spark-submit脚本里配置,如代码清单所示:
--conf spark.yarn.executor.memoryOverhead=2048
3**)JVM调优三:调节连接等待时长**
在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的 BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致 DAGScheduler反复提交几次stage,TaskScheduler返回提交几次task,大大延长了我们的Spark作业的运行时间
--conf spark.core.connection.ack.wait.timeout=300