网站首页 > 技术文章 正文
题目要求
使用Spark算子来处理好友推荐问题,以第二行数据为例:这个人叫 world ,他有三个好友,分别是:hadoop、hello 和 hive。hadoop、hello 和 hive 之间就是间接好友。word 与 hadoop 、 hello 、hive 属于直接好友
输出要求:
思路
下面是思路的几个步骤:
- 实例化SparkContext对象
- 读取文件数据转换为RDD1数据
- 将RDD1的数据中字符串进行map处理得到RDD2数据。
- 将处理后的RDD2数据进行mapPartitions处理得到RDD3数据。
- 将RDD3数据进行reduceByKey叠加处理得到RDD4数据。
- 将RDD4数据进行filter过滤处理得到RDD5数据。
实例化SparkContext对象
sc = SparkContext("local", "test2 app")
将文件数据转换为RDD1数据
sparkcontext对象中提供textFile()方法来将外部数据转换为RDD数据
rdd_1 = sc.textFile("/root/friend.txt")
将RDD1数据的字符串进行分割处理
根据题目中的输入要求,他每行的字符串是需要用split函数分割的,由于题目中采用的是utf-8编码,因此还需要对字符串进行utf-8编码,因此这里可以用map算子等其它算子进行处理,然后用lambda表达式定义处理的规则:先进行首尾空格处理,然后进行utf-8编码,最后进行切割。
rdd_2 = rdd_1.map(lambda x:x.strip().encode('utf-8').split(" "))
将处理后的RDD2数据进行关系处理
得到每个人名的列表集合后,咱们就要处理他们的关系,根据题目要求,第一个人名和后面的人名是直接好友关系,后面的人名互相是间接好友关系,由于需要统计间接好友数量,同时题目也要求人名关系需要按照name1_name2这样的格式,名字顺序根据hash来定,因此咱们可以将每个关系定义为(key,value)格式,其中key为两个人名的字符串,value即为他们的关系,将value设置为0说明这两个人是直接好友关系,value设置为1说明这两个是间接好友关系,等后面统计则可以直接相加。
- 定义word函数来处理两个人名顺序
def word(str1, str2):
if hash(str1) > hash(str2):
return str1 + '_' + str2
else:
return str2 + '_' + str1
- 定义relation函数来统计好友关系集合
def relation(items):
result = []
for i in range(1, len(items)):
result.append((word(items[0], items[i]), 0))
for j in range(i+1, len(items)):
result.append((word(items[i], items[j]), 1))
return result
- 将RDD2数据用flatMap算子处理并合并分区结果
rdd_3 = rdd_2.flatMap(relation)
对处理后的数据进行叠加处理
在得到每个人的关系数据后,咱们现在开始统计间接关系数量和直接关系数量,根据前面的解释:如果两人中有一个的(key,value)数据的value为0时,说明这两个人是直接关系,如果value为1的话,说明两个人是间接关系,可以直接相加。因此使用reduceByKey算子来进行叠加处理,然后使用lambda表达式来设定叠加规则。
rdd_4 = rdd_3.reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y)
将得到的关系数据进行过滤
在得到所有人的关系统计数量后,由于题目只需要间接关系的数量,因此这里可以用filter算子来进行过滤:当两人是直接关系则(key,value)的value肯定是为0,间接关系的value肯定是大于0的。
rdd_5 = rdd_4.filter(lambda x:x[1]>0)
打印RDD5数据内容
print(rdd_5.collect())
源码
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
# 根据hashcode来判断先后关系
def word(str1, str2):
if hash(str1) > hash(str2):
return str1 + '_' + str2
else:
return str2 + '_' + str1
def relation(items):
result = []
for i in range(1, len(items)):
result.append((word(items[0], items[i]), 0))
for j in range(i+1, len(items)):
result.append((word(items[i], items[j]), 1))
return result
if __name__ == "__main__":
"""
需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
"""
sc = SparkContext("local", "test2 app")
rdd_1 = sc.textFile("/root/friend.txt")
rdd_2 = rdd_1.map(lambda x:x.strip().encode('utf-8').split(" "))
rdd_3 = rdd_2.flatMap(relation)
rdd_4 = rdd_3.reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y)
rdd_5 = rdd_4.filter(lambda x:x[1]>0)
print(rdd_5.collect())
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)