网站首页 > 技术文章 正文
Hello大家好,继续上篇的文章为大家分享Spark的常用算子~
flatMap算子
#输入的item能够被map能够被map到0或者多个items输出,返回值是一个sequence >>> data = ['hello spark','hello world','hello world'] >>> rdd = sc.parallelize(data) >>> rdd.flatMap(lambda line :line.split(' ')).collect() 2019-05-07 22:34:53 WARN SizeEstimator:66 - Failed to check whether UseCompressedOops is set; assuming yes ['hello', 'spark', 'hello', 'world', 'hello', 'world'] >>> rdd.map(lambda line:line.split(' ')).collect() [['hello', 'spark'], ['hello', 'world'], ['hello', 'world']] >>>
groupby() 分组
groupByKey()把相同的key分到一起
reduceByKey:把相同的key的数据分发到一起并进行相应的计算
案例
>>> data = ['hello spark','hello world','hello world'] >>> rdd = sc.parallelize(data) >>> rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1)) PythonRDD[14] at RDD at PythonRDD.scala:52 >>> rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1)).collect() [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)] >>> maprdd = rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1)) >>> grouprdd = maprdd.groupByKey() >>> grouprdd.collect() [('hello', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3aba20>), ('spark', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3ab320>), ('world', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3aba90>)] >>> grouprdd.map(lambda x:{x[0]:list(x[1])}).collect() [{'hello': [1, 1, 1]}, {'spark': [1]}, {'world': [1, 1]}] >>> grouprdd.map(lambda x:{x[0]:len(list(x[1]))}).collect() [{'hello': 3}, {'spark': 1}, {'world': 2}] >>> >>> data ['hello spark', 'hello world', 'hello world'] >>> rdd = sc.parallelize(data) >>> maprdd = rdd.flatMap(lambda line:line.split(' ')).map(lambda x:(x,1)) >>> reduceByKeyRDD = maprdd.reduceByKey(lambda a,b:a+b).collect() >>> reduceByKeyRDD [('hello', 3), ('spark', 1), ('world', 2)] >>>
多个RDD Key-Value“转换”运算
>>> kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) >>> kvRDD2 = sc.parallelize([(3,8)]) >>> kvRDD1.collect() [(3, 4), (3, 6), (5, 6), (1, 2)] >>> kvRDD2.collect() [(3, 8)] >>>
Key-Value RDD join 运算
>>> kvRDD1.join(kvRDD2).collect() [(3, (4, 8)), (3, (6, 8))] >>> -- 说明 -- kvRDD1与kvRDD2唯一相同的key值是3 -- kvRDD1是(3,4),(3,6)而kvRDD2是(3,8),所以join的结果如上,过程如下图
同理,leftouterjoin&rightouterjoin&subtractbykey如此
>>> kvRDD1.leftOuterJoin(kvRDD2).collect() [(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))] >>> >>> kvRDD1.rightOuterJoin(kvRDD2).collect() [(3, (4, 8)), (3, (6, 8))] >>> >>> kvRDD1.subtractByKey(kvRDD2).collect() [(1, 2), (5, 6)] >>>
Key-Value“动作”运算
--获取第一项数据 >>> kvRDD1.first() (3, 4) --获取前俩项数据 >>> kvRDD1.take(2) [(3, 4), (3, 6)] --获取第一项数据 >>> kvFirst = kvRDD1.first() --获取第一项数据的第一个元素,也就是key值 >>> kvFirst[0] 3 --获取第一项数据的第二个元素,也就是value值 >>> kvFirst[1] 4 >>>
计算RDD中每一个Key值的项数
-- kvRDD1数据(3, 4), (3, 6), (5, 6), (1, 2),key值为3的有俩项数据,其余key值为(1,5),所以结果为1->1,3->2,5->1 >>> kvRDD1.collect() [(3, 4), (3, 6), (5, 6), (1, 2)] >>> kvRDD1.countByKey() defaultdict(<class 'int'>, {3: 2, 5: 1, 1: 1}) >>>
collectAsMap创建Key-Value的字典
>>> kv=kvRDD1.collectAsMap() >>> kv {3: 6, 5: 6, 1: 2} >>> kvRDD1.collect() [(3, 4), (3, 6), (5, 6), (1, 2)] >>> type(kv) <class 'dict'> >>> kv[3] 6 >>> kv[1] 2 >>>
Key-Value lookup运算
--可以使用lookup输入key值来查找value值 >>> kvRDD1.lookup(3) [4, 6] -- 找到俩项为((3,4),(3,6)) >>> kvRDD1.lookup(5) [6] >>> kvRDD1.lookup(0) [] >>>
ok,今天的分享就先到此结束,Spark最核心的常用的transform与active我们也基本过的差不多啦,这里面大家要是感兴趣建议百度看看国内论坛关于spark 算子的讲解,当然最权威的还是spark官网~建议大家多读多练方能掌握~
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)