Monday, 28 January 2019

Experimenting with DataFrames and Datasets in Spark with Scala

Input
employee.json:
--------------
{"name":"John","age":28}
{"name":"Andrew","age":28}
{"name":"Clarke","age":22}
{"name":"Kevin","age":42}
{"name":"Richard","age":51}

scala> import spark.implicits._
import spark.implicits._

scala> val df = spark.read.json("/home/hadoop/Desktop/employee.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]             

scala> df.show()
+---+-------+
|age|   name|
+---+-------+
| 28|   John|
| 28| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

scala> df.select("name").show()
+-------+
|   name|
+-------+
|   John|
| Andrew|
| Clarke|
|  Kevin|
|Richard|
+-------+

scala> df.where("age > 30").show()
+---+-------+
|age|   name|
+---+-------+
| 42|  Kevin|
| 51|Richard|
+---+-------+


scala> df.filter($"age" > 30).show()
+---+-------+
|age|   name|
+---+-------+
| 42|  Kevin|
| 51|Richard|
+---+-------+

scala> df.createOrReplaceTempView("employee")

scala> val sqlDF = spark.sql("SELECT * FROM employee")

scala> sqlDF.show()
+---+-------+
|age|   name|
+---+-------+
| 28|   John|
| 28| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

scala> val sqlDF = spark.sql("SELECT * FROM employee where age > 40")

scala> sqlDF.show()

+---+-------+
|age|   name|
+---+-------+
| 42|  Kevin|
| 51|Richard|
+---+-------+


scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.select("name").show()
+-------+
|   name|
+-------+
|   John|
| Andrew|
| Clarke|
|  Kevin|
|Richard|
+-------+


scala> df.select($"name",$"age"+2).show()
+-------+---------+
|   name|(age + 2)|
+-------+---------+
|   John|       30|
| Andrew|       30|
| Clarke|       24|
|  Kevin|       44|
|Richard|       53|
+-------+---------+


scala> df.filter($"age" > 30).show()
+---+-------+
|age|   name|
+---+-------+
| 42|  Kevin|
| 51|Richard|
+---+-------+


scala> df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 22|    1|
| 51|    1|
| 28|    2|
| 42|    1|
+---+-----+


scala> case class Employee(name:String, age:Long)
defined class Employee

scala> val ds = Seq(Employee("Andrew",55), Employee("Praveen",38), Employee("Saro",60)).toDS()
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, age: bigint]

scala> ds.show();
+-------+---+
|   name|age|
+-------+---+
| Andrew| 55|
|Praveen| 38|
|   Saro| 60|
+-------+---+




val path = "/home/hadoop/Desktop/employee.json"
val employeeDS = spark.read.json(path).as[Employee]
employeeDS.show()

scala> val path = "/home/hadoop/Desktop/employee.json"
path: String = /home/hadoop/Desktop/employee.json

scala> val employeeDS = spark.read.json(path).as[Employee]
employeeDS: org.apache.spark.sql.Dataset[Employee] = [age: bigint, name: string]

scala> employeeDS.show()
+---+-------+
|age|   name|
+---+-------+
| 28|   John|
| 28| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...