Input
people.txt:
------------
Michael, 29
Andy, 30
Justin, 19
people.json:
-----------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal people.* /user/
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/people*.*
-rw-r--r-- 1 hadoop supergroup 76 2019-02-01 11:43 /user/people.json
-rw-r--r-- 1 hadoop supergroup 32 2019-02-01 11:43 /user/people.txt
scala> val df = spark.read.json("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
-- OR --
scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select("age","name").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select(col("age"),col("name")).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select(df("age"),df("name")).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
// Add 1 to Age
scala> df.select(df("name"),df("age")+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select($"name",$"age"+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select(col("name"),col("age")+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select($"age" > 21).show
+----------+
|(age > 21)|
+----------+
| null|
| true|
| false|
+----------+
scala> df.select(col("age") > 21).show
+----------+
|(age > 21)|
+----------+
| null|
| true|
| false|
+----------+
//Filter operations
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter(col("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter(df("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// Group By
scala> df.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
scala> val peopleRDD = spark.sparkContext.textFile("hdfs://localhost:9000/user/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/people.txt MapPartitionsRDD[80] at textFile at <console>:33
scala> peopleRDD.collect.foreach(println)
Michael, 29
Andy, 30
Justin, 19
// Schema built from StructType and StructField definitions
scala> val schema = StructType(StructField("Name",StringType)::StructField("Age",IntegerType)::Nil )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,IntegerType,true))
// StructType is not applied here: each line is mapped to a plain (String, String) tuple
// Split every "name, age" line on the comma and keep both trimmed pieces as a
// plain (String, String) tuple -- no Row and no schema are involved here.
val rowRDD = peopleRDD.map { line =>
  val parts = line.split(",")
  (parts(0).trim, parts(1).trim)
}
scala> rowRDD.collect.foreach(println)
(Michael,29)
(Andy,30)
(Justin,19)
// Turn each "name, age" line into a Row holding the trimmed name (String) and
// the trimmed age parsed with toInt (Int).
val rowRDD = peopleRDD.map { line =>
  val parts = line.split(",")
  val personName = parts(0).trim
  val personAge = parts(1).trim.toInt
  Row(personName, personAge)
}
scala> val rowRDD = peopleRDD.map (x => {
| val fields = x.split(",")
| val Name = fields(0).trim()
| val Age = fields(1).trim().toInt
| Row(Name,Age)
| })
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[84] at map at <console>:35
scala> rowRDD.collect.foreach(println)
[Michael,29]
[Andy,30]
[Justin,19]
// Creating Dataframe with schema (StructType and StructField here)
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [Name: string, Age: int]
scala> peopleDF.createOrReplaceTempView("people")
scala> spark.sql("SELECT name FROM people").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]
scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
| value|
+-------------+
|Name: Michael|
| Name: Andy|
| Name: Justin|
+-------------+
scala> val ds = Seq(1,2,3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
scala> ds.printSchema
root
|-- value: integer (nullable = false)
scala> case class Person(name:String, age:Long)
defined class Person
scala> val ds = Seq(Person("Sankar",42),Person("Sudha",40),Person("Vijay",18),Person("Aish",13)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> ds.printSchema
root
|-- name: string (nullable = true)
|-- age: long (nullable = false)
scala> ds.show
+------+---+
| name|age|
+------+---+
|Sankar| 42|
| Sudha| 40|
| Vijay| 18|
| Aish| 13|
+------+---+
scala> val path = "hdfs://localhost:9000/user/people.json"
path: String = hdfs://localhost:9000/user/people.json
// Read the JSON file as a typed Dataset using the Person case class
scala> val peopleDS = spark.read.format("json").load(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
// as [CaseClassName]
scala> val peopleDS = spark.read.format("json").load("hdfs://localhost:9000/user/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> peopleDS.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> val peopleFiltered = peopleDS.filter("age is not null")
peopleFiltered: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleFiltered.show
+---+------+
|age| name|
+---+------+
| 30| Andy|
| 19|Justin|
+---+------+
people.txt:
------------
Michael, 29
Andy, 30
Justin, 19
people.json:
-----------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal people.* /user/
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/people*.*
-rw-r--r-- 1 hadoop supergroup 76 2019-02-01 11:43 /user/people.json
-rw-r--r-- 1 hadoop supergroup 32 2019-02-01 11:43 /user/people.txt
scala> val df = spark.read.json("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
-- OR --
scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select("age","name").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select(col("age"),col("name")).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select(df("age"),df("name")).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
// Add 1 to Age
scala> df.select(df("name"),df("age")+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select($"name",$"age"+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select(col("name"),col("age")+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select($"age" > 21).show
+----------+
|(age > 21)|
+----------+
| null|
| true|
| false|
+----------+
scala> df.select(col("age") > 21).show
+----------+
|(age > 21)|
+----------+
| null|
| true|
| false|
+----------+
//Filter operations
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter(col("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter(df("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// Group By
scala> df.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
scala> val peopleRDD = spark.sparkContext.textFile("hdfs://localhost:9000/user/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/people.txt MapPartitionsRDD[80] at textFile at <console>:33
scala> peopleRDD.collect.foreach(println)
Michael, 29
Andy, 30
Justin, 19
// Schema built from StructType and StructField definitions
scala> val schema = StructType(StructField("Name",StringType)::StructField("Age",IntegerType)::Nil )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,IntegerType,true))
// StructType is not applied here: each line is mapped to a plain (String, String) tuple
// Map each comma-separated line to a (String, String) pair made of the trimmed
// first and second fields; the StructType schema plays no part in this step.
val rowRDD = peopleRDD.map { record =>
  val tokens = record.split(",")
  val first = tokens(0).trim
  val second = tokens(1).trim
  (first, second)
}
scala> rowRDD.collect.foreach(println)
(Michael,29)
(Andy,30)
(Justin,19)
// Build one Row per input line: field 0 is the trimmed name, field 1 is the
// trimmed age converted to Int via toInt.
val rowRDD = peopleRDD.map { record =>
  val tokens = record.split(",")
  Row(tokens(0).trim, tokens(1).trim.toInt)
}
scala> val rowRDD = peopleRDD.map (x => {
| val fields = x.split(",")
| val Name = fields(0).trim()
| val Age = fields(1).trim().toInt
| Row(Name,Age)
| })
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[84] at map at <console>:35
scala> rowRDD.collect.foreach(println)
[Michael,29]
[Andy,30]
[Justin,19]
// Creating Dataframe with schema (StructType and StructField here)
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [Name: string, Age: int]
scala> peopleDF.createOrReplaceTempView("people")
scala> spark.sql("SELECT name FROM people").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]
scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
| value|
+-------------+
|Name: Michael|
| Name: Andy|
| Name: Justin|
+-------------+
scala> val ds = Seq(1,2,3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
scala> ds.printSchema
root
|-- value: integer (nullable = false)
scala> case class Person(name:String, age:Long)
defined class Person
scala> val ds = Seq(Person("Sankar",42),Person("Sudha",40),Person("Vijay",18),Person("Aish",13)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> ds.printSchema
root
|-- name: string (nullable = true)
|-- age: long (nullable = false)
scala> ds.show
+------+---+
| name|age|
+------+---+
|Sankar| 42|
| Sudha| 40|
| Vijay| 18|
| Aish| 13|
+------+---+
scala> val path = "hdfs://localhost:9000/user/people.json"
path: String = hdfs://localhost:9000/user/people.json
// Read the JSON file as a typed Dataset using the Person case class
scala> val peopleDS = spark.read.format("json").load(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
// as [CaseClassName]
scala> val peopleDS = spark.read.format("json").load("hdfs://localhost:9000/user/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> peopleDS.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> val peopleFiltered = peopleDS.filter("age is not null")
peopleFiltered: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleFiltered.show
+---+------+
|age| name|
+---+------+
| 30| Andy|
| 19|Justin|
+---+------+