Wednesday, 16 January 2019

Most Popular Movie Program - Using Scala, Spark Core

Most Popular Movies (ranked by number of ratings, not by rating value):
-------------------
Input data (u.data, one rating per line, fields separated by tabs):
UserID, MovieID, Rating, Timestamp
---------------------------------
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596


scala> val input = sc.textFile("E:\\POCs\\ml-100k\\u.data")
input: org.apache.spark.rdd.RDD[String] = E:\POCs\ml-100k\u.data MapPartitionsRDD[3] at textFile at <console>:24

scala> val movies = input.map( movie => {
     | val fields = movie.split("\t")
     | val movieID = fields(1).toInt
     | (movieID,1)}
     | )
movies: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:25

scala> movies.take(5).foreach(println)
(242,1)
(302,1)
(377,1)
(51,1)
(346,1)




scala> val moviesCount = movies.reduceByKey( (x,y) => x+y)
moviesCount: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at reduceByKey at <console>:25


scala> moviesCount.take(5).foreach(println)
(454,16)
(1084,21)
(1410,4)
(772,49)
(752,39)

scala> val flipped = moviesCount.map (x => (x._2, x._1))
scala> flipped.take(5).foreach(println)
(16,454)
(21,1084)
(4,1410)
(49,772)
(39,752)

scala> val sortedMovies = flipped.sortByKey()
sortedMovies: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[14] at sortByKey at <console>:25


scala> sortedMovies.collect
res9: Array[(Int, Int)] = Array((1,1494), (1,1414), (1,1596), (1,1630), (1,1632), (1,1310), (1,1670), (1,1320), (1,1678), (1,1674), (1,1566), (1,1682)
, (1,1680), (1,1548), (1,1624), (1,1498), (1,1492), (1,1366), (1,1606), (1,1352), (1,1130), (1,1638), (1,1660), (1,1562), (1,1576), (1,1626), (1,1618)
, (1,1580), (1,1640), (1,1616), (1,1236), (1,1482), (1,1570), (1,1654), (1,1486), (1,1636), (1,814), (1,1340), (1,1574), (1,1564), (1,1452), (1,830),
(1,1546), (1,1666), (1,1364), (1,1476), (1,1668), (1,1614), (1,1348), (1,1520), (1,1510), (1,1676), (1,1156), (1,852), (1,1526), (1,1650), (1,1572), (
1,1634), (1,1122), (1,1582), (1,1584), (1,1568), (1,1458), (1,1604), (1,1586), (1,1648), (1,1536), (1,1460), (1,1661), (1,1325), (1,1677), (1,599), (1
,1363), (1,1505), (1,1635), (1,1567), (1,1641),...


scala> sortedMovies.take(20).foreach(println)
(1,1494)
(1,1414)
(1,1596)
(1,1630)
(1,1632)
(1,1310)
(1,1670)
(1,1320)
(1,1678)
(1,1674)
(1,1566)
(1,1682)
(1,1680)
(1,1548)
(1,1624)
(1,1498)
(1,1492)
(1,1366)
(1,1606)
(1,1352)
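....
...
..
.
(Note: the pairs below are not in ascending key order, and they appear to be in (movieID, count) form rather than (count, movieID), so they look like unsorted moviesCount output rather than a continuation of the sortedMovies listing above.)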
(687,69)
(1001,17)
(47,133)
(1557,1)
(1093,23)
(485,125)
(989,32)
(1103,19)
(207,66)
(521,120)
(739,164)
(233,124)
(1105,18)
(1249,12)
(991,25)
(537,30)
(377,13)
(155,98)
(315,160)

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...