计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark常用算子(二) sparkstreaming常用算子

btikc 2024-10-12 11:40:56 技术文章 14 ℃ 0 评论

Hello大家好,继续上篇的文章为大家分享Spark的常用算子~

flatMap算子

#输入的item能够被map能够被map到0或者多个items输出,返回值是一个sequence
>>> data = ['hello spark','hello world','hello world']
>>> rdd = sc.parallelize(data)
>>> rdd.flatMap(lambda line :line.split(' ')).collect()
2019-05-07 22:34:53 WARN SizeEstimator:66 - Failed to check whether UseCompressedOops is set; assuming yes
['hello', 'spark', 'hello', 'world', 'hello', 'world'] 
>>> rdd.map(lambda line:line.split(' ')).collect()
[['hello', 'spark'], ['hello', 'world'], ['hello', 'world']]
>>> 

groupby() 分组

groupByKey()把相同的key分到一起

reduceByKey:把相同的key的数据分发到一起并进行相应的计算

案例

>>> data = ['hello spark','hello world','hello world']
 >>> rdd = sc.parallelize(data)
 >>> rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1))
 PythonRDD[14] at RDD at PythonRDD.scala:52
 >>> rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1)).collect()
 [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]
 >>> maprdd = rdd.flatMap(lambda line :line.split(' ')).map(lambda x:(x,1))
 
 >>> grouprdd = maprdd.groupByKey()
>>> grouprdd.collect()
[('hello', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3aba20>), ('spark', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3ab320>), ('world', <pyspark.resultiterable.ResultIterable object at 0x7fb81c3aba90>)]
>>> grouprdd.map(lambda x:{x[0]:list(x[1])}).collect()
[{'hello': [1, 1, 1]}, {'spark': [1]}, {'world': [1, 1]}]
>>> grouprdd.map(lambda x:{x[0]:len(list(x[1]))}).collect()
[{'hello': 3}, {'spark': 1}, {'world': 2}]
>>> 
>>> data
['hello spark', 'hello world', 'hello world']
>>> rdd = sc.parallelize(data)
>>> maprdd = rdd.flatMap(lambda line:line.split(' ')).map(lambda x:(x,1))
>>> reduceByKeyRDD = maprdd.reduceByKey(lambda a,b:a+b).collect()
>>> reduceByKeyRDD
[('hello', 3), ('spark', 1), ('world', 2)]
>>> 


多个RDD Key-Value“转换”运算

>>> kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
>>> kvRDD2 = sc.parallelize([(3,8)])
>>> kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
>>> kvRDD2.collect()
[(3, 8)]
>>> 


Key-Value RDD join 运算

>>> kvRDD1.join(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))] 
>>> 
-- 说明
-- kvRDD1与kvRDD2唯一相同的key值是3
-- kvRDD1是(3,4),(3,6)而kvRDD2是(3,8),所以join的结果如上,过程如下图




同理,leftouterjoin&rightouterjoin&subtractbykey如此

>>> kvRDD1.leftOuterJoin(kvRDD2).collect()
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]
>>> 
>>> kvRDD1.rightOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
>>> 
>>> kvRDD1.subtractByKey(kvRDD2).collect()
[(1, 2), (5, 6)]
>>> 


Key-Value“动作”运算

--获取第一项数据
>>> kvRDD1.first()
(3, 4)
--获取前俩项数据
>>> kvRDD1.take(2)
[(3, 4), (3, 6)]
--获取第一项数据
>>> kvFirst = kvRDD1.first()
--获取第一项数据的第一个元素,也就是key值
>>> kvFirst[0]
3
--获取第一项数据的第二个元素,也就是value值
>>> kvFirst[1]
4
>>> 


计算RDD中每一个Key值的项数

-- kvRDD1数据(3, 4), (3, 6), (5, 6), (1, 2),key值为3的有俩项数据,其余key值为(1,5),所以结果为1->1,3->2,5->1
>>> kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
>>> kvRDD1.countByKey()
defaultdict(<class 'int'>, {3: 2, 5: 1, 1: 1})
>>> 

collectAsMap创建Key-Value的字典

>>> kv=kvRDD1.collectAsMap()
>>> kv
{3: 6, 5: 6, 1: 2}
>>> kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
>>> type(kv)
<class 'dict'>
>>> kv[3]
6
>>> kv[1]
2
>>> 

Key-Value lookup运算

--可以使用lookup输入key值来查找value值
>>> kvRDD1.lookup(3)
[4, 6] -- 找到俩项为((3,4),(3,6))
>>> kvRDD1.lookup(5)
[6]
>>> kvRDD1.lookup(0)
[]
>>> 


ok,今天的分享就先到此结束,Spark最核心的常用的transform与active我们也基本过的差不多啦,这里面大家要是感兴趣建议百度看看国内论坛关于spark 算子的讲解,当然最权威的还是spark官网~建议大家多读多练方能掌握~

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表