计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark常用算子(一) sparkstreaming常用算子

btikc 2024-10-12 11:40:54 技术文章 8 ℃ 0 评论

Hello大家好,今天为大家整理了一份关于Spark RDD算子使用的文章,并利用python进行过程的实现~

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,而Spark的核心是RDD(Resilient Distributed Dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用,Spark的主要优势来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS,HBase或者其他Hadoop数据源,接下来我们就具体看看他的使用~~

RDD的3种基本运算

  • 转换运算”(Transformation)
  • RDD执行“转换”运算耳朵结果,会产生另外一个RDD
  • RDD具有lazy特性,所以"转换"运算并不会立即实际执行,等到执行"动作"运行才会实际执行
  • "动作"运算(Action)
  • RDD执行"动作"运算后不会产生另外一个RDD,而是会产生数值,数组或写入文件系统
  • RDD执行“动作”运算时会立刻实际执行,并且连同之前的转换运算一起执行
  • "持久化"(Persistence)
  • 对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能


RDD转换示意图




map运算


map运算可以通过传入的函数将每一个元素经过函数运算产生另外一个RDD
>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+2)
>>> rdd2.collect()
输出结果:[3, 4, 5, 6, 7] 
#定义加一函数
def addOne(x):
 return (x+1)
>>> rdd3 = rdd1.map(addOne)
>>> rdd3.collect()
输出结果:[2, 3, 4, 5, 6]
#同理字符串依旧可以计算
>>> rdd1 = sc.parallelize(['小花','小白','大力'])
>>> rdd2 = rdd1.map(lambda x:"name:"+x)
>>> rdd2.collect()
输出结果:['name:小花', 'name:小白', 'name:大力']

filter运算


#filter可以用于对RDD内每一个元素进行筛选,并产生另外的RDD
#定义俩个常用RDD
>>> intRDD = sc.parallelize([2,3,4,5,6,7,8,9])
>>> stringRDD = sc.parallelize(['小花','小白','大力','老王'])
>>>
>>> intRDD.filter(lambda x:x==3).collect()
[3]
>>> intRDD.filter(lambda x:x>1 and x<5).collect()
[2, 3, 4]
>>> intRDD.filter(lambda x:x>=5 or x<3).collect()
[2, 5, 6, 7, 8, 9]
>>> 
>>> stringRDD.filter(lambda x:'小' in x).collect()
['小花', '小白']
>>> 


distinct运算


#distinct运算会删除重复的元素
>>> intRDD.distinct().collect()
[4, 8, 5, 9, 2, 6, 3, 7]
>>> stringRDD.distinct().collect()
['大力', '小花', '小白', '老王']
>>> 

randomSplit运算


#randomSplit可以将整个集合元素以随机数的方式按照比例分为多个RDD
#如下示例,按照4:6拆分
>>> a = intRDD.randomSplit([0.4,0.6])
>>> a
[PythonRDD[18] at RDD at PythonRDD.scala:48, 
 PythonRDD[19] at RDD at PythonRDD.scala:48]
>>> a[0].collect()
[2, 4, 8]
>>> a[1].collect()
[3, 5, 6, 7, 9]
>>> 

groupBy运算


#groupBy运算可以按照传入的匿名函数规则将数据分为多个list
#利用groupBy运算将整个集合分为基数与偶数
>>> aRDD = intRDD.groupBy(lambda x:"even" if (x%2==0) else "odd")
>>> aRDD.collect()
[('even', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc50>), 
('odd', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdbe0>)]
>>> aRDD
PythonRDD[29] at collect at <stdin>:1
>>> aRDD.collect()[0]
('even', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc50>)
>>> aRDD.collect()[1]
('odd', <pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdba8>)
>>> aRDD.collect()[0][0]
'even'
>>> aRDD.collect()[0][1]
<pyspark.resultiterable.ResultIterable object at 0x7f3bef8bdc18>
>>> aRDD.collect()[1][0]
'odd'
>>> sorted(aRDD.collect()[0][1])
[2, 4, 6, 8]
>>> sorted(aRDD.collect()[1][1])
[3, 5, 7, 9]
>>> 


多RDD操作


>>> intRDD1 = sc.parallelize([3,1,2,5,5])
>>> intRDD2 = sc.parallelize([5,6])
>>> intRDD3 = sc.parallelize([2,7])

union并集操作


#使用union函数进行并集运算
>>> intRDD1.union(intRDD2).union(intRDD3).collect()
[3, 1, 2, 5, 5, 5, 6, 2, 7]
>>> 

intersection交集运算


>>> intRDD1.intersection(intRDD2).collect()
[5]
>>> 

subtract差集运算


>>> intRDD1.subtract(intRDD2).collect()
[1, 2, 3]
>>> 

cartesian笛卡尔乘积运算


>>> intRDD1.cartesian(intRDD2).collect()
[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 5), (5, 6), (5, 6)]
>>> 


基本动作运算


#取出第一项数据
>>> intRDD.first()
2
#取出第二项数据
>>> intRDD.take(2)
[2, 3]
#从小到大排序,取出前3项
>>> intRDD.takeOrdered(3)
[2, 3, 4]
#从大到小排序取出前3项
>>> intRDD.takeOrdered(3,key=lambda x:-x)
[9, 8, 7]


统计功能


统计
>>> intRDD.stats()
(count: 8, mean: 5.5, stdev: 2.29128784748, max: 9.0, min: 2.0)
最小
>>> intRDD.min()
2
最大
>>> intRDD.max()
9
标准差
>>> intRDD.stdev()
2.2912878474779199
计数
>>> intRDD.count()
8
总和
>>> intRDD.sum()
44
平均
>>> intRDD.mean()
5.5
>>>


RDD Key-Value基本转换运算

Spark RDD支持键值(Key-Value)运算,Key-Value运算也是Map/Reduce的基础,接下来我们瞅瞅怎么玩~


创建范例Key-Value RDD
第一个字段是key,第二个字段是value
>>> kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
>>> kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
>>> 
列出全部key值(第一个字段)
>>> kvRDD1.keys().collect()
[3, 3, 5, 1]
>>> 
列出values值(第二个字段)
>>> kvRDD1.values().collect()
[4, 6, 6, 2]
>>> 

利用filter筛选RDD内的元素


利用filter针对key筛选RDD内的元素
>>> kvRDD1.filter(lambda x:x[0]<5).collect()
[(3, 4), (3, 6), (1, 2)]
>>> 
利用filter针对value筛选RDD内的元素
>>> kvRDD1.filter(lambda x:x[1]<5).collect()
[(3, 4), (1, 2)]
>>> 


mapValues运算


#mapValues运算可以针对RDD内每一组(Key,Value)进行运算,并且产生另外一个RDD
将value的每一个值进行平方操作
>>> kvRDD1.mapValues(lambda x:x*x).collect()
[(3, 16), (3, 36), (5, 36), (1, 4)]


sortByKey排序(从小到大按照key排序)


>>> kvRDD1.sortByKey(ascending=True).collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]
>>> kvRDD1.sortByKey().collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]
>>> 

sortByKey按照key从大到小排序


>>> kvRDD1.sortByKey(ascending=False).collect()

[(5, 6), (3, 4), (3, 6), (1, 2)]

>>>

reduceByKey按照Key值进行reduce运算

也就是相同key的值进行一定规则的合并


[(3, 4), (3, 6), (5, 6), (1, 2)]
>>> kvRDD1.reduceByKey(lambda x,y:x+y).collect()
[(5, 6), (1, 2), (3, 10)]
>>> 


ok,今天就先写这么多吧,剩余的以及某些算子的区别我们在下篇文章中下表述~~~

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表