Tuesday, 26 May 2020

Most Viewed and Top Rated Movies Analysis using PySpark

Movie Data Analysis using PySpark

movies.dat:
E:\vow\ml-1m\movies.dat
MovieID,Title,Genres
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
ratings.dat:
E:\vow\ml-1m\ratings.dat
UserID,MovieID,Rating,Timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
users.dat:
E:\vow\ml-1m\users.dat
UserID,Gender,Age,Occupation,ZipCode
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
HDFS location (on the Linux cluster):
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/movies.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/ratings.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/users.dat
Requirements:
Top 10 most viewed movies with their movie names (Z-A, A-Z; see the re-sort sketch after Solution 1)
Top 20 rated movies (each movie should be rated/viewed by at least 40 users)
How are the genres ranked by average rating for each profession and age group? (a sketch appears at the end of the post)

import collections
from datetime import datetime

Movie = collections.namedtuple('Movie',['MovieID','Title','Genres']) 

def parseMovie(_row):
    # Parse one "MovieID::Title::Genres" line into a Movie record
    fields = _row.split("::")
    movieid = int(fields[0])
    title = fields[1]
    genres = fields[2]
    return Movie(movieid, title, genres)

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
movies_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/movies.dat")
movies_r2 = movies_r1.map(parseMovie)
#movies_r2.take(2)

#[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
#Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
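As a side note, the same DataFrame could also be built without the RDD parsing step by reading the file as plain text and splitting on "::". A minimal sketch, assuming the same HDFS path and Spark session (dfMoviesAlt is just an illustrative name):

from pyspark.sql.functions import split, col

# Sketch: derive the three movie columns directly from the raw lines
rawMovies = spark.read.text("hdfs://localhost:9000/SparkFiles/movies.dat")
dfMoviesAlt = rawMovies.select(
    split(col("value"), "::").getItem(0).cast("int").alias("MovieID"),
    split(col("value"), "::").getItem(1).alias("Title"),
    split(col("value"), "::").getItem(2).alias("Genres"))
#dfMoviesAlt.show(3)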
 
 


Rating = collections.namedtuple('Rating',['UserID','MovieID','Rating','Timestamp']) 

def parseRatingRecord(_row):
    # Parse one "UserID::MovieID::Rating::Timestamp" line into a Rating record
    fields = _row.split("::")
    userid = int(fields[0])
    movieid = int(fields[1])
    rating = int(fields[2])
    # The raw value is epoch seconds; convert it to a Python datetime
    _timestamp = datetime.fromtimestamp(int(fields[3]))
    return Rating(userid, movieid, rating, _timestamp)

#print(parseRatingRecord("1::1193::5::978300760"))

ratings_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/ratings.dat")
ratings_r2 = ratings_r1.map(parseRatingRecord)
#ratings_r2.take(2)


User = collections.namedtuple("Users",["UserID","Gender","Age","Occupation","ZipCode"])

def parseUserRecord(_row):
    # Parse one "UserID::Gender::Age::Occupation::ZipCode" line into a Users record
    fields = _row.split("::")
    userid = int(fields[0])
    gender = fields[1]
    age = int(fields[2])
    occupation = int(fields[3])
    # Kept as int to match the original output; note that leading zeros in zip codes are lost
    zipcode = int(fields[4])
    return User(userid, gender, age, occupation, zipcode)

#print(parseUserRecord("3::M::25::15::55117"))            

users_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/users.dat")
users_r2 = users_r1.map(parseUserRecord)
#users_r2.take(2)

#[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
# Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
 
 
dfMovies  = movies_r2.toDF()
dfRatings = ratings_r2.toDF()
dfUsers = users_r2.toDF()
 
dfMovies.printSchema()
dfRatings.printSchema()
dfUsers.printSchema()

dfMovies.printSchema()
root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 
 
dfRatings.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 
 
dfUsers.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: long (nullable = true)
 |-- ZipCode: long (nullable = true)
 
dfMovies.show(5)
+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+

dfRatings.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|          Timestamp|
+------+-------+------+-------------------+
|     1|   1193|     5|2001-01-01 03:42:40|
|     1|    661|     3|2001-01-01 04:05:09|
|     1|    914|     3|2001-01-01 04:02:48|
|     1|   3408|     4|2001-01-01 03:34:35|
|     1|   2355|     5|2001-01-07 05:08:11|
+------+-------+------+-------------------+

dfUsers.show(5)
+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|   2460|
|     5|     M| 25|        20|  55455|
+------+------+---+----------+-------+



#Joining Movies and Ratings tables

from pyspark.sql.functions import col

dfMovies_Ratings = dfMovies.join(dfRatings,dfMovies["MovieID"] == dfRatings["MovieID"],"inner")
dfMovies_Ratings.printSchema()
dfMovies_Ratings.show(5)

root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)

+-------+--------------+------+------+-------+------+-------------------+
|MovieID|         Title|Genres|UserID|MovieID|Rating|          Timestamp|
+-------+--------------+------+------+-------+------+-------------------+
|     26|Othello (1995)| Drama|    18|     26|     4|2000-12-30 11:52:15|
|     26|Othello (1995)| Drama|    69|     26|     4|2000-12-27 07:24:10|
|     26|Othello (1995)| Drama|   229|     26|     4|2002-12-10 12:29:33|
|     26|Othello (1995)| Drama|   342|     26|     4|2000-12-09 10:41:17|
|     26|Othello (1995)| Drama|   524|     26|     3|2000-12-07 11:33:32|
+-------+--------------+------+------+-------+------+-------------------+
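Because the join condition is written as an expression, MovieID appears twice in the joined schema. That is harmless for the queries below, but if a single MovieID column is preferred, joining on the column name drops the duplicate; a minimal sketch (dfMovies_Ratings_single is just an illustrative name):

# Sketch: joining on the column name keeps only one MovieID column
dfMovies_Ratings_single = dfMovies.join(dfRatings, "MovieID", "inner")
#dfMovies_Ratings_single.printSchema()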
 
 
#Solution 1 : Most viewed movies (top 20 by number of ratings)
from pyspark.sql.functions import col, count, desc

dfMovies_Ratings2 = (dfMovies_Ratings.select("Title", "UserID").groupBy("Title")
    .agg(count("UserID").alias("Views")).orderBy(desc("Views")).limit(20))
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+


dfMovies_Ratings2.printSchema()
root
 |-- Title: string (nullable = true)
 |-- Views: long (nullable = false)
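
Requirement 1 also asks for the titles in A-Z / Z-A order. Since dfMovies_Ratings2 already holds the top 20 most viewed titles, it can simply be re-sorted on Title; a minimal sketch:

from pyspark.sql.functions import asc

# Sketch: alphabetical ordering of the top viewed titles
dfMovies_Ratings2.orderBy(asc("Title")).show(truncate=False)    # A-Z
dfMovies_Ratings2.orderBy(desc("Title")).show(truncate=False)   # Z-A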


#SparkSQL approach

dfMovies_Ratings.createOrReplaceTempView("MovieRatingsView")
dfMovies_Ratings2 = spark.sql("select Title, count(UserID) as Views from MovieRatingsView group by Title order by Views desc limit 20")
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+

#Write the output to HDFS
dfMovies_Ratings2.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv

American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590
Back to the Future (1985),2583
"Silence of the Lambs, The (1991)",2578
Men in Black (1997),2538
Raiders of the Lost Ark (1981),2514
Fargo (1996),2513
"Sixth Sense, The (1999)",2459
Braveheart (1995),2443
Shakespeare in Love (1998),2369
"Princess Bride, The (1987)",2318
Schindler's List (1993),2304
L.A. Confidential (1997),2288
Groundhog Day (1993),2278
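
If the saved file needs to be consumed from Spark again, it can be read back with the CSV reader; a minimal sketch (dfTopViewed is just an illustrative name, the path is the output directory used above):

# Sketch: read the saved CSV back into a DataFrame
dfTopViewed = spark.read.option("header", True).csv("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")
#dfTopViewed.show(5)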

#Solution 2 : Top rated movies (each rated/viewed by at least 40 users)
from pyspark.sql.functions import avg

result2 = (dfMovies_Ratings.select("Title", "Rating", "UserID").groupBy("Title")
    .agg(count("UserID").alias("Views"), avg("Rating").alias("AvgRating")))
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
result3.show(10)

+--------------------+-----+-----------------+
|               Title|Views|        AvgRating|
+--------------------+-----+-----------------+
|      Sanjuro (1962)|   69|4.608695652173913|
|Seven Samurai (Th...|  628|4.560509554140127|
|Shawshank Redempt...| 2227|4.554557700942973|
|Godfather, The (1...| 2223|4.524966261808367|
|Close Shave, A (1...|  657| 4.52054794520548|
|Usual Suspects, T...| 1783|4.517106001121705|
|Schindler's List ...| 2304|4.510416666666667|
|Wrong Trousers, T...|  882|4.507936507936508|
|Sunset Blvd. (a.k...|  470|4.491489361702127|
|Raiders of the Lo...| 2514|4.477724741447892|
+--------------------+-----+-----------------+
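
For symmetry with Solution 1, the same ranking can also be expressed against the MovieRatingsView temp view created earlier; a minimal Spark SQL sketch:

# Sketch: SQL version of the "rated by at least 40 users" ranking
spark.sql("""
    select Title, count(UserID) as Views, avg(Rating) as AvgRating
    from MovieRatingsView
    group by Title
    having count(UserID) >= 40
    order by AvgRating desc
""").show(10)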



#Write the top-rated output to HDFS
result3.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/top10Movies/")


The saved result can be verified in the same way; the CSV now contains Title, Views and AvgRating columns, ordered by AvgRating (the part-file name is generated by Spark and differs per run):

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/top10Movies/part-*.csv

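Requirement 3 (how the genres rank by average rating for each profession and age group) is not covered by the two solutions above. A minimal sketch of one possible approach, reusing the dfMovies_Ratings and dfUsers DataFrames built earlier (the variable names below are only illustrative):

from pyspark.sql.functions import explode, split, avg, desc, rank
from pyspark.sql.window import Window

# Sketch: join ratings with users, split the pipe-separated Genres into one row per genre,
# average the rating per (Occupation, Age, Genre) and rank genres within each group.
# Note: Occupation and Age are stored as numeric codes in users.dat.
dfFull = dfMovies_Ratings.join(dfUsers, "UserID", "inner")
dfGenre = dfFull.withColumn("Genre", explode(split("Genres", "\\|")))
dfGenreAvg = dfGenre.groupBy("Occupation", "Age", "Genre").agg(avg("Rating").alias("AvgRating"))

w = Window.partitionBy("Occupation", "Age").orderBy(desc("AvgRating"))
dfGenreRanked = dfGenreAvg.withColumn("Rank", rank().over(w))
#dfGenreRanked.show(20)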
