计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark Shuffle机制 sparkshuffle原理

btikc 2024-10-12 11:42:39 技术文章 8 ℃ 0 评论

Shuffle是什么?

Spark 延续了MapReduce 的设计思路,但提高计算的性能。spak的设计思想:移动数据不如移动计算。

Map 步骤是在不同机器上独立且同步运行的,它的主要目的是将数据转换为 key-value 的形式;而 Reduce 步骤是做聚合运算,它也是在不同机器上独立且同步运行的。Map 和 Reduce 中间夹杂着一步数据移动,也就是 shuffle,这步操作会涉及数量巨大的网络传输(network I/O),需要耗费大量的时间。 由于 MapReduce 的框架限制,一个 MapReduce 任务只能包含一次 Map 和一次 Reduce,计算完成之后,MapReduce 会将运算结果写回到磁盘中(更准确地说是分布式存储系统)供下次计算使用。如果所做的运算涉及大量循环,比如估计模型参数的梯度下降或随机梯度下降算法就需要多次循环使用训练数据,那么整个计算过程会不断重复地往磁盘里读写中间结果。这样的读写数据会引起大量的网络传输以及磁盘读写,极其耗时,而且它们都是没什么实际价值的废操作。因为上一次循环的结果会立马被下一次使用,完全没必要将其写入磁盘。


Spark 延续了MapReduce 的设计思路:对数据的计算也分为Map 和Reduce 两类。但不同的是,一个Spark 任务并不止包含一个Map 和一个Reduce,而是由一系列的Map、Reduce构成。这样,计算的中间结果可以高效地转给下一个计算步骤,提高算法性能。虽然Spark 的改进看似很小,但实验结果显示,它的算法性能相比MapReduce 提高了10~100 倍。

Shuffle意味数据移动,会带来网络IO开销,通常也是大数据架构最大的瓶颈之一

spark Shuffle算子

1 去重

  • def distinct()
  • def distinct(numPartitions: Int)
  • 2 聚合

  • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  • def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
  • def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
  • def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
  • def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) =>
  • 3 排序

  • def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  • def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length
  • 4 重分区

  • def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
  • 5 集合与join操作

    • def intersection(other: RDD[T]): RDD[T]
    • def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    • def intersection(other: RDD[T], numPartitions: Int): RDD[T]
    • def subtract(other: RDD[T], numPartitions: Int): RDD[T]
    • def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    • def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
    • def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
    • def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
    • def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    • def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
    • def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    spark shuffle核心要点

    1 shuffle 划分

    Spark Shuffle分为map阶段和reduce阶段,或者称之为ShuffleRead阶段和ShuffleWrite阶段。在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。


    2 HashShuffle解析


    shuffle read阶段,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。shuffle read的拉取过程是一边拉取一边进行聚合的。

    shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    开启优化机制:spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项

    优化机制:开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

    当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

    3 SortShuffle解析

    SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

    • 普通运行机制

    在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。


    • bypass运行机制

    bypass运行机制的触发条件如下:

    l shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

    l 不是聚合类的shuffle算子。

    此时,每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。


    Tags:

    本文暂时没有评论,来添加一个吧(●'◡'●)

    欢迎 发表评论:

    最近发表
    标签列表