网站首页 > 技术文章 正文
Shuffle是什么?
Spark 延续了MapReduce 的设计思路,但提高计算的性能。spak的设计思想:移动数据不如移动计算。
Map 步骤是在不同机器上独立且同步运行的,它的主要目的是将数据转换为 key-value 的形式;而 Reduce 步骤是做聚合运算,它也是在不同机器上独立且同步运行的。Map 和 Reduce 中间夹杂着一步数据移动,也就是 shuffle,这步操作会涉及数量巨大的网络传输(network I/O),需要耗费大量的时间。 由于 MapReduce 的框架限制,一个 MapReduce 任务只能包含一次 Map 和一次 Reduce,计算完成之后,MapReduce 会将运算结果写回到磁盘中(更准确地说是分布式存储系统)供下次计算使用。如果所做的运算涉及大量循环,比如估计模型参数的梯度下降或随机梯度下降算法就需要多次循环使用训练数据,那么整个计算过程会不断重复地往磁盘里读写中间结果。这样的读写数据会引起大量的网络传输以及磁盘读写,极其耗时,而且它们都是没什么实际价值的废操作。因为上一次循环的结果会立马被下一次使用,完全没必要将其写入磁盘。
Spark 延续了MapReduce 的设计思路:对数据的计算也分为Map 和Reduce 两类。但不同的是,一个Spark 任务并不止包含一个Map 和一个Reduce,而是由一系列的Map、Reduce构成。这样,计算的中间结果可以高效地转给下一个计算步骤,提高算法性能。虽然Spark 的改进看似很小,但实验结果显示,它的算法性能相比MapReduce 提高了10~100 倍。
Shuffle意味数据移动,会带来网络IO开销,通常也是大数据架构最大的瓶颈之一
spark Shuffle算子
1 去重
2 聚合
3 排序
4 重分区
5 集合与join操作
- def intersection(other: RDD[T]): RDD[T]
- def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
- def intersection(other: RDD[T], numPartitions: Int): RDD[T]
- def subtract(other: RDD[T], numPartitions: Int): RDD[T]
- def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
- def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
- def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
- def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
- def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
- def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
- def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
spark shuffle核心要点
1 shuffle 划分
Spark Shuffle分为map阶段和reduce阶段,或者称之为ShuffleRead阶段和ShuffleWrite阶段。在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。
2 HashShuffle解析
shuffle read阶段,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。shuffle read的拉取过程是一边拉取一边进行聚合的。
shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
开启优化机制:spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项
优化机制:开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
3 SortShuffle解析
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
- 普通运行机制
在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
- bypass运行机制
bypass运行机制的触发条件如下:
l shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
l 不是聚合类的shuffle算子。
此时,每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
- 2024-10-12 Spark大数据系列学习指南 spark在大数据中的应用
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)