
Recommendation System Implementation

Goal
Item-based collaborative filtering (Item-Based CF)
Implementation steps
  1. Load the data and build the item-user matrix (note: the task targets implicit feedback, so every rating is set to 1).
  2. Compute item-item similarities and pick candidate items.
  3. Compute each user's preference score for the candidate items (see the formulas after this list).
  4. Rank by preference score and recommend.
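Steps 2 and 3 use the usual ItemCF scoring; the notation below is added here for clarity, not taken from the code. With implicit feedback the ratings are 0/1, so the cosine similarity between two item columns reduces to

$$w_{ij} = \frac{|N(i) \cap N(j)|}{\sqrt{|N(i)|\,|N(j)|}}$$

where $N(i)$ is the set of users who interacted with item $i$. The preference of user $u$ for a candidate item $i$ is then

$$p(u,i) = \sum_{j \in S(i,K) \cap N(u)} w_{ij}\, r_{uj}$$

where $S(i,K)$ holds the $K=100$ items most similar to $i$ and $r_{uj}=1$ under implicit feedback.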
Implementation approach
The DataFrame format is used, which makes building the rating matrix and computing similarities straightforward, though the join statements get somewhat verbose.
Difficulties
The join operations between the tables.
Final code
```scala
package org.stu.Difer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

/**
 * itemCF
 * author: Difer
 */
class movieRecommend {}

object movieRecommend extends movieRecommend {
  // case classes make RDD <-> DataFrame conversion easy
  case class Rating(userId: Int, movieId: Int, rating: Int)
  case class Items(movieId: Int, title: String)

  def main(args: Array[String]): Unit = {
    // suppress INFO logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // local mode for debugging
    // val conf = new SparkConf().setAppName("MoviesRecommend").setMaster("local[2]")
    // yarn-cluster mode
    val conf = new SparkConf().setAppName("MoviesRecommend").setMaster("yarn-cluster")
    val spark = SparkSession
      .builder()
      .appName("MoviesRecommend System")
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._
    sc.setLogLevel("ERROR")

    // file paths
    val dataPath = args(0)
    val moviePath = args(1)

    // load the ratings and the (movieId, title) data
    val dataFile = sc.textFile(dataPath).map(_.split("\t"))
    val movieFile = sc.textFile(moviePath).map(_.split("\\|"))

    // quick look at the raw rows
    dataFile.toDF().show()
    // implicit feedback, so set every positive rating to 1
    val df = dataFile.map(x => Rating(x(0).toInt, x(1).toInt, if (x(2).toInt > 0) 1 else 0))
    val movieDf = movieFile.map(x => Items(x(0).toInt, x(1))).toDF()
    movieDf.createOrReplaceTempView("movie")

    val Array(train, test) = df.randomSplit(Array(0.8, 0.2), 2021)
    println("training set")
    train.toDF().show(5)
    train.toDF().createOrReplaceTempView("train")
    test.toDF().createOrReplaceTempView("test")

    // build the item coordinate matrix used for similarity computation
    def parseToMatrix(data: RDD[Rating]): CoordinateMatrix = {
      val parsedData = data.map { case Rating(userId: Int, movieId: Int, rating: Int) =>
        MatrixEntry(userId, movieId, rating)
      }
      new CoordinateMatrix(parsedData)
    }
    val dfMatrix = parseToMatrix(train)

    // compute item-item cosine similarities
    def standardCosine(matrix: CoordinateMatrix): RDD[MatrixEntry] = {
      matrix.toIndexedRowMatrix().columnSimilarities().entries
    }
    val schemas = Seq("item1", "item2", "sim")
    val sim = standardCosine(dfMatrix).toDF(schemas: _*)
    sim.createOrReplaceTempView("itemSim")

    // for each test record, join the candidate items from the training set with their similarities
    val testItemSim = spark.sql(
      """
        |select test.userId, test.movieId, test.rating label, train.movieId moviedRec, train.rating, sim from test
        |left join train on test.userId = train.userId
        |left join itemSim on test.movieId = itemSim.item1 and train.movieId = itemSim.item2
        |order by test.userId
        |""".stripMargin)
    testItemSim.createOrReplaceTempView("testItemSim")
    println("similarities")
    testItemSim.show(5)

    // keep the 100 most similar items
    val testItemSimRank100 = spark.sql(
      """
        |select * from (
        |  select *,
        |    rank() over(
        |      partition by userId, movieId
        |      order by sim desc) testItemsRank
        |  from testItemSim) t
        |where testItemsRank <= 100
        |""".stripMargin)
    testItemSimRank100.createOrReplaceTempView("testItemSimRank100")

    // compute preference scores
    val testAndScore = spark.sql(
      """
        |select userId, moviedRec, nvl(sum(rating*sim), 0) pre
        |from testItemSimRank100
        |group by userId, moviedRec
        |""".stripMargin)
    testAndScore.createOrReplaceTempView("testAndScore")
    println("preference scores")
    testAndScore.show(5)

    // keep the top 10 by preference score
    val testAndPre = spark.sql(
      """
        |select * from (
        |  select userId, moviedRec, pre, row_number() over (partition by userId order by pre desc) rank
        |  from testAndScore) t
        |where rank < 11
        |""".stripMargin)
    testAndPre.createOrReplaceTempView("testAndPre")

    // join the movie table for titles, then pivot rows to columns
    val resultDf = spark.sql(
      """
        |select userId, title, rank from testAndPre
        |left join movie on moviedRec = movie.movieId
        |""".stripMargin).groupBy("userId").pivot("rank").agg(first("title"))
    println("final recommendation table")
    resultDf.show()

    // save the recommendations
    resultDf.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(args(2))

    spark.stop()
  }
}
```
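The two input files are assumed to follow the MovieLens 100k layout, which is what the u.data / u.item names and the "\t" / "\\|" separators in the code suggest; example rows:

```
# u.data: tab-separated  userId  movieId  rating  timestamp
196	242	3	881250949

# u.item: |-separated, movie id and title come first
1|Toy Story (1995)|01-Jan-1995|...
```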
Run result
(screenshot: the final recommendation table printed by resultDf.show())
 

Packaging the Program

Packaging method
Maven's assembly plugin supports customized packaging through user-defined descriptors; for convenience, this experiment simply uses the built-in jar-with-dependencies descriptor.
Plugin configuration
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>3.3.0</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <!-- bind to the package lifecycle phase -->
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```
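With the assembly goal bound to the package phase, a plain Maven build is enough to produce the fat jar. A minimal sketch (the artifact name assumes artifactId moviesRecommend and version 1.0-SNAPSHOT, matching the spark-submit call below; by default the assembly carries a -jar-with-dependencies suffix):

```bash
# assembly:single runs automatically during the package phase
mvn clean package
# expected output (default naming):
#   target/moviesRecommend-1.0-SNAPSHOT-jar-with-dependencies.jar
```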
 

Uploading the Dataset and the Jar

  1. Upload the files to the virtual machine
    1. Copy the files into the VM through a VMware shared folder (VMware Tools has to be installed).
       The shared folder is mounted under the /mnt/hgfs/ directory.
  2. Move the files into the hbase-master Docker container
    1. Use the docker cp command to copy the folder into the container's /home/ directory.
  3. Upload the files to HDFS (see the command sketch after this list).
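A minimal command sketch for steps 2 and 3; the shared-folder path and file names are assumptions based on the paths used elsewhere in this post:

```bash
# step 2: copy the dataset and the jar from the VM into the hbase-master container
docker cp /mnt/hgfs/share/u.data hbase-master:/home/
docker cp /mnt/hgfs/share/u.item hbase-master:/home/
docker cp /mnt/hgfs/share/moviesRecommend-1.0-SNAPSHOT.jar hbase-master:/home/

# step 3: inside the container, push the data files to HDFS
docker exec -it hbase-master bash
hdfs dfs -mkdir -p /tmp/data
hdfs dfs -put /home/u.data /home/u.item /tmp/data
```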
 

Checking the Spark Job's Execution Status

spark-submit script
yarn-cluster mode is used.
```bash
hdfs dfs -rm -r /tmp/result
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
spark-submit \
  --master yarn-cluster \
  --num-executors 4 \
  --driver-memory 512M \
  --conf spark.default.parallelism=8 \
  --class org.stu.Difer.movieRecommend \
  moviesRecommend-1.0-SNAPSHOT.jar \
  /tmp/data/u.data /tmp/data/u.item /tmp/result
```
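In yarn-cluster mode the driver runs inside YARN, so its stdout (the show() tables above) does not appear on the submitting console. A sketch of pulling it from YARN instead; the application id below is a placeholder:

```bash
# find the application id
yarn application -list -appStates ALL
# fetch the aggregated logs for it
yarn logs -applicationId application_1669950000000_0001
```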
Checking execution through the Spark UI
  1. Open port 8080, find the job, and enter its Spark UI to check the state.
  2. Inspect the job's details
    1. The redirect link shows that the Application Master's host name is hbase-master (since the VM is accessed from the host machine here, the name has to be replaced with the IP, or a host mapping added; see the sketch after this list).
       The page lists the Driver and the Executors; there are 5 executors in total.
       (screenshot: Executor information)
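A minimal sketch of the host mapping mentioned above, run on the machine where the browser lives; the IP is a placeholder for the VM's actual address:

```bash
# let links to hbase-master resolve from the host machine (Linux/macOS)
echo "192.168.56.101  hbase-master" | sudo tee -a /etc/hosts
# on Windows, add the same line to C:\Windows\System32\drivers\etc\hosts
```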