自从 Amazon 公布了协同过滤算法后,它就在推荐系统领域占据了很重要的地位。不像传统的基于内容的推荐,协同过滤不需要考虑物品的属性问题,用户的行为,行业问题等,只需要建立用户与物品的关联关系即可,并且可以发现物品之间更多的内在关系,类似于经典的啤酒与尿不湿的营销案例。所以,讲到推荐必须要首先分享协同过滤。下面的代码实战基于 Spark MLlib 的 ALS 算法。
- package com.gizwits.mllib
- import org.apache.log4j._
- import org.apache.spark.mllib.recommendation._
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
/**
 * MovieLens movie recommendation based on collaborative filtering.
 *
 * Trains a Spark MLlib ALS (alternating least squares) model on the rating
 * data, prints the top recommendations for every user found in the user file
 * and finally prints the mean squared error of the model.
 *
 * Data download: http://grouplens.org/datasets/movielens/
 *
 * Created by feel
 */
object MoiveRecommenderALS {

  /**
   * Settings of one recommendation run.
   *
   * @param input            movie rating data
   * @param numIterations    number of ALS iterations
   * @param lambda           ALS regularization parameter
   * @param rank             number of latent factors in the model
   * @param numUserBlocks    number of blocks used to parallelize the computation
   *                         (-1 means auto-configure)
   * @param numProductBlocks number of blocks used to parallelize the computation
   *                         (-1 means auto-configure)
   * @param implicitPrefs    whether to use the ALS variant for implicit feedback
   *                         data instead of the explicit feedback one
   * @param userDataInput    user data input
   */
  case class Params(
      input: String = null,
      numIterations: Int = 20,
      lambda: Double = 1.0,
      rank: Int = 10,
      numUserBlocks: Int = -1,
      numProductBlocks: Int = -1,
      implicitPrefs: Boolean = false,
      userDataInput: String = null)

  /** Number of movies recommended to each user. */
  val numRecommender: Int = 10

  // Locations used when no paths are given on the command line.
  private val defaultRatingsPath =
    "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.data"
  private val defaultUsersPath =
    "file:///Users/feel/githome/idea/spark-exercise/src/main/resources/u.user"

  /**
   * Entry point: trains the model, prints the recommendations, prints the MSE.
   *
   * @param args optional overrides: `args(0)` is the rating file, `args(1)` the
   *             user file; the built-in default paths are used when omitted
   */
  def main(args: Array[String]): Unit = {
    // Keep the console readable: only errors are logged.
    val rootLogger = Logger.getRootLogger()
    Logger.getLogger("com.gizwits").setLevel(Level.ERROR)
    rootLogger.setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("MoiveRecommenderALS")
    // Fall back to a local master only when none was supplied from outside,
    // so an externally configured master is not overridden.
    conf.setIfMissing("spark.master", "local[4]")
    val context = new SparkContext(conf)

    try {
      // These values can be tuned to lower the mean squared error; the original
      // author notes that more iterations and a smaller lambda tend to help.
      val params = Params(
        input = args.headOption.getOrElse(defaultRatingsPath),
        numIterations = 20,
        lambda = 0.01,
        rank = 10,
        numUserBlocks = -1,
        numProductBlocks = -1,
        implicitPrefs = false,
        userDataInput = args.drop(1).headOption.getOrElse(defaultUsersPath))

      /*
       * MovieLens ratings are on a scale of 1-5:
       * 5: Must see
       * 4: Will enjoy
       * 3: It's okay
       * 2: Fairly bad
       * 1: Awful
       */
      // Each line is "user<TAB>item<TAB>rating<TAB>timestamp"; a line with a
      // different number of fields fails the job with a MatchError.
      // The RDD is cached because ALS iterates over it and the evaluation
      // below reads it again.
      val ratings = context.textFile(params.input).map(_.split("\t") match {
        case Array(user, item, rate, _) => Rating(user.toInt, item.toInt, rate.toDouble)
      }).cache()

      // Build the recommendation model with ALS.
      // The short form ALS.train(ratings, rank, numIterations) works as well.
      val model = new ALS()
        .setRank(params.rank)
        .setIterations(params.numIterations)
        .setLambda(params.lambda)
        .setImplicitPrefs(params.implicitPrefs)
        .setUserBlocks(params.numUserBlocks)
        .setProductBlocks(params.numProductBlocks)
        .run(ratings)

      // Prediction
      predictMoive(params, context, model)
      // Model evaluation
      evaluateMode(ratings, model)
    } finally {
      // Release the context even when a step above throws.
      context.stop()
    }
  }

  /**
   * Model evaluation: prints the mean squared error between the given ratings
   * and the model's predictions for the same (user, product) pairs.
   *
   * `main` passes the training ratings here, so the figure is a training
   * error, not a held-out error.
   */
  private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel): Unit = {
    // The (user, product) pairs to score.
    val userProducts = ratings.map(r => (r.user, r.product))

    // Predicted score for every pair.
    val predictions = model.predict(userProducts)
      .map(p => ((p.user, p.product), p.rating))

    // Actual scores joined with the predicted ones.
    val ratesAndPreds = ratings
      .map(r => ((r.user, r.product), r.rating))
      .join(predictions)

    val mse = ratesAndPreds.map { case (_, (actual, predicted)) =>
      val err = actual - predicted
      err * err
    }.mean()

    println("Mean Squared Error = " + mse)
  }

  /**
   * Computes the top `numRecommender` movies for every user in the user file
   * and prints one row per user.
   *
   * Each row is shaped like a record to be stored in HBase or another storage
   * engine: "rowKey" holds the user id and "t:info" holds the recommendations
   * as "product:rating," entries. Writing to a store is not implemented; the
   * rows are only printed.
   */
  private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel): Unit = {
    // User file lines are five '|'-separated fields; only the id is needed.
    // A line with a different number of fields fails with a MatchError.
    val userIds = context.textFile(params.userDataInput)
      .map(_.split("\\|") match {
        case Array(id, _, _, _, _) => id
      })
      .collect()

    val recommenders = userIds.toSeq.flatMap { id =>
      val userId = id.toInt
      val recommendations = model.recommendProducts(userId, numRecommender)
      if (recommendations.isEmpty) {
        None
      } else {
        // Every entry keeps its trailing comma, matching the stored format.
        val value = recommendations.map(r => s"${r.product}:${r.rating},").mkString
        Some(Map("rowKey" -> userId.toString, "t:info" -> value))
      }
    }

    recommenders.foreach(println)
  }
}
复制代码
|
|
|
|
|
|