计算机系统应用教程网站

网站首页 > 技术文章 正文

Spark实战(2)——好友推荐 spark实时推荐

btikc 2024-10-12 11:41:42 技术文章 10 ℃ 0 评论

题目要求

使用Spark算子来处理好友推荐问题,以第二行数据为例:这个人叫 world ,他有三个好友,分别是:hadoophellohivehadoophellohive 之间就是间接好友。wordhadoophellohive 属于直接好友

输出要求:



思路

下面是思路的几个步骤:

  1. 实例化SparkContext对象
  2. 读取文件数据转换为RDD1数据
  3. 将RDD1的数据中字符串进行map处理得到RDD2数据。
  4. 将处理后的RDD2数据进行mapPartitions处理得到RDD3数据。
  5. 将RDD3数据进行reduceByKey叠加处理得到RDD4数据。
  6. 将RDD4数据进行filter过滤处理得到RDD5数据。



实例化SparkContext对象

sc = SparkContext("local", "test2 app")


将文件数据转换为RDD1数据

sparkcontext对象中提供textFile()方法来将外部数据转换为RDD数据

    rdd_1 = sc.textFile("/root/friend.txt")


将RDD1数据的字符串进行分割处理

根据题目中的输入要求,他每行的字符串是需要用split函数分割的,由于题目中采用的是utf-8编码,因此还需要对字符串进行utf-8编码,因此这里可以用map算子等其它算子进行处理,然后用lambda表达式定义处理的规则:先进行首尾空格处理,然后进行utf-8编码,最后进行切割。

    rdd_2 = rdd_1.map(lambda x:x.strip().encode('utf-8').split(" "))


将处理后的RDD2数据进行关系处理

得到每个人名的列表集合后,咱们就要处理他们的关系,根据题目要求,第一个人名和后面的人名是直接好友关系,后面的人名互相是间接好友关系,由于需要统计间接好友数量,同时题目也要求人名关系需要按照name1_name2这样的格式,名字顺序根据hash来定,因此咱们可以将每个关系定义为(key,value)格式,其中key为两个人名的字符串value即为他们的关系,将value设置为0说明这两个人是直接好友关系,value设置为1说明这两个是间接好友关系,等后面统计则可以直接相加。

  1. 定义word函数来处理两个人名顺序
def word(str1, str2):
    if hash(str1) > hash(str2):
        return str1 + '_' + str2
    else:    
        return str2 + '_' + str1


  1. 定义relation函数来统计好友关系集合
def relation(items):
    result = []
    for i in range(1, len(items)):
        result.append((word(items[0], items[i]), 0))
        for j in range(i+1, len(items)):
            result.append((word(items[i], items[j]), 1))
    return result


  1. 将RDD2数据用flatMap算子处理并合并分区结果
rdd_3 = rdd_2.flatMap(relation)


对处理后的数据进行叠加处理

在得到每个人的关系数据后,咱们现在开始统计间接关系数量和直接关系数量,根据前面的解释:如果两人中有一个的(key,value)数据的value为0时,说明这两个人是直接关系,如果value为1的话,说明两个人是间接关系,可以直接相加。因此使用reduceByKey算子来进行叠加处理,然后使用lambda表达式来设定叠加规则。

    rdd_4 = rdd_3.reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y)


将得到的关系数据进行过滤

在得到所有人的关系统计数量后,由于题目只需要间接关系的数量,因此这里可以用filter算子来进行过滤:当两人是直接关系则(key,value)的value肯定是为0,间接关系的value肯定是大于0的。

rdd_5 = rdd_4.filter(lambda x:x[1]>0)


打印RDD5数据内容

print(rdd_5.collect())


源码

# -*- coding: UTF-8 -*-
from pyspark import SparkContext
# 根据hashcode来判断先后关系
def word(str1, str2):
    if hash(str1) > hash(str2):
        return str1 + '_' + str2
    else:    
        return str2 + '_' + str1

def relation(items):
    result = []
    for i in range(1, len(items)):
        result.append((word(items[0], items[i]), 0))
        for j in range(i+1, len(items)):
            result.append((word(items[i], items[j]), 1))
    return result
 
if __name__ == "__main__":
    """
        需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
    """
    sc = SparkContext("local", "test2 app")
    
    rdd_1 = sc.textFile("/root/friend.txt")

    rdd_2 = rdd_1.map(lambda x:x.strip().encode('utf-8').split(" "))

    rdd_3 = rdd_2.flatMap(relation)
    
    rdd_4 = rdd_3.reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y)
    
    rdd_5 = rdd_4.filter(lambda x:x[1]>0)

    print(rdd_5.collect())

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表