一、开发调优
-
复用RDD并进行持久化
对于同一份数据的读取,避免创建多个RDD,尽可能复用,并且对于多次使用的RDD进行持久化,避免重复计算。
错误写法
val rdd1 = sc.textFile("hello.txt")
rdd1.map(...).count()
val rdd2 = sc.textFile("hello.txt")
rdd2.reduce(...).count()
正确写法
//不持久化,Spark默认还是会读取两次数据
val rdd = sc.textFile("hello.txt").cache()//=.persist(StorageLevel.MEMORY_ONLY)
//遇到action算子时触发持久化
rdd.map(...).count()
//从内存中读取持久化的RDD复用
rdd.reduce(...).count()
持久化级别 |
使用场景 |
MEMORY_ONLY |
默认的持久化策略,使用未序列化的Java对象格式,将数据保存在内存中,如果内存不够存放所有的数据,则数据就不会进行持久化。 |
MEMORY_AND_DISK |
使用未序列化的Java对象格式,优先尝试将数据保存在内存中,如果内存不够存放所有的数据,会将数据写入磁盘文件中。 |
MEMORY_ONLY_SER |
使用序列化的方式,将RDD的每个partition序列化成一个字节数组保存在内存中,更加节省内存,避免持久化的数据占用过多内存导致频繁GC,不过多了序列化和反序列化的开销。 |
MEMORY_AND_DISK_SER |
使用序列化的方式,将数据保存在内存和磁盘中。 |
DISK_ONLY |
使用未序列化的方式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 |
对于上述任意一种持久化策略,如果加上后缀_2,表示将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上,保证持久化数据的高可用性。 |
如何选择:考虑内存和性能的平衡,内存足够时使用MEMORY_ONLY或MEMORY_ONLY_SER,内存不足时使用MEMORY_AND_DISK或MEMORY_AND_DISK_SER,不建议使用DISK_ONLY以及两副本的持久化策略。
-
使用广播替代shuffle操作
某些场景下,使用广播避免shuffle操作,可以大大减少Spark作业的性能开销。
//join会导致shuffle操作,因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
//使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
//在rdd1.map算子中,可以获取rdd2的所有数据并进行遍历处理
val rdd3 = rdd1.map(rdd2DataBroadcast.value.foreach(...))
- 使用高性能算子
使用reduceByKey/aggregateByKey替代groupByKey,reduceByKey及aggregateByKey属于map-side预聚合算子,会在map阶段对每个节点本地相同的key进行一次聚合操作,这样reduce阶段task拉取的数据量大大减少,从而也就减少了磁盘IO以及网络传输开销。
使用mapPartitions/foreachPartitions替代map/foreach,前者一次函数调用会处理一个分区所有的数据,而后者一次函数调用只会处理一条数据,所以在分区数据量不是很大的情况下使用前者可以提高数据处理效率,分区数据量比较大的情况则要慎用,防止出现OOM。
使用filter之后进行coalesce操作,通常对一个RDD执行filter算子过滤掉较多数据后,建议使用coalesce算子合并分区以减少分区数量,这样可以减少后续处理数据的task数量,对于性能提升有一定帮助。
使用repartitionAndSortWithinPartitions替代先repartition后sort操作,如果需要在repartition重分区之后还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子,因为该算子可以一边进行重分区的shuffle操作,一边进行排序,相比先shuffle后sort的操作性能要高。 -
广播大变量
在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(100M、1G),大量的变量副本在网络中传输会极大地影响性能。使用广播大变量的方法,会保证每个Executor的内存中只驻留一份变量副本,而Executor中的task执行时共享该变量副本,从而减少网络传输的性能开销,并减少对Executor内存的占用开销。
//每个task都会有一份list副本
val list = ...
rdd.map(list...)
//使用广播,一个executor上执行的task共享一份list副本
val list = ...
val listBroadcast = sc.broadcast(list)
rdd.map(listBroadcast.value...)
-
使用Kryo优化序列化性能
Spark中主要有三个地方涉及到了序列化:在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输;将自定义的类型作为RDD的泛型类型时,所有自定义类型对象,都会进行序列化;使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
为了方便开发者,Spark默认使用Java对象的序列化机制,在性能表现上Kryo序列化机制比Java序列化高10倍左右,因此在涉及到有序列化的场景下我们可以使用Kryo来优化序列化性能。
val conf = new SparkConf()
//设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
二、资源调优
- num-executors
参数说明:设置Spark作业启动的Executor进程数量。
参数建议:根据队列资源调整,建议设置50~100个。 - executor-cores
参数说明:设置每个Executor进程占用的CPU数量。
参数建议:多个core会共享Executor内存,建议设置1~2个。 - driver-memory
参数说明:设置Driver进程占用的内存。
参数调优建议:通常设置1~2G,如果需要使用collect算子将RDD数据拉取到Driver上进行处理,那么必须确保Driver的内存足够大。 - executor-memory
参数说明:设置每个Executor进程占用的内存。
参数建议:根据task处理数据量的大小调整,建议设置4~8G。 - spark.dynamicAllocation.enabled
参数说明:是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少Executor,开启时num-executors参数无效。
参数建议:建议设置为true,避免空闲Executor浪费资源。 - spark.dynamicAllocation.minExecutors
参数说明:开启动态资源配置时分配的最小Executor个数,在作业启动时申请,默认0。
参数建议:建议设置10~50个, - spark.dynamicAllocation.maxExecutors
参数说明:开启动态资源配置时分配的最大Executor个数,默认infinity。
参数建议:根据作业所需资源和队列资源调整,建议设置100~500个。 - spark.default.parallelism
参数说明:设置每个stage的默认task数量,不设置时,Spark会根据底层HDFS的block数量来设置task的数量。
参数建议:建议设置为num-executors * executor-cores的2~3倍。 - spark.storage.memoryFraction
参数说明:设置RDD持久化数据在Executor内存中能占用的比例,默认是0.6。
参数建议:如果有较多的RDD持久化操作,可以适当调高该参数值;如果作业频繁出现GC并且GC耗时较长,表明task执行用户代码的内存不够用,那么建议调低该参数值。 - spark.shuffle.memoryFraction
参数说明:设置shuffle read task进行聚合操作时在Executor内存中能占用的比例,默认是0.2。
参数建议:如果RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比,避免shuffle过程频繁读写磁盘。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net