Tuesday, 29 January 2019

Samples with and without using a Case class

Here are samples that create a DataFrame with and without using a case class.

Input
movies.csv:
------------
hdfs dfs -head /user/movies.csv

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama
15,Cutthroat Island (1995),Action|Adventure|Romance
16,Casino (1995),Crime|Drama
17,Sense and Sensibility (1995),Drama|Romance
18,Four Rooms (1995),Comedy
19,Ace Ventura: When Nature Calls (1995),Comedy
20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller
21,Get Shorty (1995),Comedy|Crime|Thriller
22,Copycat (1995),Crime|Drama|Horror|Mystery|Thriller
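
// A caveat before parsing: line 11 above ("American President, The (1995)")
// has a comma inside quotes, so the plain split(",") used below would break
// that row into four fields. The take(5)/take(10) calls never reach it, but
// for the full file a quote-aware split is needed. A minimal sketch (a common
// regex workaround, not part of the original example):

val safeSplit = (line: String) =>
  line.split(""",(?=([^"]*"[^"]*")*[^"]*$)""")
      .map(_.stripPrefix("\"").stripSuffix("\""))

safeSplit("""11,"American President, The (1995)",Comedy|Drama|Romance""")
// => Array(11, American President, The (1995), Comedy|Drama|Romance)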


scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[43] at textFile at <console>:30

scala> d1.take(10).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:31

// Without a case class: map each row to a plain tuple
val d3 = d2.map { x =>
  val movieID = x(0)
  val title = x(1)
  val genre = x(2)
  (movieID, title, genre)
}

scala> d3.take(5).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
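
// Notice that the first tuple above is the CSV header itself, carried through
// as data. A minimal sketch to drop it before parsing (assumes the header
// text appears only on the first line):
val header = d1.first()
val dataOnly = d1.filter(_ != header).map(_.split(","))
dataOnly.take(2).foreach(x => println(x.mkString(",")))
// 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
// 2,Jumanji (1995),Adventure|Children|Fantasy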

// DataFrame with default column names (no proper header yet)
scala> val df1 = d3.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]

scala> df1.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows

// DataFrame with explicit column names
scala> val df1 = d3.toDF("MovieID","Title","Genre")
df1: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]

scala> df1.show(3)
+-------+----------------+--------------------+
|MovieID|           Title|               Genre|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows


// Now the same flow, this time making use of a case class

scala> case class Movies(MovieID:String, Title:String, Genre:String)
defined class Movies

scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[62] at textFile at <console>:30

scala>  val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[63] at map at <console>:31

// With a case class: map each row to a Movies object
val d3 = d2.map { x =>
  val movieID = x(0)
  val title = x(1)
  val genre = x(2)
  Movies(movieID, title, genre)
}
 
scala> d3.take(5).foreach(println)
Movies(movieId,title,genres)
Movies(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
Movies(2,Jumanji (1995),Adventure|Children|Fantasy)
Movies(3,Grumpier Old Men (1995),Comedy|Romance)
Movies(4,Waiting to Exhale (1995),Comedy|Drama|Romance)


scala> val df = d3.toDF()
df: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]

// Column names are picked up automatically from the case class fields
scala> df.show(5)
+-------+--------------------+--------------------+
|MovieID|               Title|               Genre|
+-------+--------------------+--------------------+
|movieId|               title|              genres|
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows
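
// Note that the header row still shows up as data here (movieId,title,genres).
// In practice, Spark's built-in CSV reader handles both the header and quoted
// commas directly. A minimal sketch (Spark 2.x, same path as above):
val moviesDF = spark.read
  .option("header", "true")
  .csv("hdfs://localhost:9000/user/movies.csv")
moviesDF.show(3)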


scala> case class Movies (MovieID:Int, Title:String, Genre:String)
defined class Movies

scala> val a1 = Movies(1,"Gladiator","Action")
a1: Movies = Movies(1,Gladiator,Action)

scala> val a2 = Movies(2,"Batman","Thriller")
a2: Movies = Movies(2,Batman,Thriller)

scala> val a3 = Movies(3,"Titanic","Romance")
a3: Movies = Movies(3,Titanic,Romance)

scala> val myList = List(a1,a2,a3)
myList: List[Movies] = List(Movies(1,Gladiator,Action), Movies(2,Batman,Thriller), Movies(3,Titanic,Romance))

scala> val myRDD = sc.makeRDD(myList)
myRDD: org.apache.spark.rdd.RDD[Movies] = ParallelCollectionRDD[72] at makeRDD at <console>:32

scala> myRDD.foreach(println)
Movies(1,Gladiator,Action)
Movies(2,Batman,Thriller)
Movies(3,Titanic,Romance)

scala> val df = myRDD.toDF() // with Case class
df: org.apache.spark.sql.DataFrame = [MovieID: int, Title: string ... 1 more field]

scala> df.show()
+-------+---------+--------+
|MovieID|    Title|   Genre|
+-------+---------+--------+
|      1|Gladiator|  Action|
|      2|   Batman|Thriller|
|      3|  Titanic| Romance|
+-------+---------+--------+


scala> df.printSchema
root
 |-- MovieID: integer (nullable = false)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
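
// MovieID comes out as nullable = false because Scala's Int is a primitive.
// A minimal sketch showing that wrapping it in Option makes it nullable
// (MoviesOpt is an illustrative name, not from the original example):
case class MoviesOpt(MovieID: Option[Int], Title: String, Genre: String)
Seq(MoviesOpt(Some(1), "Gladiator", "Action")).toDF().printSchema
// root
//  |-- MovieID: integer (nullable = true)
//  |-- Title: string (nullable = true)
//  |-- Genre: string (nullable = true)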


// Summary: a case class supplies the schema for an RDD that has none.
// [RDD without schema] + [case class] => DataFrame with schema


// Here we have an Array of tuples
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[78] at makeRDD at <console>:30

scala> rdd1.foreach(println)
(1,2)
(2,3)
(3,4)
(4,5)
(5,6)
(6,7)

// Creating a dataframe without applying our own custom schema
scala> val df = rdd1.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

// Here the schema is auto-generated [without a case class]
scala> df.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: integer (nullable = false)


scala> df.show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
|  5|  6|
|  6|  7|
+---+---+
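
// As an aside, column names can also be passed straight to toDF without a
// case class. A minimal sketch:
val dfNamed = rdd1.toDF("id1", "id2")
dfNamed.printSchema
// root
//  |-- id1: integer (nullable = false)
//  |-- id2: integer (nullable = false)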

scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[83] at makeRDD at <console>:30

scala> case class ourSchema(id1:Int, id2:Int)
defined class ourSchema


// Here the schema is applied with the help of the case class
scala> val rdd2 = rdd1.map (x => ourSchema(x._1, x._2))
rdd2: org.apache.spark.rdd.RDD[ourSchema] = MapPartitionsRDD[84] at map at <console>:33

scala> rdd2.foreach(println)
ourSchema(1,2)
ourSchema(2,3)
ourSchema(3,4)
ourSchema(4,5)
ourSchema(5,6)
ourSchema(6,7)

scala> val df = rdd2.toDF()
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> df.show()
+---+---+
|id1|id2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
|  5|  6|
|  6|  7|
+---+---+
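
// Since the rows are already typed as ourSchema, the same data can be kept
// as a typed Dataset instead of a DataFrame. A minimal sketch:
val ds = rdd2.toDS()
ds.show()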


scala> val df1 = df.toDF("Left","Right")
df1: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df1.show()
+----+-----+
|Left|Right|
+----+-----+
|   1|    2|
|   2|    3|
|   3|    4|
|   4|    5|
|   5|    6|
|   6|    7|
+----+-----+

scala> val df2 = df1.select("Left","Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
|   1|    2|
|   2|    3|
|   3|    4|
|   4|    5|
|   5|    6|
|   6|    7|
+----+-----+

// Here the generated column names are the raw expressions, not clean headers
scala> val df2 = df1.select(col("Left")+100,col("Right")+10)
df2: org.apache.spark.sql.DataFrame = [(Left + 100): int, (Right + 10): int]

scala> df2.show
+------------+------------+
|(Left + 100)|(Right + 10)|
+------------+------------+
|         101|          12|
|         102|          13|
|         103|          14|
|         104|          15|
|         105|          16|
|         106|          17|
+------------+------------+

// Here we keep proper column names by giving the expressions alias names
scala>  val df2 = df1.select(col("Left")+100 as "Left",col("Right")+10 as "Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 101|   12|
| 102|   13|
| 103|   14|
| 104|   15|
| 105|   16|
| 106|   17|
+----+-----+
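
// An equivalent way to restore clean names is to compute first and rename
// afterwards with toDF. A minimal sketch:
val df2b = df1.select(col("Left") + 100, col("Right") + 10).toDF("Left", "Right")
df2b.show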

scala> val df3 = df2.select("Left","Right").groupBy("Left").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Left: int, count(1): bigint]

scala> df3.show
+----+--------+
|Left|count(1)|
+----+--------+
| 101|       1|
| 103|       1|
| 102|       1|
| 105|       1|
| 106|       1|
| 104|       1|
+----+--------+


scala> val df3 = df2.select("Left","Right").groupBy("Right").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Right: int, count(1): bigint]

scala> df3.show
+-----+--------+
|Right|count(1)|
+-----+--------+
|   12|       1|
|   13|       1|
|   16|       1|
|   15|       1|
|   17|       1|
|   14|       1|
+-----+--------+
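
// The same aggregation can be written in SQL by registering the DataFrame as
// a temporary view. A minimal sketch (the view name "pairs" is illustrative;
// Left/Right are backquoted since they can read as SQL keywords):
df2.createOrReplaceTempView("pairs")
spark.sql("SELECT `Left`, COUNT(*) AS cnt FROM pairs GROUP BY `Left`").show()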

Flume - Simple Demo

// Create a folder in HDFS:
$ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates: Hadoop in real world <n>...