网站首页 > 技术文章 正文
Spark变成主要是函数式,核心是基于数据处理的需求,使用算子与RDD构建数据管道,管道的开始是输入,末尾是输出,管道就是声明的处理逻辑,也是描述了一种映射关系。
RDD算子主要分成两类,一类是转换算子(transform),一类是行为算子(action)。转换算子主要负责改变RDD中的数据,切分RDD中的数据,过滤掉某些数据等,并按照一定顺序组合。
Spark会将转换算子放入一个计算的有向无环图中,并不会立即执行。当Driver请求某些数据时,才会真正提交作业并触发计算。
而行动算子就会触发Driver请求数据,这种机制与函数式编程思想的惰性求值类似。
RDD转换算子大概有20-30个,按照分区与分区的映射关系来分组,有三类:
1、一对一,如map
2、多对一,如union
3、多对多,如groupbykey
Flatmap算子的字面意思是“展平”,flatmap算子的函数f的作用是将T类型的数据元素转换为元素类型为U的集合。
分层抽样是将数据元素按照不同特征分成不同的组,然后从这些组中分别抽样数据元素。Spark内置了实现这一功能的算子 samplebykey、withreplacement参数,表示此次抽样是重置抽样还是不重置抽样:重置抽样是“有放回的抽样”。
Fractions是每个键的抽样比例,以map的形式提供。
Seed为随机数种子,一般设置为当前时间戳。
Cogroup算子是很多算子的基础,如intersection、join等。简单来说,cogroup算子相当于多个数据集一起做groupbykey操作。生成的pair RDD的数据元素类型为(K, (Iterable[V], Iterable[W])),其中第一个迭代器为当前键在RDD_0中的分组结果,第二个迭代器为RDD_1的结果。
Union算子将两个同类型的RDD合并为一个RDD,类似于求并集的操作。
数据结果类的算子,主要用于改变RDD中底层的数据结构,即RDD的分区。在这些算子中,可以直接操作分区而不需要访问分区中的元素,可以更高效地控制集群中的分区和分区的分发。
一般情况下,集群中的单个节点会有多个数据分区,数据分区数一般取决于数据量和集群节点数。如果某个计算任务的数据在本地,我们称其为数据的本地性,计算任务会尽可能的根据本地性优先选择本地数据。
Coalesce可以试图将RDD中分区数变为用户设定的分区数,从而调整作业的并行程度。如果用户设定的分区数小于RDD原有的分区数,则会进行本地合并而不会进行shuffle。
如果用户设定的分区数大于RDD原有分区数,则不会触发操作。
如果需要增加分区数,则需要将shuffle参数设定为true,这样数据就会通过散列分区器将数据进行分发,以达到增加分区的效果。
当用户将分区数设定为1时,如果shuffle参数为false,会对某些节点造成极大的性能负担,用户可以设置shuffle参数为true,汇总分区的上游计算过程并行执行。
Reparation是coalesce默认开启shuffle的简单封装。
大部分转换算子都提供了numPartitions这个可选参数,意味着在作业流程的每一步,都可以细粒度地控制作业的并行度,从而提高执行时的性能。但是需要注意的是,提交作业后executor的数量是一定的。
行动算子从功能上来说作为一个触发器,会触发提交整个作业并开始执行。从代码上来说,它与转换算子的最大不同之处在于,转换算子返回的还是RDD,行动算子返回的是非RDD类型的,可能是整数,可能没有返回值。
行动算子可以分为Driver和分布式两类:
Driver:该类型算子返回值通常为Driver内部的内存变量,如collect、count、countByKey等,这种算子会在远端executor执行计算完成后将结果数据返回Driver。
该种算子的缺点是,如果返回的数据太大,很容易会超过Driver的内存限制。因此使用这种算子作为作业结束需要谨慎,主要还是用于调试与开发场景。
分布式:与前一类算子将结果传回Driver不同,这类算子会在集群中的节点上“就地”分布式执行,如saveAsTextFile,这是一种最常见的分布式行动算子。
Foreach算子迭代RDD中的每个元素,并可以自定义输出操作。通过用户传入的函数,可以实现打印、插入到外部存储、修改累加器等迭代所带来的副作用。
在计算过程中,用户可能经常用到同一份数据,此时就可以用到Spark缓存技术,也就是利用缓存算子将RDD进行缓存,从而加速Spark作业的执行速度。
Spark缓存算子也属于行动算子,也就是说会触发整个作业进行计算,想要缓存数据可以使用cache或者persist算子,它们是行动算子中仅有的两个返回值为RDD的算子。Spark缓存技术是加速Spark作业执行的关键技术之一,尤其是在迭代计算的场景。
缓存需要尽可能地将数据放入内存。如果没有足够的内存,那么驻留在内存的当前数据就可能被移除;如果数据量本身已经超过可用内存容量,这时由于磁盘会代替内存存储数据,性能会下降。
如果内存足够大,使用memory_only无疑是性能最好的选择。想要节省空间的话,可以使用memory_only_ser,可以序列化对象使其所占空间减少一点。
DISK是在重算的代价特别昂贵时不得已的选择。
Memory_only_2与memory_and_disk_2拥有最佳的可用性,但会消耗额外的存储空间。
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)