Thursday, 31 January 2019

RDD, DataFrame, and Dataset in Spark with Scala

Converting between RDD, DataFrame, and Dataset (all three are interoperable)

RDD -> DataFrame  ====> .toDF
RDD -> Dataset    ====> .toDS

DataFrame -> Dataset  ====> df.as[Type]
DataFrame -> RDD      ====> df.rdd

Dataset -> RDD        ====> ds.rdd
Dataset -> DataFrame  ====> ds.toDF
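
All of these conversions rely on Spark's implicit encoders. The spark-shell sessions below import them automatically; in a standalone application you have to bring them in yourself. A minimal sketch (the app name and master are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ConversionsDemo")   // illustrative name
  .master("local[*]")           // illustrative master
  .getOrCreate()

import spark.implicits._   // enables .toDF, .toDS and the encoders behind .as[Type]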


// RDDs in Spark
scala> val r1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[6] at textFile at <console>:24

scala> r1.take(5).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance

scala> val r2 = r1.map(x => x.split(","))
r2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[22] at map at <console>:25


scala> val r3 = r2.map (x => {
       val sno = x(0)
       val moviename = x(1)
       val genre = x(2)
       (sno,moviename,genre)
       }
       )
 
scala> r3.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
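
Note that the CSV header line travels through every transformation as ordinary data: it shows up above as the first tuple. A minimal sketch to drop it, assuming the header is literally the first line of the file:

val header = r1.first()
val dataLines = r1.filter(_ != header)   // keep only the data rows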


// RDD to DataFrame
scala> val dfX = r3.toDF
dfX: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]

scala> dfX.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)


scala> dfX.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
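
The columns default to _1, _2, _3 because the RDD holds unnamed tuples. toDF also accepts explicit column names, which gives the DataFrame a meaningful schema straight away:

val dfNamed = r3.toDF("movieId", "title", "genres")
dfNamed.printSchema   // root now shows movieId, title, genres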


// Making a Dataset from an RDD
scala> val dsX = r3.toDS
dsX: org.apache.spark.sql.Dataset[(String, String, String)] = [_1: string, _2: string ... 1 more field]

scala> dsX.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)


scala> dsX.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
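
Unlike the DataFrame, the Dataset remembers its element type, (String, String, String), so transformations on it are checked at compile time. A small sketch of a typed filter on the title field:

val toyStory = dsX.filter(t => t._2.contains("Toy Story"))   // _2 is the title
toyStory.show()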

// Making an RDD from a DataFrame
scala> val rddX = dfX.rdd
rddX: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[35] at rdd at <console>:25

scala> rddX.take(3).foreach(println)
[movieId,title,genres]
[1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy]
[2,Jumanji (1995),Adventure|Children|Fantasy]

// Making an RDD from a Dataset
scala> val rddXX = dsX.rdd
rddXX: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[37] at rdd at <console>:25

scala> rddXX.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
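
The element types explain the difference in the two outputs: the DataFrame round-trip yields generic Rows that need positional getters, while the Dataset round-trip keeps the tuple type. A sketch of the difference (the val names are illustrative):

val titlesFromDF = rddX.map(row => row.getString(1))   // Row: positional getter
val titlesFromDS = rddXX.map(_._2)                     // tuple: direct field access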

// DataFrame from JSON input
scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/employee.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows
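
With the schema inferred from the JSON, the usual DataFrame operations apply directly. A small sketch (in spark-shell the $ column syntax is available via the auto-imported spark.implicits._):

df.filter($"age" > 25).select("name").show()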

scala> case class Person(name:String,age:Long)
defined class Person

// Making a Dataset from the DataFrame using the case class Person
scala> val dsPerson = df.as[Person]
dsPerson: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> dsPerson.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows


scala> dsPerson.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
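
.as[Person] matches DataFrame columns to case-class fields by name, not position, which is why the age/name order can differ between the class and the schema. Once typed, the fields are accessed directly with compile-time checking; a minimal sketch:

val adults = dsPerson.filter(_.age >= 25)
adults.map(_.name).show()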

// Converting a Dataset to a DataFrame
scala> val tmpdf = dsPerson.toDF
tmpdf: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> tmpdf.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows


scala> tmpdf.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

// DataFrame from a text file as input, so every line becomes a row
scala> val df1 = spark.read.format("text").load("hdfs://localhost:9000/user/employee.txt")
df1: org.apache.spark.sql.DataFrame = [value: string]

scala> df1.show(3)
+---------+
|    value|
+---------+
|  John,28|
|Andrew,36|
|Clarke,22|
+---------+
only showing top 3 rows

// Converting the DataFrame into a Dataset
scala> val ds1 = df1.as[String]
ds1: org.apache.spark.sql.Dataset[String] = [value: string]


scala> ds1.show(3)
+---------+
|    value|
+---------+
|  John,28|
|Andrew,36|
|Clarke,22|
+---------+

scala> val ds2 = ds1.map (x => x.split(","))
ds2: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]

// Each row is now an array of strings
scala> ds2.show(5)
+-------------+
|        value|
+-------------+
|   [John, 28]|
| [Andrew, 36]|
| [Clarke, 22]|
|  [Kevin, 42]|
|[Richard, 51]|
+-------------+

scala> case class Person(name:String,age:Long)
defined class Person

scala> val ds3 = ds2.map( x => {
       val name = x(0)
       val age = x(1)
       (name, age)
       })
ds3: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]

// Dataset of (String, String) tuples; proper column names and types are still missing
scala> ds3.show(5)
+-------+---+
|     _1| _2|
+-------+---+
|   John| 28|
| Andrew| 36|
| Clarke| 22|
|  Kevin| 42|
|Richard| 51|
+-------+---+

scala> ds3.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
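
To attach a real schema, map each array into the Person case class, parsing age from String to Long. A minimal sketch (trim guards against stray whitespace around the number):

val dsPersons = ds2.map(x => Person(x(0), x(1).trim.toLong))
dsPersons.printSchema   // root now shows name: string and age: long

For real CSV input, spark.read.csv with schema inference would handle the splitting and type casting in one step.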



hadoop@hadoop:~$ cat > samplefile.txt
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies
^C
hadoop@hadoop:~$ hdfs dfs -copyFromLocal samplefile.txt /user/

scala>  val r1 = sc.textFile("hdfs://localhost:9000/user/samplefile.txt")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/samplefile.txt MapPartitionsRDD[8] at textFile at <console>:24

scala> r1.collect.foreach(println)
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies


scala> val d1 = spark.read.format("text").load("hdfs://localhost:9000/user/samplefile.txt")
d1: org.apache.spark.sql.DataFrame = [value: string]


scala> d1.show
+--------------------+
|               value|
+--------------------+
|spark is a big da...|
|hadoop is a big d...|
|spark and hadoop ...|
+--------------------+
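
show truncates each value to 20 characters by default; passing false prints the full lines:

d1.show(false)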


