scala> val myFirstRDD = sc.parallelize(List("spark","scala","hadoop"))
myFirstRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> myFirstRDD.foreach(println)
spark
scala
hadoop
scala> val txtRDD = sc.textFile("/home/hadoop/a.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /home/hadoop/a.txt MapPartitionsRDD[2] at textFile at <console>:24
scala> txtRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000
003,Arjun,30000
004,Anbu,24350
scala> case class emp (id:Int,name:String,sal:Int)
defined class emp
scala> val rd1 = txtRDD.map { e =>
     | val employeeRow = e.split(",")
     | val id = employeeRow(0).toInt
     | val name = employeeRow(1)
     | val salary = employeeRow(2).toInt
     | emp(id,name,salary)
     | }
scala> rd1.foreach(println)
emp(1,Ravi,10000)
emp(2,Rahul,20000)
emp(3,Arjun,30000)
emp(4,Anbu,24350)
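The map above assumes every line splits cleanly into an id, a name and a salary. A minimal sketch of a more defensive parse (not part of the original session; safeRDD is an illustrative name) that drops malformed lines using scala.util.Try:

import scala.util.Try
// keep only lines that parse into a valid emp record
val safeRDD = txtRDD.flatMap { line =>
  val cols = line.split(",")
  Try(emp(cols(0).trim.toInt, cols(1).trim, cols(2).trim.toInt)).toOption
}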
scala> val df = rd1.toDF()
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> df.show
+---+-----+-----+
| id| name| sal|
+---+-----+-----+
| 1| Ravi|10000|
| 2|Rahul|20000|
| 3|Arjun|30000|
| 4| Anbu|24350|
+---+-----+-----+
scala> df.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- sal: integer (nullable = false)
scala> df.createOrReplaceTempView("emp")
scala> sql("select * from emp").show
+---+-----+-----+
| id| name| sal|
+---+-----+-----+
| 1| Ravi|10000|
| 2|Rahul|20000|
| 3|Arjun|30000|
| 4| Anbu|24350|
+---+-----+-----+
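Any SQL that works on a table also works on the temp view. A hedged example, not from the original session:

// employees earning more than 20000, highest salary first
sql("select name, sal from emp where sal > 20000 order by sal desc").show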
scala> txtRDD.take(100)
res9: Array[String] = Array(001,Ravi,10000, 0002,Rahul,20000, 003,Arjun,30000, 004,Anbu,24350)
scala> val newRDD = txtRDD.filter (x => x.contains("Ra"))
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25
scala> newRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000
scala> val oneMoreRDD = txtRDD.filter(x => x.contains("Arjun"))
oneMoreRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:25
scala> oneMoreRDD.foreach(println)
003,Arjun,30000
-- filter example ---
scala> val numbers = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> val evens = numbers.filter(_%2==0).collect
evens: Array[Int] = Array(2, 4, 6, 8, 10)
--- Union and intersection ----
scala> val l1 = sc.parallelize(1 to 10)
l1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> val l2 = sc.parallelize(5 to 15)
l2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> l1.intersection(l2).collect
res15: Array[Int] = Array(6, 7, 9, 8, 10, 5)
scala> l1.union(l2).collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
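Note that union keeps duplicates (5 to 10 appear twice above). A small sketch to remove them, assuming the same l1 and l2:

// distinct drops the overlap introduced by union; ordering is not guaranteed
l1.union(l2).distinct.collect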
--reduce example--
scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at <console>:24
scala> a.reduce(_+_)
res18: Int = 55
-- sum of the elements in each partition --
scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24
scala> b.foreachPartition(x => println(x.reduce(_ + _)))
6
15
24
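foreachPartition runs on the executors, so on a real cluster these sums appear in the executor logs, not the driver console. A sketch (partitionSums is an illustrative name) that returns one sum per partition to the driver instead:

// one element per partition: that partition's local sum
val partitionSums = b.mapPartitions(it => Iterator(it.sum)).collect
// expected: Array(6, 15, 24), matching the per-partition sums above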
scala> val product = List((1,"Redmi",30000),(2,"OnePlus",50000),(3,"Oppo",34000))
product: List[(Int, String, Int)] = List((1,Redmi,30000), (2,OnePlus,50000), (3,Oppo,34000))
scala> val df = product.toDF("id","brand","price")
df: org.apache.spark.sql.DataFrame = [id: int, brand: string ... 1 more field]
scala> df.printSchema
root
|-- id: integer (nullable = false)
|-- brand: string (nullable = true)
|-- price: integer (nullable = false)
scala> df.show
+---+-------+-----+
| id| brand|price|
+---+-------+-----+
| 1| Redmi|30000|
| 2|OnePlus|50000|
| 3| Oppo|34000|
+---+-------+-----+
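toDF on a local collection works here because spark-shell imports spark.implicits._ automatically. The same table can also be built as a typed Dataset; a sketch with an illustrative case class name:

case class Phone(id: Int, brand: String, price: Int)
val ds = product.map(p => Phone(p._1, p._2, p._3)).toDS()
// typed filter on the case class field
ds.filter(_.price > 32000).show()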
-- read a file from HDFS --
-- copy a local file into HDFS
hadoop@hadoop:~/Downloads$ hdfs dfs -copyFromLocal yelp.json /sparkfiles/
scala> val textFile = sc.textFile("hdfs://localhost:9000/sparkfiles/yelp.json")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/sparkfiles/yelp.json MapPartitionsRDD[51] at textFile at <console>:27
scala> textFile.first
res28: String = {"business_id":"f9NumwFMBDn751xgFiRbNA","name":"The Range At Lake Norman","address":"10913 Bailey Rd","city":"Cornelius","state":"NC","postal_code":"28031","latitude":35.4627242,"longitude":-80.8526119,"stars":3.5,"review_count":36,"is_open":1,"attributes":{"BusinessAcceptsCreditCards":"True","BikeParking":"True","GoodForKids":"False","BusinessParking":"{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}","ByAppointmentOnly":"False","RestaurantsPriceRange2":"3"},"categories":"Active Life, Gun\/Rifle Ranges, Guns & Ammo, Shopping","hours":{"Monday":"10:0-18:0","Tuesday":"11:0-20:0","Wednesday":"10:0-18:0","Thursday":"11:0-20:0","Friday":"11:0-20:0","Saturday":"11:0-20:0","Sunday":"13:0-18:0"}}
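Since the file is JSON, it can also be loaded straight into a DataFrame instead of an RDD of strings; a sketch using the same path (yelpDF is an illustrative name):

val yelpDF = spark.read.json("hdfs://localhost:9000/sparkfiles/yelp.json")
// pick a few of the top-level fields seen in the record above
yelpDF.select("name", "city", "stars").show(5, truncate = false)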
Create a JSON file:
hadoop@hadoop:~$ cat student1.json
{"age": 20,"name": "Sam"},
{"age": 17,"name": "Mick"},
{"age": 18,"name": "Jennet"},
{"age": 19,"name": "Serena"}
hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/student1.json
Deleted /sparkfiles/student1.json
hadoop@hadoop:~$ hdfs dfs -copyFromLocal student1.json /sparkfiles/
scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/student1.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+---+------+
|age| name|
+---+------+
| 20| Sam|
| 17| Mick|
| 18|Jennet|
| 19|Serena|
+---+------+
scala> df.select("name").show
+------+
| name|
+------+
| Sam|
| Mick|
|Jennet|
|Serena|
+------+
scala> df.filter($"age" >= 18).show
+---+------+
|age| name|
+---+------+
| 20| Sam|
| 18|Jennet|
| 19|Serena|
+---+------+
scala> df.groupBy("age").count().show
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 17| 1|
| 18| 1|
| 20| 1|
+---+-----+
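Beyond count, grouped or global aggregations can use the functions in org.apache.spark.sql.functions. A sketch, not from the original session:

import org.apache.spark.sql.functions._
// overall statistics on the age column
df.agg(min("age"), max("age"), avg("age")).show()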
scala> df.createOrReplaceTempView("student")
scala> spark.sql("select * from student").show
+---+------+
|age| name|
+---+------+
| 20| Sam|
| 17| Mick|
| 18|Jennet|
| 19|Serena|
+---+------+
scala> spark.sql("select * from student where age >= 18").show
+---+------+
|age| name|
+---+------+
| 20| Sam|
| 18|Jennet|
| 19|Serena|
+---+------+
Input json:
people.json:
--------------
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
hadoop@hadoop:~$ cat > people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
^C
hadoop@hadoop:~$ cat people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.json /sparkfiles/
hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.columns
res2: Array[String] = Array(age, name)
scala> df.select("age").show
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
scala> df.select($"age").show
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
scala> df.select($"age"+5).show
+---------+
|(age + 5)|
+---------+
| null|
| 35|
| 24|
+---------+
scala> df.describe()
res8: org.apache.spark.sql.DataFrame = [summary: string, age: string ... 1 more field]
scala> df.describe().show
+-------+------------------+-------+
|summary| age| name|
+-------+------------------+-------+
| count| 2| 3|
| mean| 24.5| null|
| stddev|7.7781745930520225| null|
| min| 19| Andy|
| max| 30|Michael|
+-------+------------------+-------+
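describe counts only non-null values, which is why age has a count of 2 while name has 3. A sketch for filling the missing age before aggregating (0 is just an illustrative default):

// replace nulls in the age column with 0
df.na.fill(0L, Seq("age")).show()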
Make your own schema:
Applying a user-defined schema while reading JSON:
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val mySchema = StructType(StructField("age",IntegerType)::StructField("name",StringType)::Nil)
scala> val df_with_schema = spark.read.schema(mySchema).json("hdfs://localhost:9000/sparkfiles/people.json")
scala> df_with_schema.printSchema
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
scala> df_with_schema.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
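The same schema can also be built with StructType.add, which reads a little more naturally; a sketch with illustrative names:

import org.apache.spark.sql.types._
val mySchema2 = new StructType()
  .add("age", IntegerType, nullable = true)
  .add("name", StringType, nullable = true)
val df2 = spark.read.schema(mySchema2).json("hdfs://localhost:9000/sparkfiles/people.json")
df2.printSchema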
scala> df.head(10).foreach(println)
[null,Michael]
[30,Andy]
[19,Justin]
scala> df.withColumn("newAge",$"age").show
+----+-------+------+
| age| name|newAge|
+----+-------+------+
|null|Michael| null|
| 30| Andy| 30|
| 19| Justin| 19|
+----+-------+------+
scala> df.withColumn("newAge",$"age"+1).show
+----+-------+------+
| age| name|newAge|
+----+-------+------+
|null|Michael| null|
| 30| Andy| 31|
| 19| Justin| 20|
+----+-------+------+
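withColumn can be combined with drop to effectively replace a column; a sketch, not from the original session:

// keep only the incremented age, drop the original column
df.withColumn("newAge", $"age" + 1).drop("age").show()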
Rename a column:
scala> val df1 = df.withColumnRenamed("age","superNewAge")
df1: org.apache.spark.sql.DataFrame = [superNewAge: bigint, name: string]
scala> df1.printSchema
root
|-- superNewAge: long (nullable = true)
|-- name: string (nullable = true)
scala> df1.show
+-----------+-------+
|superNewAge| name|
+-----------+-------+
| null|Michael|
| 30| Andy|
| 19| Justin|
+-----------+-------+
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Text file: people.txt
hadoop@hadoop:~$ cat > people.txt
Michale,29
Justin,19
^C
hadoop@hadoop:~$ cat people.txt
Michale,29
Justin,19
hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/
hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.txt
Michale,29
Justin,19
//add one more record
hadoop@hadoop:~$ cat >> people.txt
Andy,30
^C
// delete existing file in hdfs
hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/people.txt
Deleted /sparkfiles/people.txt
hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/
scala> val lines = sc.textFile("hdfs://localhost:9000/sparkfiles/people.txt")
scala> lines.top(3)
res26: Array[String] = Array(Michale,29, Justin,19, Andy,30)
scala> case class Person (name:String, age:Int)
defined class Person
scala> val parts = lines.map{ x =>
| val l = x.split(",")
| val name = l(0)
| val age = l(1).toInt
| Person(name,age)
| }
parts: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[71] at map at <console>:30
scala> parts.foreach(println)
Person(Michale,29)
Person(Justin,19)
Person(Andy,30)
scala> val df = spark.createDataFrame(parts)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> df
res28: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> df.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
scala> df.show
+-------+---+
| name|age|
+-------+---+
|Michale| 29|
| Justin| 19|
| Andy| 30|
+-------+---+
Teenagers:
----------
scala> df.createOrReplaceTempView("people")
scala> val teens = spark.sql("select * from people where age >= 13 and age <= 19")
teens: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> teens.show
+------+---+
| name|age|
+------+---+
|Justin| 19|
+------+---+
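The same teenager filter written with the DataFrame API instead of SQL; a sketch, assuming the df built from people.txt above:

// equivalent to: where age >= 13 and age <= 19
df.filter($"age".between(13, 19)).show()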