计算机系统应用教程网站

网站首页 > 技术文章 正文

二十七、Spark销售订单数据分析 spark模型购买

btikc 2024-10-12 11:42:22 技术文章 9 ℃ 0 评论



1.数据源

  • 利用Python代码模拟生成,生成的数据如下:
     * id|brand|product|category|cost|price|quantity|province|timestamp
     * ---------------------------------------------------------------------------
     * 1|金龙鱼|金龙鱼花生油|生活用品|52|75|64|云南|1593536348
     * 2|金龙鱼|金龙鱼菜籽油|生活用品|48|62|16|河北|1593536348
     * 3|金龙鱼|金龙鱼玉米油|生活用品|34|53|72|河北|1593536348
  • 生成数据的部分代码如下, 如果你需要完整代码,请关注我并私信"销售订单"
def produce_orders(file_name, num):
    provinces = (
         "江苏", "浙江", "安徽", "江西", "山东", 
         "山西", "河南", "河北", "湖北", "湖南", 
         "广东", "广西", "陕西", "甘肃", "福建", 
         "四川", "黑龙将", "辽宁",
         "吉林", "北京", "上海", "天津", "重庆", 
         "海南", "新疆", "云南", "贵州", "宁夏", "西藏", 
         "香港", "澳门", "台湾", "内蒙古")
    products = (
        ("金龙鱼花生油", "生活用品", 52, 75, "金龙鱼"), 
        ("金龙鱼植物油", "生活用品", 58, 80, "金龙鱼"),
        ("金龙鱼大豆油", "生活用品", 54, 73, "金龙鱼"),
...........
...........
...........
        ("西湖龙井", "茶叶", 22, 49, "龙井"), ("信阳毛尖", "茶叶", 13, 25, "信阳毛尖"),
        ("溧阳白茶", "茶叶", 23, 59, "溧阳白茶"), ("太平猴魁", "茶叶", 30, 69, "太平猴魁"), 
        ("安溪铁观音", "茶叶", 26, 43, "铁观音"),
        ("黄山毛峰", "茶叶", 15, 24, "毛峰")
    )
    with open(file_name, mode='a', encoding='utf-8') as f:
        counter = 0
        for i in range(num):
            for j in range(len(products)):
                counter += 1
                province = provinces[random.randint(0, len(provinces) - 1)]
                product_name = products[j][0]
                category = products[j][1]
                cost = products[j][2]
                price = products[j][3]
                brand = products[j][4]
                quantity = random.randint(10, 100)
                timestamp = int(time.time())

                record = str(counter) + "|" + brand + "|" + product_name + "|" + str(category) + "|"+ str(cost) + "|"+ str(price) + "|" + str(
                    quantity) + "|" + province + "|" + str(timestamp)
                f.write(record + "\n")
                time.sleep(0.0001)
  • 数据集生成好了利用python代码自动上传HDFS
def write_to_hdfs(file_name):
    # 利用pyhdfs连接HDFS集群,用于上传生成的数据
    fs = pyhdfs.HdfsClient(hosts='node01:9870,node02:9870', randomize_hosts=False, user_name='hadoop', timeout=1000,
                           max_tries=1000)
    # HDFS要存放数据的目录
    hdfs_file_dir = '/user/hadoop/datasets/orders/'

    # 如果该目录不存在,则创建
    if not fs.exists(hdfs_file_dir):
        fs.mkdirs(hdfs_file_dir)

    # 上传文件, src为本地生成的文件路径,dest为hdfs上要存放的路径
    src = os.getcwd() + "/" + file_name
    dest = hdfs_file_dir + file_name
    # 从本地上传文件到hdfs指定目录
    fs.copy_from_local(src, dest)

2.Spark销售数据分析

由于目前我的文章只更新到了SparkCore部分,所以,现在的分析都是基于SparkCore提供的算子进行分析,在更新完SparkSql和SparkStreaming后会有更加方便的方法来分析这部分数据,后面会慢慢更新这部分内容。

为了清楚地表达代码的逻辑,分析这部分我使用了scala语言,后续也会整合python版本和java版本的。

  • 计算销售额排名的前10的省份
  /**
   * 1.计算销售额排名的前10的省份
   *
   * @param productOrderCached
   */
  def computeTop10ProvinceSales(productOrderCached: RDD[ProductOrder]): Unit = {
    //把province和grossProfit组成K,V格式的RDD
    val provinceAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {
      (po.province, po.grossProfit)
    })

    //按照省份进行聚合
    val reducedProvinceAndGrossProfit: RDD[(String, Float)] = provinceAndGrossProfit.reduceByKey(_ + _)

    //按照聚合好的grossProfit进行倒排序
    val sortedProvinceAndGrossProfit: RDD[(String, Float)] = reducedProvinceAndGrossProfit.sortBy(tup => tup._2, false)

    //取出grossProfit前10省份的销量
    val results: Array[(String, Float)] = sortedProvinceAndGrossProfit.take(10)
    //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)
    results.foreach(println)
  }
  • 计算销售额排名的前5的种类
  /**
   * 2.计算销售额排名的前5的种类
   */
  def computeTop5CategorySales(productOrderCached: RDD[ProductOrder]) = {
    //把category和grossProfit组成K,V格式的RDD
    val categoryAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {
      (po.category, po.grossProfit)
    })

    //按照category进行聚合
    val reducedCategoryAndGrossProfit: RDD[(String, Float)] = categoryAndGrossProfit.reduceByKey(_ + _)

    //按照聚合好的grossProfit进行倒排序
    val sortedCategoryAndGrossProfit: RDD[(String, Float)] = reducedCategoryAndGrossProfit.sortBy(tup => tup._2, false)

    //取出grossProfit前10的category的销量
    val results: Array[(String, Float)] = sortedCategoryAndGrossProfit.take(5)
    //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)
    results.foreach(println)
  }
  • 计算最畅销的5个品牌
/**
   * 3.计算最畅销的5个品牌
   */
  def computeTop5Brand(productOrderCached: RDD[ProductOrder]) = {
    //对每条销售订单的brand计数1,并组成K,V格式的RDD
    val brandAndOne: RDD[(String, Int)] = productOrderCached.map(po => {
      (po.brand, 1)
    })

    //按照brand进行聚合
    val reducedBrandCounts: RDD[(String, Int)] = brandAndOne.reduceByKey(_ + _)

    //按照brand的计数进行倒排序
    val sortedBrandAndCounts: RDD[(String, Int)] = reducedBrandCounts.sortBy(tup => tup._2, false)

    //取出前10
    val results: Array[(String, Int)] = sortedBrandAndCounts.take(5)

    results.foreach(println)
  }
  • 统计最挣钱的10个商品
  /**
   * 4.统计最挣钱的10个商品
   */
  def computeTop10NetProfit(productOrderCached: RDD[ProductOrder]) = {
    // 取出产品名称和净利润组成K,V格式的RDD
    val productAndNetProfit: RDD[(String, Float)] = productOrderCached.map(po => {
      (po.product, po.netProfit)
    })

    // 按产品聚合
    val reducedProductAndNetProfit: RDD[(String, Float)] = productAndNetProfit.reduceByKey(_ + _)
    // 按照净利润倒排序
    val sortedProductAndNetProfit: RDD[(String, Float)] = reducedProductAndNetProfit.sortBy(tup => tup._2, false)

    // 取出前10
    val results: Array[(String, Float)] = sortedProductAndNetProfit.take(10)

    //打印结果
    results.foreach(println)

  }

后话

以上只是利用现有的SparkCore做的销售数据的分析,后面会持续更新利用SparkSQL+Hive与SparkStreaming对接Kafka用流处理的方式来分析处理数据。

由于测试数据文件比较大(建议下载源码生成),如果您需要全部源码与生成的测试数据,请关注我并发送私信"销售订单"。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表