计算机系统应用教程网站

网站首页 > 技术文章 正文

RDD算子(2)—— MapPartitions算子

btikc 2024-10-30 02:06:40 技术文章 9 ℃ 0 评论

介绍

MapPartitions函数可以获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。


对比Map算子

Map:遍历算子,可以遍历RDD中的每个元素,遍历的单位是每条记录。

MapPartitions:遍历算子,可以改变RDD的格式,提高RDD并行度,遍历的单位是Partition,也就是在遍历之前它会将一个Partition的数据加载到内存中。

因此,MapPartitions算子效率更高


题目要求

使用MapPartitions算子,将RDD的数据("dog","salmon","salmon","rat","elephant")按照下述规则转换:将字符串与该字符串的长度合成一个元组,如:

dog --> (dog,3)

salmon --> (salmon,6)


思路分析

根据题目的要求,咱们可以先定义一个列表来保存输入数据,然后实例化一个SparkContext对象作为Spark程序的入口,然后将前面的列表数据转换为RDD1分区数据格式,接着自定义一个函数用于转换数据,最后用MapPartitions算子调用函数来转换RDD1数据得到RDD2数据。


源码

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

def func(iterator):
    result = []
    # 遍历每个字符串
    for item in iterator:
        result.append((item,len(item))) # 按照指定格式添加到结果里面
    return result
        
if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","mappartitions app")

    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    ls = ["dog", "salmon", "salmon", "rat", "elephant"]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd1 = sc.parallelize(ls)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd1.collect())

    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5.使用 mapPartitions 算子完成以上需求
    rdd2 = rdd1.mapPartitions(func)

    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(rdd2.collect())

    # 7.停止 SparkContext
    sc.stop()

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表