网站首页 > 技术文章 正文
介绍
MapPartitions函数可以获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。
对比Map算子
Map:遍历算子,可以遍历RDD中的每个元素,遍历的单位是每条记录。
MapPartitions:遍历算子,可以改变RDD的格式,提高RDD并行度,遍历的单位是Partition,也就是在遍历之前它会将一个Partition的数据加载到内存中。
因此,MapPartitions算子效率更高。
题目要求
使用MapPartitions算子,将RDD的数据("dog","salmon","salmon","rat","elephant")按照下述规则转换:将字符串与该字符串的长度合成一个元组,如:
dog --> (dog,3)
salmon --> (salmon,6)
思路分析
根据题目的要求,咱们可以先定义一个列表来保存输入数据,然后实例化一个SparkContext对象作为Spark程序的入口,然后将前面的列表数据转换为RDD1分区数据格式,接着自定义一个函数用于转换数据,最后用MapPartitions算子调用函数来转换RDD1数据得到RDD2数据。
源码
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
def func(iterator):
result = []
# 遍历每个字符串
for item in iterator:
result.append((item,len(item))) # 按照指定格式添加到结果里面
return result
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","mappartitions app")
# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
ls = ["dog", "salmon", "salmon", "rat", "elephant"]
# 3.通过 SparkContext 并行化创建 rdd
rdd1 = sc.parallelize(ls)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd1.collect())
"""
使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
需求:
将字符串与该字符串的长度组合成一个元组,例如:
dog --> (dog,3)
salmon --> (salmon,6)
"""
# 5.使用 mapPartitions 算子完成以上需求
rdd2 = rdd1.mapPartitions(func)
# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
print(rdd2.collect())
# 7.停止 SparkContext
sc.stop()
猜你喜欢
- 2024-10-30 爆肝整理5000字!HTAP的关键技术有哪些?| StoneDB学术分享会#3
- 2024-10-30 「经验分享」MindStudio基于AscendCL应用开发流程
- 2024-10-30 spark中 RDD代码演示 spark中的rdd
- 2024-10-30 图像处理中,如何抓住事物的不变特征
- 2024-10-30 TypeScript 4.7 正式发布「2022.05.24」「官文全文翻译」
- 2024-10-30 读书笔记丨《离线和实时大数据开发实战》
- 2024-10-30 视觉SLAM面试题汇总-2019年秋招第一部分
- 2024-10-30 一文了解GaussDB 200整体描述 一文快速了解中国5000年历史
- 2024-10-30 「Flink实时数据分析系列」10. Flink 和流式应用运维(下)
- 2024-10-30 综述:特征点检测与匹配 常用的特征点检测算法
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)