计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark存储 spark内存

btikc 2024-10-12 11:42:12 技术文章 8 ℃ 0 评论

上次介绍了关于Spark的DAG的划分,以及一个初步的执行流程,我们发现在action算子调用的runJob最终都会调到DAGSchedule里面的runJob,在这里进行stage的划分,以及提交。

接下来,我想分享并记录学习到的关于Spark存储方面的内容。Spark存储是采用了主从模式,即Master与Slave模型,整个存储的消息通信是采用RPC的消息通信方式。其中Master主要负责关于程序运行期间数据块元数据的管理维护。Slave将本地数据信息传递给Master。

Spark的存储级别

Spark虽然是基于内存计算,但是rdd的数据集不仅可以保存在内存中还可以保存到磁盘。在Spark中的StorageLevel中spark根据_useDisk、_useMemory、_useOffHeap、_deserialized、_replication这几个参数的组合,一共提供了12中存储级别策略。

我们可以通过cache方法和persist方法来进行rdd的缓存,persist主要就是实现的这12种策略,cache是persist中的一种特殊情况,是memory_only这种。我们来看看persist的源码。

再来看看cache的源码:

def cache(): this.type = persist()

是其中的一种默认方式。

值得注意的是,cache操作是一种懒操作,类似于转换算子,需要action触发才会执行。所以在我们cache算子后,并不能马上起作用。下次使用到才会到内存中去读。

存储级别

占用空间

cpu使用

内存存储

磁盘存储

描述

MEMORY_ONLY

默认指定,全部缓存在内存中

MEMORY_ONLY_SER

将数据序列化后存储在内存中,但是在使用的时候,cpu会将数据进行反序列化操作

MEMORY_AND_DISK

部分

部分

会缓存在内存中,当内存不够时,是存储在磁盘上

DISK_ONLY

全部存储在磁盘上

总结:

内存足够的情况下,我们使用MEMORY_ONLY的方式,直接内存最快,也不用反序列化

内存不足够的情况下,使用MEMORY_ONLY_SER序列化之后存储在内存中,但是就是要使用cpu做反序列化操作。

如果需要失败快速恢复,就是用_2的缓存方式。这样会存储两份,就不用再去计算一次了。

通常不使用DISK的方式,有时候,从磁盘上还不如直接读取数据的效率更快

最后就是释放缓存数据了,就是直接调用unpersist()就行了。

然后我们再介绍一下Spark中的序列化以及压缩

Spark中序列化包括两种,JavaSerialize以及KryoSerialize两种,这两种序列化各有优缺点。他们都是继承于Serialize这个抽象类。

JavaSerialize这个序列化操作灵活,通用性高,基本上都可用。但是性能不是很好,而且暂用空间也比较大。序列结果比较大。

KryoSerialize序列化操作不是很灵活,通用性不强,但是性能高,序列化速度快,序列结果小。压缩效率更高,但是它并不能支持序列化所有的类,而且要求用户注册类。

关于KryoSerialize的使用方式。

首先在SparkConf中设置:

set(“spark.serializer”,"org.apache.spark.serializer.KryoSerializer")

然后再将序列化的类进行注册,在sparkconf中

conf.registerKryoClasses(new Class[]{CategorySortKey.class});进行需要序列化的类进行注册。

Spark中的三种压缩方式:LZ4、LZF、Snappy三种压缩方式,这三种都是基于CompressionCodec类。

同样三种也有不同的优势:

LZ4:提供了压缩速度与压缩比俱佳的性能

LZF:提供了更高的压缩比

Snappy :提供了更高的压缩速度


接下来我们介绍下Spark项目中经常遇到的优化点。

广播

广播就是共享变量。通常情况下,Spark的算子使用外部变量的时候,该外部变量的值会复制到节点执行的每一个task,每个task只能操作自己复制的变量副本。假设我们需要10M外部数据,然后根据我们集群的设置,我们一共有Executor50个,然后task存在200个,在不使用广播的情况下,每个task会有一个备份就是10*200 = 2G,这样就消耗了2G,但是我们采用广播的方式的话,他会在Executor上做一个备份,然后就是task共享这个数据了。就是50 * 10 = 500M,这就减少了几个数量级。所以共享变量还是很有必要的。

如果想让节点Executor上所有task的能够共享该变量,有两种实现方式

第一种 broadcast

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

//获取广播的值

scala> broadcastVar.valueres0: Array[Int] = Array(1, 2, 3)

第二种就是 累加变量Accumulators

第二种主要是做累加操作,让更多的task共同操作变量,不能读取值,只有在Driver端能够读取。当然还可以自定义累加器。 可以参考官网具体实例

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表