Movie Data Analysis using PySpark
movies.dat:
E:\\vow\\ml-1m\\movies.dat
MovieID,Title,Genres
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
ratings.dat:
E:\\vow\\ml-1m\\ratings.dat
UserID,MovieID,Rating,Timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
users.dat:
E:\\vow\\ml-1m\\users.dat
UserID,Gender,Age,Occupation,ZipCode
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
Linux location:
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/movies.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/ratings.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/users.dat
Requirement:
Top 10 most viewed movies with their movie names (Z-A, A-Z)
Top 20 rated movies (The movie should be rated / viewed by at least 40 users)
How the genres rank by average rating for each profession and age group
import collections
from datetime import datetime
# Record type for one line of movies.dat.
Movie = collections.namedtuple('Movie', ['MovieID', 'Title', 'Genres'])


def parseMovie(_row):
    """Parse one ``MovieID::Title::Genres`` line of movies.dat into a Movie.

    Args:
        _row: A single text line, e.g.
            ``"1::Toy Story (1995)::Animation|Children's|Comedy"``.

    Returns:
        Movie: MovieID converted to int; Title and Genres kept as the raw
        strings (Genres stays pipe-separated).

    Raises:
        ValueError: If the MovieID field is not an integer.
        IndexError: If the line has fewer than three ``::``-separated fields.
    """
    fields = _row.split("::")
    return Movie(int(fields[0]), fields[1], fields[2])
#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
# Read movies.dat from HDFS as an RDD with one element per text line.
movies_r1 = spark.sparkContext.textFile(
    "hdfs://localhost:9000/SparkFiles/movies.dat"
)
# Coerce every line to str, then parse it into a Movie record.
movies_r2 = movies_r1.map(str).map(parseMovie)
#movies_r2.take(2)
#[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
#Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
# Record type for one line of ratings.dat.
Rating = collections.namedtuple('Rating', ['UserID', 'MovieID', 'Rating', 'Timestamp'])


def parseRatingRecord(_row):
    """Parse one ``UserID::MovieID::Rating::Timestamp`` line of ratings.dat.

    Args:
        _row: A single text line, e.g. ``"1::1193::5::978300760"``.

    Returns:
        Rating: UserID, MovieID and Rating as ints; Timestamp as a
        ``datetime`` built from the epoch-seconds field.

    Raises:
        ValueError: If any of the four fields is not an integer.
        IndexError: If the line has fewer than four ``::``-separated fields.
    """
    fields = _row.split("::")
    userid = int(fields[0])
    movieid = int(fields[1])
    rating = int(fields[2])
    # fromtimestamp() without a tz argument returns a naive datetime in the
    # local time zone of the process doing the parsing, so the wall-clock
    # value depends on where this runs.
    rated_at = datetime.fromtimestamp(int(fields[3]))
    return Rating(userid, movieid, rating, rated_at)
#print(parseRatingRecord("1::1193::5::978300760"))
# Read ratings.dat from HDFS as an RDD with one element per text line.
ratings_r1 = spark.sparkContext.textFile(
    "hdfs://localhost:9000/SparkFiles/ratings.dat"
)
# Coerce every line to str, then parse it into a Rating record.
ratings_r2 = ratings_r1.map(str).map(parseRatingRecord)
#ratings_r2.take(2)
# Record type for one line of users.dat (the tuple's type name is "Users").
User = collections.namedtuple("Users", ["UserID", "Gender", "Age", "Occupation", "ZipCode"])


def parseUserRecord(_row):
    """Parse one ``UserID::Gender::Age::Occupation::ZipCode`` line of users.dat.

    Args:
        _row: A single text line, e.g. ``"3::M::25::15::55117"``.

    Returns:
        Users: UserID, Age, Occupation and ZipCode as ints; Gender as the
        raw string. Because ZipCode is an int, leading zeros are not
        preserved (``"02460"`` becomes ``2460``).

    Raises:
        ValueError: If a numeric field is not an integer.
        IndexError: If the line has fewer than five ``::``-separated fields.
    """
    fields = _row.split("::")
    userid = int(fields[0])
    gender = fields[1]
    age = int(fields[2])
    occupation = int(fields[3])
    # A ZIP+4 value such as "98107-2117" would make int() raise ValueError,
    # so only the part before the first "-" (the 5-digit code) is converted.
    zipcode = int(fields[4].split("-")[0])
    return User(userid, gender, age, occupation, zipcode)
#print(parseUserRecord("3::M::25::15::55117"))
# Read users.dat from HDFS as an RDD with one element per text line.
users_r1 = spark.sparkContext.textFile(
    "hdfs://localhost:9000/SparkFiles/users.dat"
)
# Coerce every line to str, then parse it into a Users record.
users_r2 = users_r1.map(str).map(parseUserRecord)
#users_r2.take(2)
#[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
# Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
# Turn the three RDDs of namedtuples into DataFrames; the column names come
# from the namedtuple field names.
dfMovies, dfRatings, dfUsers = (
    rdd.toDF() for rdd in (movies_r2, ratings_r2, users_r2)
)
# Print each inferred schema, in the same movies / ratings / users order.
for frame in (dfMovies, dfRatings, dfUsers):
    frame.printSchema()
dfMovies.printSchema()
root
|-- MovieID: long (nullable = true)
|-- Title: string (nullable = true)
|-- Genres: string (nullable = true)
dfRatings.printSchema()
root
|-- UserID: long (nullable = true)
|-- MovieID: long (nullable = true)
|-- Rating: long (nullable = true)
|-- Timestamp: timestamp (nullable = true)
dfUsers.printSchema()
root
|-- UserID: long (nullable = true)
|-- Gender: string (nullable = true)
|-- Age: long (nullable = true)
|-- Occupation: long (nullable = true)
|-- ZipCode: long (nullable = true)
dfMovies.show(5)
+-------+--------------------+--------------------+
|MovieID| Title| Genres|
+-------+--------------------+--------------------+
| 1| Toy Story (1995)|Animation|Childre...|
| 2| Jumanji (1995)|Adventure|Childre...|
| 3|Grumpier Old Men ...| Comedy|Romance|
| 4|Waiting to Exhale...| Comedy|Drama|
| 5|Father of the Bri...| Comedy|
+-------+--------------------+--------------------+
dfRatings.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating| Timestamp|
+------+-------+------+-------------------+
| 1| 1193| 5|2001-01-01 03:42:40|
| 1| 661| 3|2001-01-01 04:05:09|
| 1| 914| 3|2001-01-01 04:02:48|
| 1| 3408| 4|2001-01-01 03:34:35|
| 1| 2355| 5|2001-01-07 05:08:11|
+------+-------+------+-------------------+
dfUsers.show(5)
+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
| 1| F| 1| 10| 48067|
| 2| M| 56| 16| 70072|
| 3| M| 25| 15| 55117|
| 4| M| 45| 7| 2460|
| 5| M| 25| 20| 55455|
+------+------+---+----------+-------+
# Inner-join every rating with its movie on MovieID.
# Joining on a column expression keeps the MovieID column of both inputs,
# so the result has two columns named MovieID.
from pyspark.sql.functions import col
dfMovies_Ratings = dfMovies.join(
    dfRatings,
    on=(dfMovies["MovieID"] == dfRatings["MovieID"]),
    how="inner",
)
dfMovies_Ratings.printSchema()
dfMovies_Ratings.show(5)
root
|-- MovieID: long (nullable = true)
|-- Title: string (nullable = true)
|-- Genres: string (nullable = true)
|-- UserID: long (nullable = true)
|-- MovieID: long (nullable = true)
|-- Rating: long (nullable = true)
|-- Timestamp: timestamp (nullable = true)
+-------+--------------+------+------+-------+------+-------------------+
|MovieID| Title|Genres|UserID|MovieID|Rating| Timestamp|
+-------+--------------+------+------+-------+------+-------------------+
| 26|Othello (1995)| Drama| 18| 26| 4|2000-12-30 11:52:15|
| 26|Othello (1995)| Drama| 69| 26| 4|2000-12-27 07:24:10|
| 26|Othello (1995)| Drama| 229| 26| 4|2002-12-10 12:29:33|
| 26|Othello (1995)| Drama| 342| 26| 4|2000-12-09 10:41:17|
| 26|Othello (1995)| Drama| 524| 26| 3|2000-12-07 11:33:32|
+-------+--------------+------+------+-------+------+-------------------+
# Solution 1: most viewed movies, counting one view per rating row.
# NOTE(review): the requirement asks for the top 10, but 20 rows are kept
# here - confirm which is intended.
from pyspark.sql.functions import col,count,desc
dfMovies_Ratings2 = (
    dfMovies_Ratings
    .select("Title", "UserID")
    .groupBy("Title")
    .agg(count("UserID").alias("Views"))
    .orderBy(col("Views").desc())
    .limit(20)
)
dfMovies_Ratings2.show()
+--------------------+-----+
| Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
| Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
| Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
| Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+
dfMovies_Ratings2.printSchema()
root
|-- Title: string (nullable = true)
|-- Views: long (nullable = false)
# SparkSQL approach: the same "most viewed" aggregation, expressed as SQL
# over a temporary view of the joined movies/ratings DataFrame.
dfMovies_Ratings.createOrReplaceTempView("MovieRatingsView")
# Bind the SQL result to dfMovies_Ratings2 so that the show() below (and any
# later use of that name) displays this query's rows rather than the earlier
# DataFrame-API result.
dfMovies_Ratings2 = spark.sql(
    "select Title,count(UserID) as Views from MovieRatingsView group by Title order by Views desc limit 20"
)
dfMovies_Ratings2.show()
dfMovies_Ratings2.show()
+--------------------+-----+
| Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
| Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
| Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
| Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+
# Persist the most-viewed result to HDFS as CSV with a header row;
# coalesce(1) reduces the data to one partition before the write.
(
    dfMovies_Ratings2
    .coalesce(1)
    .write
    .format("csv")
    .option("header", True)
    .save("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")
)
hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv
American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590
Back to the Future (1985),2583
"Silence of the Lambs, The (1991)",2578
Men in Black (1997),2538
Raiders of the Lost Ark (1981),2514
Fargo (1996),2513
"Sixth Sense, The (1999)",2459
Braveheart (1995),2443
Shakespeare in Love (1998),2369
"Princess Bride, The (1987)",2318
Schindler's List (1993),2304
L.A. Confidential (1997),2288
Groundhog Day (1993),2278
# Solution 2: best-rated movies among those rated by at least 40 users.
# NOTE(review): the requirement text asks for the top 20, while this cell
# shows the top 10 - confirm which is intended.
from pyspark.sql.functions import avg
# Per title: number of ratings (Views) and mean rating (AvgRating).
result2 = (
    dfMovies_Ratings
    .select("Title", "Rating", "UserID")
    .groupBy("Title")
    .agg(
        count("UserID").alias("Views"),
        avg("Rating").alias("AvgRating"),
    )
)
# Drop rarely rated titles, then sort best-first.
result3 = result2.where("Views >= 40").orderBy("AvgRating", ascending=False)
result3.show(10)
+--------------------+-----+-----------------+
| Title|Views| AvgRating|
+--------------------+-----+-----------------+
| Sanjuro (1962)| 69|4.608695652173913|
|Seven Samurai (Th...| 628|4.560509554140127|
|Shawshank Redempt...| 2227|4.554557700942973|
|Godfather, The (1...| 2223|4.524966261808367|
|Close Shave, A (1...| 657| 4.52054794520548|
|Usual Suspects, T...| 1783|4.517106001121705|
|Schindler's List ...| 2304|4.510416666666667|
|Wrong Trousers, T...| 882|4.507936507936508|
|Sunset Blvd. (a.k...| 470|4.491489361702127|
|Raiders of the Lo...| 2514|4.477724741447892|
+--------------------+-----+-----------------+
# Rebuild the Solution 2 result and persist its ten best-rated rows to HDFS.
# count and desc are imported here as well so the cell does not depend on an
# earlier import having been run.
from pyspark.sql.functions import avg, count, desc
# Per title: number of ratings (Views) and mean rating (AvgRating).
result2 = dfMovies_Ratings.select("Title","Rating","UserID").groupBy("Title").agg(count("UserID").alias("Views"),avg("Rating").alias("AvgRating"))
# Keep titles with at least 40 ratings, sorted by average rating, best first.
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
#result3.show(10)
#write the output into hdfs
# limit(10) restricts the written file to the top ten rows; without it every
# title with at least 40 ratings would be written into the top10Movies output.
result3.limit(10).coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/top10Movies/")
hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv
Title,Views
American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590