计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark双Value算子是否产生Shuffle

btikc 2024-10-12 11:41:57 技术文章 6 ℃ 0 评论

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。其在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

RDD算子在计算的过程中会产生Shuffle,其是Spark Job中一个比较重要的阶段。

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,窄依赖并不会引起Shuflle操作。

本篇我们就来介绍一下Spark的双Value类型算子和Shuffle之间是否有直接联系。

一、Shuffle和算子类型

Spark Shuffle是发生在宽依赖的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游Task之间进行传递和计算。如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。下面是Shuffle的两个重要阶段:

Shuffle Write

主要就是在一个Stage结束计算之后,为了下一个Stage可以执行Shuffle类的算子(比如reduceByKey,groupByKey),而将每个Task处理的数据按Key进行“分区”。所谓“分区”,就是对相同的Key执行Hash算法,从而将相同Key都写入同一个磁盘文件中,而每一个磁盘文件都只属于Reduce端的Stage的一个Task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行Shuffle Write的Task,要为下一个Stage创建多少个磁盘文件呢?很简单,下一个Stage的Task有多少个,当前Stage的每个Task就要创建多少份磁盘文件。比如下一个Stage总共有100个Task,那么当前Stage的每个Task都要创建100份磁盘文件。如果当前Stage有50个Task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的Shuffle Write操作所产生的磁盘文件的数量是极其惊人的。

Shuffle Read

Shuffle Read,通常就是一个Stage刚开始时要做的事情。此时该Stage的每一个Task就需要将上一个Stage的计算结果中的所有相同Key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行Key的聚合或连接等操作。由于Shuffle Write的过程中,Task给Reduce端的Stage的每个Task都创建了一个磁盘文件,因此Shuffle Read的过程中,每个Task只要从上游Stage的所有Task所在节点上,拉取属于自己的那一个磁盘文件即可。

Shuffle Read的拉取过程是一边拉取一边进行聚合的。每个Shuffle Read Task都会有一个自己的Buffer缓冲,每次都只能拉取与Buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到Buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据拉取完,并得到最终的结果。

算子类型

Spark大致分为这三种算子:

① Value数据类型的Transformation算子(转换算子),这种变换不触发提交作业,针对处理的数据项是Value型的数据。例如Map、GroupBy、Union等算子。

② Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。例如MapValues、ReduceByKey、Join等算子。

③ Action算子(行动算子),这类算子会触发SparkContext提交作业。例如Foreach、Collect、Top等算子。

二、双Value算子

上述算子类型中Value类型算子又可以细分单Value 类型、双 Value 类型,而双Value 类型是指两个RDD之间的操作,例如求交集,并集等,下面我们通过代码测试来了解一下。

此部分的测试代码基本一致,针对隔离部分进行替换。

Union并集

①抽象函数

def union(other: JavaRDD[T]): JavaRDD[T]

②测试代码

SparkConf conf = newSparkConf().setAppName("SparkCoreTest").setMaster("local[*]");

JavaSparkContext sparkContext = new JavaSparkContext(conf);

List<String> a1 = Arrays.asList("Spark", "Bigdata", "Atguigu");

List<String> a2 = Arrays.asList("Java", "Core", "Bigdata");

JavaRDD<String> pDD1 = sparkContext.parallelize(a1).repartition(2);

JavaRDD<String> pDD2 = sparkContext.parallelize(a2).repartition(3);

JavaRDD<String> result = pDD1.union(pDD2);

System.out.println("pDD1 partition num " + pDD1.partitions().size());

System.out.println("pDD2 partition num " + pDD2.partitions().size());

System.out.println("pDD3 partition num " + result.partitions().size());

result.collect().forEach(System.out::println);

Thread.sleep(600000);

sparkContext.stop();

③输出结果:

pDD1 partition num 2

pDD2 partition num 3

result partition num 5

Spark

Bigdata

Atguigu

Core

Bigdata

Java

④分析

1)union 的前后并没有 Shuffle 的发生,

2)union 后 partition 的数量为 Union的 RDD_A 与 RDD_B 数量总和。

Intersection交集

①抽象函数

def intersection(other: JavaRDD[T]): JavaRDD[T]

②测试代码

JavaRDD<String> result = pDD1.intersection(pDD2);

③输出结果:

pDD1 partition num 2

pDD2 partition num 3

result partition num 3

Bigdata

④分析

1)intersection的前后并没有 Shuffle 的发生,

2)intersection后 partition 的数量为RDD_A 与 RDD_B中分区数量最大的一个。

Subtract差集

①抽象函数

def subtract(other: JavaRDD[T]): JavaRDD[T]

②测试代码

JavaRDD<String> result = pDD1.subtract(pDD2);

③输出结果:

pDD1 partition num 2

pDD2 partition num 3

result partition num 2

Atguigu

Spark

④分析

1)subtract的前后并没有 Shuffle 的发生,

2)subtract后 partition 的数量为RDD_A 与 RDD_B中源RDD的分区数量。

zip拉链

①抽象函数

def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]

②测试代码

JavaRDD<String> pDD1 = sparkContext.parallelize(a1).repartition(3);

JavaRDD<String> pDD2 = sparkContext.parallelize(a2).repartition(3);

JavaPairRDD<String, String> result = pDD1.zip(pDD2);

③输出结果:

pDD1 partition num 3

pDD2 partition num 3

result partition num 3

(Bigdata,Core)

(Atguigu,Bigdata)

(Spark,Java)

④分析

1)zip的前后并没有 Shuffle 的发生。

2)可以zip的两个RDD必须分区数量一致且元素数量一致。

3)zip后 partition 的数量为RDD_A 与 RDD_B其中任一RDD分区数量。

三、总结

本文针对Spark Shuffle进行了介绍,其主要分为Shuflle Write和Shuffle Read两部分,发生在宽依赖的算子之间。另通过代码测试和结果分析,得出Spark的双Value算子执行的前后并没有产生Shuffle操作。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表