网站首页 > 技术文章 正文
Hello大家好,今天为大家整理了一份关于Spark RDD算子使用的文章,并利用python进行过程的实现~
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,而Spark的核心是RDD(Resilient Distributed Dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用,Spark的主要优势来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS,HBase或者其他Hadoop数据源,接下来我们就具体看看他的使用~~
RDD的3种基本运算
- “转换运算”(Transformation)
- RDD执行“转换”运算耳朵结果,会产生另外一个RDD
- RDD具有lazy特性,所以"转换"运算并不会立即实际执行,等到执行"动作"运行才会实际执行
- "动作"运算(Action)
- RDD执行"动作"运算后不会产生另外一个RDD,而是会产生数值,数组或写入文件系统
- RDD执行“动作”运算时会立刻实际执行,并且连同之前的转换运算一起执行
- "持久化"(Persistence)
- 对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能
RDD转换示意图
map运算
map运算可以通过传入的函数将每一个元素经过函数运算产生另外一个RDD >>> data = [1,2,3,4,5] >>> rdd1 = sc.parallelize(data) >>> rdd2 = rdd1.map(lambda x:x+2) >>> rdd2.collect() 输出结果:[3, 4, 5, 6, 7] #定义加一函数 def addOne(x): return (x+1) >>> rdd3 = rdd1.map(addOne) >>> rdd3.collect() 输出结果:[2, 3, 4, 5, 6] #同理字符串依旧可以计算 >>> rdd1 = sc.parallelize(['小花','小白','大力']) >>> rdd2 = rdd1.map(lambda x:"name:"+x) >>> rdd2.collect() 输出结果:['name:小花', 'name:小白', 'name:大力']
filter运算
#filter可以用于对RDD内每一个元素进行筛选,并产生另外的RDD #定义俩个常用RDD >>> intRDD = sc.parallelize([2,3,4,5,6,7,8,9]) >>> stringRDD = sc.parallelize(['小花','小白','大力','老王']) >>> >>> intRDD.filter(lambda x:x==3).collect() [3] >>> intRDD.filter(lambda x:x>1 and x<5).collect() [2, 3, 4] >>> intRDD.filter(lambda x:x>=5 or x<3).collect() [2, 5, 6, 7, 8, 9] >>> >>> stringRDD.filter(lambda x:'小' in x).collect() ['小花', '小白'] >>>
distinct运算
#distinct运算会删除重复的元素 >>> intRDD.distinct().collect() [4, 8, 5, 9, 2, 6, 3, 7] >>> stringRDD.distinct().collect() ['大力', '小花', '小白', '老王'] >>>
randomSplit运算
#randomSplit可以将整个集合元素以随机数的方式按照比例分为多个RDD #如下示例,按照4:6拆分 >>> a = intRDD.randomSplit([0.4,0.6]) >>> a [PythonRDD[18] at RDD at PythonRDD.scala:48, PythonRDD[19] at RDD at PythonRDD.scala:48] >>> a[0].collect() [2, 4, 8] >>> a[1].collect() [3, 5, 6, 7, 9] >>>
groupBy运算
#groupBy运算可以按照传入的匿名函数规则将数据分为多个list #利用groupBy运算将整个集合分为基数与偶数 >>> aRDD = intRDD.groupBy(lambda x:"even" if (x%2==0) else "odd") >>> aRDD.collect() [('even', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc50>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdbe0>)] >>> aRDD PythonRDD[29] at collect at <stdin>:1 >>> aRDD.collect()[0] ('even', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc50>) >>> aRDD.collect()[1] ('odd', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdba8>) >>> aRDD.collect()[0][0] 'even' >>> aRDD.collect()[0][1] <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc18> >>> aRDD.collect()[1][0] 'odd' >>> sorted(aRDD.collect()[0][1]) [2, 4, 6, 8] >>> sorted(aRDD.collect()[1][1]) [3, 5, 7, 9] >>>
多RDD操作
>>> intRDD1 = sc.parallelize([3,1,2,5,5]) >>> intRDD2 = sc.parallelize([5,6]) >>> intRDD3 = sc.parallelize([2,7])
union并集操作
#使用union函数进行并集运算 >>> intRDD1.union(intRDD2).union(intRDD3).collect() [3, 1, 2, 5, 5, 5, 6, 2, 7] >>>
intersection交集运算
>>> intRDD1.intersection(intRDD2).collect() [5] >>>
subtract差集运算
>>> intRDD1.subtract(intRDD2).collect() [1, 2, 3] >>>
cartesian笛卡尔乘积运算
>>> intRDD1.cartesian(intRDD2).collect() [(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 5), (5, 6), (5, 6)] >>>
基本动作运算
#取出第一项数据 >>> intRDD.first() 2 #取出第二项数据 >>> intRDD.take(2) [2, 3] #从小到大排序,取出前3项 >>> intRDD.takeOrdered(3) [2, 3, 4] #从大到小排序取出前3项 >>> intRDD.takeOrdered(3,key=lambda x:-x) [9, 8, 7]
统计功能
统计 >>> intRDD.stats() (count: 8, mean: 5.5, stdev: 2.29128784748, max: 9.0, min: 2.0) 最小 >>> intRDD.min() 2 最大 >>> intRDD.max() 9 标准差 >>> intRDD.stdev() 2.2912878474779199 计数 >>> intRDD.count() 8 总和 >>> intRDD.sum() 44 平均 >>> intRDD.mean() 5.5 >>>
RDD Key-Value基本转换运算
Spark RDD支持键值(Key-Value)运算,Key-Value运算也是Map/Reduce的基础,接下来我们瞅瞅怎么玩~
创建范例Key-Value RDD 第一个字段是key,第二个字段是value >>> kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) >>> kvRDD1.collect() [(3, 4), (3, 6), (5, 6), (1, 2)] >>> 列出全部key值(第一个字段) >>> kvRDD1.keys().collect() [3, 3, 5, 1] >>> 列出values值(第二个字段) >>> kvRDD1.values().collect() [4, 6, 6, 2] >>>
利用filter筛选RDD内的元素
利用filter针对key筛选RDD内的元素 >>> kvRDD1.filter(lambda x:x[0]<5).collect() [(3, 4), (3, 6), (1, 2)] >>> 利用filter针对value筛选RDD内的元素 >>> kvRDD1.filter(lambda x:x[1]<5).collect() [(3, 4), (1, 2)] >>>
mapValues运算
#mapValues运算可以针对RDD内每一组(Key,Value)进行运算,并且产生另外一个RDD 将value的每一个值进行平方操作 >>> kvRDD1.mapValues(lambda x:x*x).collect() [(3, 16), (3, 36), (5, 36), (1, 4)]
sortByKey排序(从小到大按照key排序)
>>> kvRDD1.sortByKey(ascending=True).collect() [(1, 2), (3, 4), (3, 6), (5, 6)] >>> kvRDD1.sortByKey().collect() [(1, 2), (3, 4), (3, 6), (5, 6)] >>>
sortByKey按照key从大到小排序
>>> kvRDD1.sortByKey(ascending=False).collect()
[(5, 6), (3, 4), (3, 6), (1, 2)]
>>>
reduceByKey按照Key值进行reduce运算
也就是相同key的值进行一定规则的合并
[(3, 4), (3, 6), (5, 6), (1, 2)] >>> kvRDD1.reduceByKey(lambda x,y:x+y).collect() [(5, 6), (1, 2), (3, 10)] >>>
ok,今天就先写这么多吧,剩余的以及某些算子的区别我们在下篇文章中下表述~~~
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)