Tuesday, 26 May 2020

Movies, Ratings, and Users data analysis using PySpark and namedtuple

import collections

Movie = collections.namedtuple('Movie', ['MovieID', 'Title', 'Genres'])

def parseMovie(_row):
    # movies.dat lines look like: 1::Toy Story (1995)::Animation|Children's|Comedy
    fields = _row.split("::")
    movieid = int(fields[0])
    title = fields[1]
    genres = fields[2]
    return Movie(movieid, title, genres)

# print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
movies_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\movies.dat")
movies_r2 = movies_r1.map(parseMovie)
movies_r2.take(2)

[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
 Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
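
To sanity-check the parsed movies, we can already run simple RDD operations on movies_r2. The snippet below is a small illustration added here (not an output from the original run); the name comedy_count and the "Comedy" genre are just examples:

# Count how many movies list "Comedy" among their genres
comedy_count = movies_r2.filter(lambda m: "Comedy" in m.Genres.split("|")).count()
print(comedy_count)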
 
 


import collections
from datetime import datetime

Rating = collections.namedtuple('Rating', ['UserID', 'MovieID', 'Rating', 'Timestamp'])

def parseRatingRecord(_row):
    # ratings.dat lines look like: 1::1193::5::978300760
    fields = _row.split("::")
    userid = int(fields[0])
    movieid = int(fields[1])
    rating = int(fields[2])
    # the last field is a Unix timestamp; convert it to a datetime
    _timestamp = datetime.fromtimestamp(int(fields[3]))
    return Rating(userid, movieid, rating, _timestamp)

# print(parseRatingRecord("1::1193::5::978300760"))

ratings_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\ratings.dat")
ratings_r2 = ratings_r1.map(parseRatingRecord)
ratings_r2.take(2)


[Rating(UserID=1, MovieID=1193, Rating=5, Timestamp=datetime.datetime(2001, 1, 1, 3, 42, 40)),
 Rating(UserID=1, MovieID=661, Rating=3, Timestamp=datetime.datetime(2001, 1, 1, 4, 5, 9))]
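
With ratings_r2 in hand, a typical next step is an aggregation. As a rough sketch (not part of the original run; sum_counts and avg_ratings are names introduced here for illustration), the average rating per MovieID can be computed with reduceByKey:

# Build (MovieID, (sum of ratings, count)) pairs, then divide to get the average
sum_counts = ratings_r2.map(lambda r: (r.MovieID, (r.Rating, 1))) \
                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg_ratings = sum_counts.mapValues(lambda sc: sc[0] / sc[1])
avg_ratings.take(2)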
 
 
 
import collections

User = collections.namedtuple("Users", ["UserID", "Gender", "Age", "Occupation", "ZipCode"])

def parseUserRecord(_row):
    # users.dat lines look like: 3::M::25::15::55117
    fields = _row.split("::")
    userid = int(fields[0])
    gender = fields[1]
    age = int(fields[2])
    occupation = int(fields[3])
    # zip code converted to int here; a non-numeric zip (e.g. ZIP+4 with a dash) would need to stay a string
    zipcode = int(fields[4])
    return User(userid, gender, age, occupation, zipcode)

# print(parseUserRecord("3::M::25::15::55117"))

users_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\users.dat")
users_r2 = users_r1.map(parseUserRecord)
users_r2.take(2)
                        
[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
 Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
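
Once all three RDDs are parsed, they can be joined on their shared keys. The sketch below is illustrative only (ratings_by_user, users_by_id and counts_by_gender are names made up here): it keys ratings and users by UserID, joins them, and counts how many ratings each gender submitted.

# Key both RDDs by UserID, join, then count ratings per Gender
ratings_by_user = ratings_r2.map(lambda r: (r.UserID, r.Rating))
users_by_id = users_r2.map(lambda u: (u.UserID, u.Gender))
joined = ratings_by_user.join(users_by_id)   # (UserID, (Rating, Gender))
counts_by_gender = joined.map(lambda kv: (kv[1][1], 1)).reduceByKey(lambda a, b: a + b)
counts_by_gender.collect()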
                        
