网站首页 > 技术文章 正文
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。其在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD算子在计算的过程中会产生Shuffle,其是Spark Job中一个比较重要的阶段。
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,窄依赖并不会引起Shuflle操作。
本篇我们就来介绍一下Spark的双Value类型算子和Shuffle之间是否有直接联系。
一、Shuffle和算子类型
Spark Shuffle是发生在宽依赖的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游Task之间进行传递和计算。如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。下面是Shuffle的两个重要阶段:
Shuffle Write
主要就是在一个Stage结束计算之后,为了下一个Stage可以执行Shuffle类的算子(比如reduceByKey,groupByKey),而将每个Task处理的数据按Key进行“分区”。所谓“分区”,就是对相同的Key执行Hash算法,从而将相同Key都写入同一个磁盘文件中,而每一个磁盘文件都只属于Reduce端的Stage的一个Task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
那么每个执行Shuffle Write的Task,要为下一个Stage创建多少个磁盘文件呢?很简单,下一个Stage的Task有多少个,当前Stage的每个Task就要创建多少份磁盘文件。比如下一个Stage总共有100个Task,那么当前Stage的每个Task都要创建100份磁盘文件。如果当前Stage有50个Task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的Shuffle Write操作所产生的磁盘文件的数量是极其惊人的。
Shuffle Read
Shuffle Read,通常就是一个Stage刚开始时要做的事情。此时该Stage的每一个Task就需要将上一个Stage的计算结果中的所有相同Key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行Key的聚合或连接等操作。由于Shuffle Write的过程中,Task给Reduce端的Stage的每个Task都创建了一个磁盘文件,因此Shuffle Read的过程中,每个Task只要从上游Stage的所有Task所在节点上,拉取属于自己的那一个磁盘文件即可。
Shuffle Read的拉取过程是一边拉取一边进行聚合的。每个Shuffle Read Task都会有一个自己的Buffer缓冲,每次都只能拉取与Buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到Buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据拉取完,并得到最终的结果。
算子类型
Spark大致分为这三种算子:
① Value数据类型的Transformation算子(转换算子),这种变换不触发提交作业,针对处理的数据项是Value型的数据。例如Map、GroupBy、Union等算子。
② Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。例如MapValues、ReduceByKey、Join等算子。
③ Action算子(行动算子),这类算子会触发SparkContext提交作业。例如Foreach、Collect、Top等算子。
二、双Value算子
上述算子类型中Value类型算子又可以细分单Value 类型、双 Value 类型,而双Value 类型是指两个RDD之间的操作,例如求交集,并集等,下面我们通过代码测试来了解一下。
此部分的测试代码基本一致,针对隔离部分进行替换。
Union并集
①抽象函数
def union(other: JavaRDD[T]): JavaRDD[T]
②测试代码
SparkConf conf = newSparkConf().setAppName("SparkCoreTest").setMaster("local[*]");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
List<String> a1 = Arrays.asList("Spark", "Bigdata", "Atguigu");
List<String> a2 = Arrays.asList("Java", "Core", "Bigdata");
JavaRDD<String> pDD1 = sparkContext.parallelize(a1).repartition(2);
JavaRDD<String> pDD2 = sparkContext.parallelize(a2).repartition(3);
JavaRDD<String> result = pDD1.union(pDD2);
System.out.println("pDD1 partition num " + pDD1.partitions().size());
System.out.println("pDD2 partition num " + pDD2.partitions().size());
System.out.println("pDD3 partition num " + result.partitions().size());
result.collect().forEach(System.out::println);
Thread.sleep(600000);
sparkContext.stop();
③输出结果:
pDD1 partition num 2
pDD2 partition num 3
result partition num 5
Spark
Bigdata
Atguigu
Core
Bigdata
Java
④分析
1)union 的前后并没有 Shuffle 的发生,
2)union 后 partition 的数量为 Union的 RDD_A 与 RDD_B 数量总和。
Intersection交集
①抽象函数
def intersection(other: JavaRDD[T]): JavaRDD[T]
②测试代码
JavaRDD<String> result = pDD1.intersection(pDD2);
③输出结果:
pDD1 partition num 2
pDD2 partition num 3
result partition num 3
Bigdata
④分析
1)intersection的前后并没有 Shuffle 的发生,
2)intersection后 partition 的数量为RDD_A 与 RDD_B中分区数量最大的一个。
Subtract差集
①抽象函数
def subtract(other: JavaRDD[T]): JavaRDD[T]
②测试代码
JavaRDD<String> result = pDD1.subtract(pDD2);
③输出结果:
pDD1 partition num 2
pDD2 partition num 3
result partition num 2
Atguigu
Spark
④分析
1)subtract的前后并没有 Shuffle 的发生,
2)subtract后 partition 的数量为RDD_A 与 RDD_B中源RDD的分区数量。
zip拉链
①抽象函数
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
②测试代码
JavaRDD<String> pDD1 = sparkContext.parallelize(a1).repartition(3);
JavaRDD<String> pDD2 = sparkContext.parallelize(a2).repartition(3);
JavaPairRDD<String, String> result = pDD1.zip(pDD2);
③输出结果:
pDD1 partition num 3
pDD2 partition num 3
result partition num 3
(Bigdata,Core)
(Atguigu,Bigdata)
(Spark,Java)
④分析
1)zip的前后并没有 Shuffle 的发生。
2)可以zip的两个RDD必须分区数量一致且元素数量一致。
3)zip后 partition 的数量为RDD_A 与 RDD_B其中任一RDD分区数量。
三、总结
本文针对Spark Shuffle进行了介绍,其主要分为Shuflle Write和Shuffle Read两部分,发生在宽依赖的算子之间。另通过代码测试和结果分析,得出Spark的双Value算子执行的前后并没有产生Shuffle操作。
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)