Here is the samples which use with and without using Case class.
input
movies.csv:
------------
hdfs dfs -head /user/movies.csv
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama
15,Cutthroat Island (1995),Action|Adventure|Romance
16,Casino (1995),Crime|Drama
17,Sense and Sensibility (1995),Drama|Romance
18,Four Rooms (1995),Comedy
19,Ace Ventura: When Nature Calls (1995),Comedy
20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller
21,Get Shorty (1995),Comedy|Crime|Thriller
22,Copycat (1995),Crime|Drama|Horror|Mystery|Thriller
scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[43] at textFile at <console>:30
scala> d1.take(10).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:31
// Here we are not using Case class // without Case class
val d3 = d2.map { x =>
val movieID = x(0)
val title = x(1)
val genre = x(2)
(movieID,title,genre)
}
scala> d3.take(5).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
// Dataframe without proper header
scala> val df1 = d3.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
scala> df1.show(3)
+-------+----------------+--------------------+
| _1| _2| _3|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
// Dataframe with Header
scala> val df1 = d3.toDF("MovieID","Title","Genre")
df1: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]
scala> df1.show(3)
+-------+----------------+--------------------+
|MovieID| Title| Genre|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
// Here we are going to make use of Case class
scala> case class Movies(MovieID:String, Title:String, Genre:String)
defined class Movies
scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[62] at textFile at <console>:30
scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[63] at map at <console>:31
// Here we are Applying Case Class // with case class
val d3 = d2.map { x =>
val movieID = x(0)
val title = x(1)
val genre = x(2)
Movies(movieID,title,genre)
}
scala> d3.take(5).foreach(println)
Movies(movieId,title,genres)
Movies(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
Movies(2,Jumanji (1995),Adventure|Children|Fantasy)
Movies(3,Grumpier Old Men (1995),Comedy|Romance)
Movies(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
scala> val df = d3.toDF()
df: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]
// fields automatically came because we used Case class object
scala> df.show(5)
+-------+--------------------+--------------------+
|MovieID| Title| Genre|
+-------+--------------------+--------------------+
|movieId| title| genres|
| 1| Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
| 3|Grumpier Old Men ...| Comedy|Romance|
| 4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows
scala> case class Movies (MovieID:Int, Title:String, Genre:String)
defined class Movies
scala> val a1 = Movies(1,"Gladiator","Action")
a1: Movies = Movies(1,Gladiator,Action)
scala> val a2 = Movies(2,"Batman","Thriller")
a2: Movies = Movies(2,Batman,Thriller)
scala> val a3 = Movies(3,"Titanic","Romance")
a3: Movies = Movies(3,Titanic,Romance)
scala> val myList = List(a1,a2,a3)
myList: List[Movies] = List(Movies(1,Gladiator,Action), Movies(2,Batman,Thriller), Movies(3,Titanic,Romance))
scala> val myRDD = sc.makeRDD(myList)
myRDD: org.apache.spark.rdd.RDD[Movies] = ParallelCollectionRDD[72] at makeRDD at <console>:32
scala> myRDD.foreach(println)
Movies(1,Gladiator,Action)
Movies(2,Batman,Thriller)
Movies(3,Titanic,Romance)
scala> val df = myRDD.toDF() // with Case class
df: org.apache.spark.sql.DataFrame = [MovieID: int, Title: string ... 1 more field]
scala> df.show()
+-------+---------+--------+
|MovieID| Title| Genre|
+-------+---------+--------+
| 1|Gladiator| Action|
| 2| Batman|Thriller|
| 3| Titanic| Romance|
+-------+---------+--------+
scala> df.printSchema
root
|-- MovieID: integer (nullable = false)
|-- Title: string (nullable = true)
|-- Genre: string (nullable = true)
// Here we applied schema to an RDD which doesn't have schema using case class
[RDD without schema] + [case class] => dataframe with Schema
// We provide schema using case class
// here we have an Array of Tuples
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[78] at makeRDD at <console>:30
scala> rdd1.foreach(println)
(1,2)
(2,3)
(3,4)
(4,5)
(5,6)
(6,7)
// Creating a dataframe without applying our own custom schema
scala> val df = rdd1.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
// here the schema is auto generated [without Case class]
scala> df.printSchema
root
|-- _1: integer (nullable = false)
|-- _2: integer (nullable = false)
scala> df.show()
+---+---+
| _1| _2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+---+---+
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[83] at makeRDD at <console>:30
scala> case class ourSchema(id1:Int, id2:Int)
defined class ourSchema
// Here schema applied with the help of case class object
scala> val rdd2 = rdd1.map (x => ourSchema(x._1, x._2))
rdd2: org.apache.spark.rdd.RDD[ourSchema] = MapPartitionsRDD[84] at map at <console>:33
scala> rdd2.foreach(println)
ourSchema(1,2)
ourSchema(2,3)
ourSchema(3,4)
ourSchema(4,5)
ourSchema(5,6)
ourSchema(6,7)
scala> val df = rdd2.toDF()
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> df.show()
+---+---+
|id1|id2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+---+---+
scala> val df1 = df.toDF("Left","Right")
df1: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df1.show()
+----+-----+
|Left|Right|
+----+-----+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+----+-----+
scala> val df2 = df1.select("Left","Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+----+-----+
// Here headers are wrong
scala> val df2 = df1.select(col("Left")+100,col("Right")+10)
df2: org.apache.spark.sql.DataFrame = [(Left + 100): int, (Right + 10): int]
scala> df2.show
+------------+------------+
|(Left + 100)|(Right + 10)|
+------------+------------+
| 101| 12|
| 102| 13|
| 103| 14|
| 104| 15|
| 105| 16|
| 106| 17|
+------------+------------+
// Here we properly managed headers using alias names
scala> val df2 = df1.select(col("Left")+100 as "Left",col("Right")+10 as "Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 101| 12|
| 102| 13|
| 103| 14|
| 104| 15|
| 105| 16|
| 106| 17|
+----+-----+
scala> val df3 = df2.select("Left","Right").groupBy("Left").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Left: int, count(1): bigint]
scala> df3.show
+----+--------+
|Left|count(1)|
+----+--------+
| 101| 1|
| 103| 1|
| 102| 1|
| 105| 1|
| 106| 1|
| 104| 1|
+----+--------+
scala> val df3 = df2.select("Left","Right").groupBy("Right").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Right: int, count(1): bigint]
scala> df3.show
+-----+--------+
|Right|count(1)|
+-----+--------+
| 12| 1|
| 13| 1|
| 16| 1|
| 15| 1|
| 17| 1|
| 14| 1|
+-----+--------+
input
movies.csv:
------------
hdfs dfs -head /user/movies.csv
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama
15,Cutthroat Island (1995),Action|Adventure|Romance
16,Casino (1995),Crime|Drama
17,Sense and Sensibility (1995),Drama|Romance
18,Four Rooms (1995),Comedy
19,Ace Ventura: When Nature Calls (1995),Comedy
20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller
21,Get Shorty (1995),Comedy|Crime|Thriller
22,Copycat (1995),Crime|Drama|Horror|Mystery|Thriller
scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[43] at textFile at <console>:30
scala> d1.take(10).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:31
// Here we are not using Case class // without Case class
val d3 = d2.map { x =>
val movieID = x(0)
val title = x(1)
val genre = x(2)
(movieID,title,genre)
}
scala> d3.take(5).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
// Dataframe without proper header
scala> val df1 = d3.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
scala> df1.show(3)
+-------+----------------+--------------------+
| _1| _2| _3|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
// Dataframe with Header
scala> val df1 = d3.toDF("MovieID","Title","Genre")
df1: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]
scala> df1.show(3)
+-------+----------------+--------------------+
|MovieID| Title| Genre|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
// Here we are going to make use of Case class
scala> case class Movies(MovieID:String, Title:String, Genre:String)
defined class Movies
scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[62] at textFile at <console>:30
scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[63] at map at <console>:31
// Here we are Applying Case Class // with case class
val d3 = d2.map { x =>
val movieID = x(0)
val title = x(1)
val genre = x(2)
Movies(movieID,title,genre)
}
scala> d3.take(5).foreach(println)
Movies(movieId,title,genres)
Movies(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
Movies(2,Jumanji (1995),Adventure|Children|Fantasy)
Movies(3,Grumpier Old Men (1995),Comedy|Romance)
Movies(4,Waiting to Exhale (1995),Comedy|Drama|Romance)
scala> val df = d3.toDF()
df: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]
// fields automatically came because we used Case class object
scala> df.show(5)
+-------+--------------------+--------------------+
|MovieID| Title| Genre|
+-------+--------------------+--------------------+
|movieId| title| genres|
| 1| Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
| 3|Grumpier Old Men ...| Comedy|Romance|
| 4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows
scala> case class Movies (MovieID:Int, Title:String, Genre:String)
defined class Movies
scala> val a1 = Movies(1,"Gladiator","Action")
a1: Movies = Movies(1,Gladiator,Action)
scala> val a2 = Movies(2,"Batman","Thriller")
a2: Movies = Movies(2,Batman,Thriller)
scala> val a3 = Movies(3,"Titanic","Romance")
a3: Movies = Movies(3,Titanic,Romance)
scala> val myList = List(a1,a2,a3)
myList: List[Movies] = List(Movies(1,Gladiator,Action), Movies(2,Batman,Thriller), Movies(3,Titanic,Romance))
scala> val myRDD = sc.makeRDD(myList)
myRDD: org.apache.spark.rdd.RDD[Movies] = ParallelCollectionRDD[72] at makeRDD at <console>:32
scala> myRDD.foreach(println)
Movies(1,Gladiator,Action)
Movies(2,Batman,Thriller)
Movies(3,Titanic,Romance)
scala> val df = myRDD.toDF() // with Case class
df: org.apache.spark.sql.DataFrame = [MovieID: int, Title: string ... 1 more field]
scala> df.show()
+-------+---------+--------+
|MovieID| Title| Genre|
+-------+---------+--------+
| 1|Gladiator| Action|
| 2| Batman|Thriller|
| 3| Titanic| Romance|
+-------+---------+--------+
scala> df.printSchema
root
|-- MovieID: integer (nullable = false)
|-- Title: string (nullable = true)
|-- Genre: string (nullable = true)
// Here we applied schema to an RDD which doesn't have schema using case class
[RDD without schema] + [case class] => dataframe with Schema
// We provide schema using case class
// here we have an Array of Tuples
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[78] at makeRDD at <console>:30
scala> rdd1.foreach(println)
(1,2)
(2,3)
(3,4)
(4,5)
(5,6)
(6,7)
// Creating a dataframe without applying our own custom schema
scala> val df = rdd1.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
// here the schema is auto generated [without Case class]
scala> df.printSchema
root
|-- _1: integer (nullable = false)
|-- _2: integer (nullable = false)
scala> df.show()
+---+---+
| _1| _2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+---+---+
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[83] at makeRDD at <console>:30
scala> case class ourSchema(id1:Int, id2:Int)
defined class ourSchema
// Here schema applied with the help of case class object
scala> val rdd2 = rdd1.map (x => ourSchema(x._1, x._2))
rdd2: org.apache.spark.rdd.RDD[ourSchema] = MapPartitionsRDD[84] at map at <console>:33
scala> rdd2.foreach(println)
ourSchema(1,2)
ourSchema(2,3)
ourSchema(3,4)
ourSchema(4,5)
ourSchema(5,6)
ourSchema(6,7)
scala> val df = rdd2.toDF()
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> df.show()
+---+---+
|id1|id2|
+---+---+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+---+---+
scala> val df1 = df.toDF("Left","Right")
df1: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df1.show()
+----+-----+
|Left|Right|
+----+-----+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+----+-----+
scala> val df2 = df1.select("Left","Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
+----+-----+
// Here headers are wrong
scala> val df2 = df1.select(col("Left")+100,col("Right")+10)
df2: org.apache.spark.sql.DataFrame = [(Left + 100): int, (Right + 10): int]
scala> df2.show
+------------+------------+
|(Left + 100)|(Right + 10)|
+------------+------------+
| 101| 12|
| 102| 13|
| 103| 14|
| 104| 15|
| 105| 16|
| 106| 17|
+------------+------------+
// Here we properly managed headers using alias names
scala> val df2 = df1.select(col("Left")+100 as "Left",col("Right")+10 as "Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]
scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 101| 12|
| 102| 13|
| 103| 14|
| 104| 15|
| 105| 16|
| 106| 17|
+----+-----+
scala> val df3 = df2.select("Left","Right").groupBy("Left").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Left: int, count(1): bigint]
scala> df3.show
+----+--------+
|Left|count(1)|
+----+--------+
| 101| 1|
| 103| 1|
| 102| 1|
| 105| 1|
| 106| 1|
| 104| 1|
+----+--------+
scala> val df3 = df2.select("Left","Right").groupBy("Right").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Right: int, count(1): bigint]
scala> df3.show
+-----+--------+
|Right|count(1)|
+-----+--------+
| 12| 1|
| 13| 1|
| 16| 1|
| 15| 1|
| 17| 1|
| 14| 1|
+-----+--------+