Thursday, 31 January 2019

Case Class and StructType, StructField Examples in Spark with Scala

Input


people.txt:
------------
Michael, 29
Andy, 30
Justin, 19

people.json:
-----------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}​


hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal people.* /user/


hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/people*.*
-rw-r--r--   1 hadoop supergroup         76 2019-02-01 11:43 /user/people.json
-rw-r--r--   1 hadoop supergroup         32 2019-02-01 11:43 /user/people.txt

scala> val df = spark.read.json("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

-- OR --

scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
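By default the JSON reader infers age as bigint. As a hedged variation that was not part of the original run (jsonSchema and dfTyped are just illustrative names), an explicit schema can be passed to the reader so age comes back as an int:

// read the same JSON with an explicit schema instead of schema inference
import org.apache.spark.sql.types._

val jsonSchema = StructType(
  StructField("name", StringType) ::
  StructField("age",  IntegerType) :: Nil)

val dfTyped = spark.read.schema(jsonSchema).json("hdfs://localhost:9000/user/people.json")
dfTyped.printSchema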
             

scala> df.select("age","name").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select(col("age"),col("name")).show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select(df("age"),df("name")).show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+




// Add 1 to Age
scala> df.select(df("name"),df("age")+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select($"name",$"age"+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select(col("name"),col("age")+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
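A small variation (hedged sketch, not from the original transcript; age_next_year is just an example name): alias() renames the derived column so the header is friendlier than "(age + 1)".

// rename the computed column with alias()
df.select($"name", ($"age" + 1).alias("age_next_year")).show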


scala> df.select($"age" > 21).show
+----------+
|(age > 21)|
+----------+
|      null|
|      true|
|     false|
+----------+

scala> df.select(col("age") > 21).show
+----------+
|(age > 21)|
+----------+
|      null|
|      true|
|     false|
+----------+

// Filter operations


scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.filter(col("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.filter(df("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
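Filter predicates can also be combined. The sketch below is a hedged example that was not in the original run (startsWith("A") is just an illustrative condition):

// combine two conditions with && ; startsWith is a Column method
df.filter($"age" > 21 && $"name".startsWith("A")).show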



// Group By
scala> df.groupBy("age").count().show
+----+-----+                                                                   
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
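Other aggregates are available through agg() and the functions package. A hedged sketch, not in the original transcript:

// aggregate the whole DataFrame without grouping
import org.apache.spark.sql.functions.{avg, max, min}
df.agg(avg("age"), min("age"), max("age")).show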




import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

scala> val peopleRDD = spark.sparkContext.textFile("hdfs://localhost:9000/user/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/people.txt MapPartitionsRDD[80] at textFile at <console>:33

scala> peopleRDD.collect.foreach(println)
Michael, 29
Andy, 30
Justin, 19


// Schema definition using StructType and StructField
scala> val schema = StructType(StructField("Name",StringType)::StructField("Age",IntegerType)::Nil )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,IntegerType,true))
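StructField also takes a nullable flag, which defaults to true. A hedged sketch (strictSchema is an illustrative name):

// mark Name as non-nullable, keep Age nullable
val strictSchema = StructType(
  StructField("Name", StringType, nullable = false) ::
  StructField("Age",  IntegerType, nullable = true)  :: Nil)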

// StructType not applied here; mapping to plain tuples first
val rowRDD = peopleRDD.map(x => {
  val fields = x.split(",")
  val Name = fields(0).trim()
  val Age  = fields(1).trim()
  (Name, Age)
})

scala> rowRDD.collect.foreach(println)
(Michael,29)
(Andy,30)
(Justin,19)

scala> val rowRDD  = peopleRDD.map (x => {
     |        val fields = x.split(",")
     |        val Name = fields(0).trim()
     |        val Age  = fields(1).trim().toInt
     |      Row(Name,Age)
     |        })
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[84] at map at <console>:35

scala> rowRDD.collect.foreach(println)
[Michael,29]
[Andy,30]
[Justin,19]


// Creating a DataFrame with the schema (StructType and StructField) defined above
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [Name: string, Age: int]
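The same DataFrame can also be built without Row objects by mapping each line to a case class and calling toDF() (a hedged alternative to the steps above; PersonRec and peopleDF2 are illustrative names):

// alternative: case class + toDF() instead of Row + StructType
case class PersonRec(Name: String, Age: Int)

val peopleDF2 = peopleRDD.map { line =>
  val fields = line.split(",")
  PersonRec(fields(0).trim, fields(1).trim.toInt)
}.toDF()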

scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]

scala> results.show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+
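Row fields can also be read by name instead of position. A hedged sketch (field lookup is case-sensitive, so "name" here assumes the lower-case column shown in the schema above):

// access the column by name with a typed getter
results.map(row => "Name: " + row.getAs[String]("name")).show()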



scala> val ds = Seq(1,2,3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+


scala> ds.printSchema
root
 |-- value: integer (nullable = false)
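Datasets of primitives support the usual functional operators; a hedged one-liner that was not in the original run:

// transform each element of the typed Dataset
ds.map(_ * 2).show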


scala> case class Person(name:String, age:Long)
defined class Person


scala> val ds = Seq(Person("Sankar",42),Person("Sudha",40),Person("Vijay",18),Person("Aish",13)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.printSchema
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = false)


scala> ds.show
+------+---+
|  name|age|
+------+---+
|Sankar| 42|
| Sudha| 40|
| Vijay| 18|
|  Aish| 13|
+------+---+
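Because ds is a Dataset[Person], typed lambda operations work directly on the case class fields. A hedged sketch, not in the original transcript:

// typed filter and map on the case class
ds.filter(_.age >= 18).map(_.name.toUpperCase).show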

scala> val path = "hdfs://localhost:9000/user/people.json"
path: String = hdfs://localhost:9000/user/people.json


// Reading the JSON into a typed Dataset of the case class
scala> val peopleDS = spark.read.format("json").load(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

// Same read with the path inline: .as[CaseClassName]
scala> val peopleDS = spark.read.format("json").load("hdfs://localhost:9000/user/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> peopleDS.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> val peopleFiltered = peopleDS.filter("age is not null")
peopleFiltered: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleFiltered.show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
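Michael's record has no age, so deserializing that row into a plain Long field can fail when peopleDS is mapped or collected. One hedged option (PersonOpt and peopleOptDS are illustrative names) is to model the field as Option[Long]:

// model the possibly-missing age as an Option
case class PersonOpt(name: String, age: Option[Long])

val peopleOptDS = spark.read.json(path).as[PersonOpt]
peopleOptDS.filter(_.age.exists(_ > 21)).show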
