网站首页 > 技术文章 正文
上次介绍了关于Spark的DAG的划分,以及一个初步的执行流程,我们发现在action算子调用的runJob最终都会调到DAGSchedule里面的runJob,在这里进行stage的划分,以及提交。
接下来,我想分享并记录学习到的关于Spark存储方面的内容。Spark存储是采用了主从模式,即Master与Slave模型,整个存储的消息通信是采用RPC的消息通信方式。其中Master主要负责关于程序运行期间数据块元数据的管理维护。Slave将本地数据信息传递给Master。
Spark的存储级别
Spark虽然是基于内存计算,但是rdd的数据集不仅可以保存在内存中还可以保存到磁盘。在Spark中的StorageLevel中spark根据_useDisk、_useMemory、_useOffHeap、_deserialized、_replication这几个参数的组合,一共提供了12中存储级别策略。
我们可以通过cache方法和persist方法来进行rdd的缓存,persist主要就是实现的这12种策略,cache是persist中的一种特殊情况,是memory_only这种。我们来看看persist的源码。
再来看看cache的源码:
def cache(): this.type = persist()
是其中的一种默认方式。
值得注意的是,cache操作是一种懒操作,类似于转换算子,需要action触发才会执行。所以在我们cache算子后,并不能马上起作用。下次使用到才会到内存中去读。
存储级别 | 占用空间 | cpu使用 | 内存存储 | 磁盘存储 | 描述 |
MEMORY_ONLY | 高 | 低 | 是 | 否 | 默认指定,全部缓存在内存中 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 将数据序列化后存储在内存中,但是在使用的时候,cpu会将数据进行反序列化操作 |
MEMORY_AND_DISK | 高 | 中 | 部分 | 部分 | 会缓存在内存中,当内存不够时,是存储在磁盘上 |
DISK_ONLY | 低 | 高 | 否 | 是 | 全部存储在磁盘上 |
总结:
内存足够的情况下,我们使用MEMORY_ONLY的方式,直接内存最快,也不用反序列化
内存不足够的情况下,使用MEMORY_ONLY_SER序列化之后存储在内存中,但是就是要使用cpu做反序列化操作。
如果需要失败快速恢复,就是用_2的缓存方式。这样会存储两份,就不用再去计算一次了。
通常不使用DISK的方式,有时候,从磁盘上还不如直接读取数据的效率更快
最后就是释放缓存数据了,就是直接调用unpersist()就行了。
然后我们再介绍一下Spark中的序列化以及压缩
Spark中序列化包括两种,JavaSerialize以及KryoSerialize两种,这两种序列化各有优缺点。他们都是继承于Serialize这个抽象类。
JavaSerialize这个序列化操作灵活,通用性高,基本上都可用。但是性能不是很好,而且暂用空间也比较大。序列结果比较大。
KryoSerialize序列化操作不是很灵活,通用性不强,但是性能高,序列化速度快,序列结果小。压缩效率更高,但是它并不能支持序列化所有的类,而且要求用户注册类。
关于KryoSerialize的使用方式。
首先在SparkConf中设置:
set(“spark.serializer”,"org.apache.spark.serializer.KryoSerializer")
然后再将序列化的类进行注册,在sparkconf中
conf.registerKryoClasses(new Class[]{CategorySortKey.class});进行需要序列化的类进行注册。
Spark中的三种压缩方式:LZ4、LZF、Snappy三种压缩方式,这三种都是基于CompressionCodec类。
同样三种也有不同的优势:
LZ4:提供了压缩速度与压缩比俱佳的性能
LZF:提供了更高的压缩比
Snappy :提供了更高的压缩速度
接下来我们介绍下Spark项目中经常遇到的优化点。
广播
广播就是共享变量。通常情况下,Spark的算子使用外部变量的时候,该外部变量的值会复制到节点执行的每一个task,每个task只能操作自己复制的变量副本。假设我们需要10M外部数据,然后根据我们集群的设置,我们一共有Executor50个,然后task存在200个,在不使用广播的情况下,每个task会有一个备份就是10*200 = 2G,这样就消耗了2G,但是我们采用广播的方式的话,他会在Executor上做一个备份,然后就是task共享这个数据了。就是50 * 10 = 500M,这就减少了几个数量级。所以共享变量还是很有必要的。
如果想让节点Executor上所有task的能够共享该变量,有两种实现方式
第一种 broadcast
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
//获取广播的值
scala> broadcastVar.valueres0: Array[Int] = Array(1, 2, 3)
第二种就是 累加变量Accumulators
第二种主要是做累加操作,让更多的task共同操作变量,不能读取值,只有在Driver端能够读取。当然还可以自定义累加器。 可以参考官网具体实例
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)