网站首页 > 技术文章 正文
1.数据源
- 利用Python代码模拟生成,生成的数据如下:
* id|brand|product|category|cost|price|quantity|province|timestamp
* ---------------------------------------------------------------------------
* 1|金龙鱼|金龙鱼花生油|生活用品|52|75|64|云南|1593536348
* 2|金龙鱼|金龙鱼菜籽油|生活用品|48|62|16|河北|1593536348
* 3|金龙鱼|金龙鱼玉米油|生活用品|34|53|72|河北|1593536348
- 生成数据的部分代码如下, 如果你需要完整代码,请关注我并私信"销售订单"
def produce_orders(file_name, num):
provinces = (
"江苏", "浙江", "安徽", "江西", "山东",
"山西", "河南", "河北", "湖北", "湖南",
"广东", "广西", "陕西", "甘肃", "福建",
"四川", "黑龙将", "辽宁",
"吉林", "北京", "上海", "天津", "重庆",
"海南", "新疆", "云南", "贵州", "宁夏", "西藏",
"香港", "澳门", "台湾", "内蒙古")
products = (
("金龙鱼花生油", "生活用品", 52, 75, "金龙鱼"),
("金龙鱼植物油", "生活用品", 58, 80, "金龙鱼"),
("金龙鱼大豆油", "生活用品", 54, 73, "金龙鱼"),
...........
...........
...........
("西湖龙井", "茶叶", 22, 49, "龙井"), ("信阳毛尖", "茶叶", 13, 25, "信阳毛尖"),
("溧阳白茶", "茶叶", 23, 59, "溧阳白茶"), ("太平猴魁", "茶叶", 30, 69, "太平猴魁"),
("安溪铁观音", "茶叶", 26, 43, "铁观音"),
("黄山毛峰", "茶叶", 15, 24, "毛峰")
)
with open(file_name, mode='a', encoding='utf-8') as f:
counter = 0
for i in range(num):
for j in range(len(products)):
counter += 1
province = provinces[random.randint(0, len(provinces) - 1)]
product_name = products[j][0]
category = products[j][1]
cost = products[j][2]
price = products[j][3]
brand = products[j][4]
quantity = random.randint(10, 100)
timestamp = int(time.time())
record = str(counter) + "|" + brand + "|" + product_name + "|" + str(category) + "|"+ str(cost) + "|"+ str(price) + "|" + str(
quantity) + "|" + province + "|" + str(timestamp)
f.write(record + "\n")
time.sleep(0.0001)
- 数据集生成好了利用python代码自动上传HDFS
def write_to_hdfs(file_name):
# 利用pyhdfs连接HDFS集群,用于上传生成的数据
fs = pyhdfs.HdfsClient(hosts='node01:9870,node02:9870', randomize_hosts=False, user_name='hadoop', timeout=1000,
max_tries=1000)
# HDFS要存放数据的目录
hdfs_file_dir = '/user/hadoop/datasets/orders/'
# 如果该目录不存在,则创建
if not fs.exists(hdfs_file_dir):
fs.mkdirs(hdfs_file_dir)
# 上传文件, src为本地生成的文件路径,dest为hdfs上要存放的路径
src = os.getcwd() + "/" + file_name
dest = hdfs_file_dir + file_name
# 从本地上传文件到hdfs指定目录
fs.copy_from_local(src, dest)
2.Spark销售数据分析
由于目前我的文章只更新到了SparkCore部分,所以,现在的分析都是基于SparkCore提供的算子进行分析,在更新完SparkSql和SparkStreaming后会有更加方便的方法来分析这部分数据,后面会慢慢更新这部分内容。
为了清楚地表达代码的逻辑,分析这部分我使用了scala语言,后续也会整合python版本和java版本的。
- 计算销售额排名的前10的省份
/**
* 1.计算销售额排名的前10的省份
*
* @param productOrderCached
*/
def computeTop10ProvinceSales(productOrderCached: RDD[ProductOrder]): Unit = {
//把province和grossProfit组成K,V格式的RDD
val provinceAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {
(po.province, po.grossProfit)
})
//按照省份进行聚合
val reducedProvinceAndGrossProfit: RDD[(String, Float)] = provinceAndGrossProfit.reduceByKey(_ + _)
//按照聚合好的grossProfit进行倒排序
val sortedProvinceAndGrossProfit: RDD[(String, Float)] = reducedProvinceAndGrossProfit.sortBy(tup => tup._2, false)
//取出grossProfit前10省份的销量
val results: Array[(String, Float)] = sortedProvinceAndGrossProfit.take(10)
//打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)
results.foreach(println)
}
- 计算销售额排名的前5的种类
/**
* 2.计算销售额排名的前5的种类
*/
def computeTop5CategorySales(productOrderCached: RDD[ProductOrder]) = {
//把category和grossProfit组成K,V格式的RDD
val categoryAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {
(po.category, po.grossProfit)
})
//按照category进行聚合
val reducedCategoryAndGrossProfit: RDD[(String, Float)] = categoryAndGrossProfit.reduceByKey(_ + _)
//按照聚合好的grossProfit进行倒排序
val sortedCategoryAndGrossProfit: RDD[(String, Float)] = reducedCategoryAndGrossProfit.sortBy(tup => tup._2, false)
//取出grossProfit前10的category的销量
val results: Array[(String, Float)] = sortedCategoryAndGrossProfit.take(5)
//打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)
results.foreach(println)
}
- 计算最畅销的5个品牌
/**
* 3.计算最畅销的5个品牌
*/
def computeTop5Brand(productOrderCached: RDD[ProductOrder]) = {
//对每条销售订单的brand计数1,并组成K,V格式的RDD
val brandAndOne: RDD[(String, Int)] = productOrderCached.map(po => {
(po.brand, 1)
})
//按照brand进行聚合
val reducedBrandCounts: RDD[(String, Int)] = brandAndOne.reduceByKey(_ + _)
//按照brand的计数进行倒排序
val sortedBrandAndCounts: RDD[(String, Int)] = reducedBrandCounts.sortBy(tup => tup._2, false)
//取出前10
val results: Array[(String, Int)] = sortedBrandAndCounts.take(5)
results.foreach(println)
}
- 统计最挣钱的10个商品
/**
* 4.统计最挣钱的10个商品
*/
def computeTop10NetProfit(productOrderCached: RDD[ProductOrder]) = {
// 取出产品名称和净利润组成K,V格式的RDD
val productAndNetProfit: RDD[(String, Float)] = productOrderCached.map(po => {
(po.product, po.netProfit)
})
// 按产品聚合
val reducedProductAndNetProfit: RDD[(String, Float)] = productAndNetProfit.reduceByKey(_ + _)
// 按照净利润倒排序
val sortedProductAndNetProfit: RDD[(String, Float)] = reducedProductAndNetProfit.sortBy(tup => tup._2, false)
// 取出前10
val results: Array[(String, Float)] = sortedProductAndNetProfit.take(10)
//打印结果
results.foreach(println)
}
后话
以上只是利用现有的SparkCore做的销售数据的分析,后面会持续更新利用SparkSQL+Hive与SparkStreaming对接Kafka用流处理的方式来分析处理数据。
由于测试数据文件比较大(建议下载源码生成),如果您需要全部源码与生成的测试数据,请关注我并发送私信"销售订单"。
猜你喜欢
- 2024-10-12 大佬用10小时就把Spark讲完了,附6大技术文档
- 2024-10-12 浅析图数据库 Nebula Graph 数据导入工具——Spark Writer
- 2024-10-12 Spark Streaming 和 Flink 谁是数据开发者的最爱?
- 2024-10-12 分享几点 Spark Streaming 调优实践经验
- 2024-10-12 大数据学习之计算天下——SPARK的那些事
- 2024-10-12 第二篇|Spark core编程指南 spark编程软件
- 2024-10-12 Spark计算引擎 spark是基于什么计算引擎
- 2024-10-12 Spark Shuffle机制 sparkshuffle原理
- 2024-10-12 一文带你了解SparkStreaming窗口函数
- 2024-10-12 深度预警:Spark运行原理 简述spark的运行架构和原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)