Thursday, 16 April 2020

Spark with Scala - Revisit Part I

scala> val myFirstRDD = sc.parallelize(List("spark","scala","hadoop"))
myFirstRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> myFirstRDD.foreach(println)
spark
scala
hadoop
                                                                               
scala> val txtRDD = sc.textFile("/home/hadoop/a.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /home/hadoop/a.txt MapPartitionsRDD[2] at textFile at <console>:24

scala> txtRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000
003,Arjun,30000
004,Anbu,24350

scala> case class emp (id:Int,name:String,sal:Int)
defined class emp



val rd1 = txtRDD.map { e =>
       val employeeRow = e.split(",")
       val id = employeeRow(0).toInt
       val name = employeeRow(1)
       val salary = employeeRow(2).toInt
       emp(id,name,salary)
       }


scala> rd1.foreach(println)
emp(1,Ravi,10000)
emp(2,Rahul,20000)
emp(3,Arjun,30000)
emp(4,Anbu,24350)

scala> val df = rd1.toDF()
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
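
Note: rd1.toDF() works here because the spark-shell pre-imports spark.implicits._. In a standalone application you would have to set that up yourself; a minimal sketch (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("EmpDemo").getOrCreate()
import spark.implicits._              // brings .toDF() / .toDS() into scope
val sc = spark.sparkContext           // same entry point the shell exposes as sc

// ... build txtRDD / rd1 as above, then:
// val df = rd1.toDF()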

scala> df.show
+---+-----+-----+
| id| name|  sal|
+---+-----+-----+
|  1| Ravi|10000|
|  2|Rahul|20000|
|  3|Arjun|30000|
|  4| Anbu|24350|
+---+-----+-----+


scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- sal: integer (nullable = false)

scala> df.createOrReplaceTempView("emp")

scala> sql("select * from emp").show

+---+-----+-----+
| id| name|  sal|
+---+-----+-----+
|  1| Ravi|10000|
|  2|Rahul|20000|
|  3|Arjun|30000|
|  4| Anbu|24350|
+---+-----+-----+

scala> txtRDD.take(100)
res9: Array[String] = Array(001,Ravi,10000, 0002,Rahul,20000, 003,Arjun,30000, 004,Anbu,24350)

scala> val newRDD = txtRDD.filter (x => x.contains("Ra"))
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25

scala> newRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000

scala> val oneMoreRDD = txtRDD.filter(x => x.contains("Arjun"))
oneMoreRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:25

scala> oneMoreRDD.foreach(println)
003,Arjun,30000


-- filter example ---
scala> val numbers = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val evens = numbers.filter(_%2==0).collect
evens: Array[Int] = Array(2, 4, 6, 8, 10)
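
Transformations can be chained before an action; a small sketch that filters and then maps the same RDD:

// keep the even numbers, scale them, and collect the result to the driver
val scaledEvens = numbers.filter(_ % 2 == 0).map(_ * 10).collect
// scaledEvens: Array[Int] = Array(20, 40, 60, 80, 100)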





--- Union and intersection ----
scala> val l1 = sc.parallelize(1 to 10)
l1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val l2 = sc.parallelize(5 to 15)
l2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> l1.intersection(l2).collect
res15: Array[Int] = Array(6, 7, 9, 8, 10, 5)


scala> l1.union(l2).collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
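
Note that union keeps the duplicates (5 to 10 appear twice above), while intersection comes back in no particular order. A quick sketch of two related set operations:

l1.union(l2).distinct.collect   // de-duplicated union: 1 to 15, unordered
l1.subtract(l2).collect         // elements of l1 not in l2: 1,2,3,4, unordered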



--reduce example--
scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at <console>:24

scala> a.reduce(_+_)
res18: Int = 55
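
reduce accepts any associative and commutative function, not only addition; for example, the maximum of the same RDD (a small sketch):

a.reduce((x, y) => if (x > y) x else y)   // Int = 10
a.max                                     // built-in action, also 10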


--sum of the elements in each partition--
scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24

scala> b.foreachPartition(x => println(x.reduce(_ + _)))
6
15
24
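
foreachPartition runs on the executors, so in cluster mode the printed sums would show up in the executor logs rather than on the driver console. To bring the per-partition sums back as data, mapPartitions can be used instead (a sketch):

// one sum per partition, returned to the driver
b.mapPartitions(it => Iterator(it.sum)).collect   // Array(6, 15, 24)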



scala> val product = List((1,"Redmi",30000),(2,"OnePlus",50000),(3,"Oppo",34000))
product: List[(Int, String, Int)] = List((1,Redmi,30000), (2,OnePlus,50000), (3,Oppo,34000))

scala> val df = product.toDF("id","brand","price")
df: org.apache.spark.sql.DataFrame = [id: int, brand: string ... 1 more field]

scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- brand: string (nullable = true)
 |-- price: integer (nullable = false)


scala> df.show
+---+-------+-----+
| id|  brand|price|
+---+-------+-----+
|  1|  Redmi|30000|
|  2|OnePlus|50000|
|  3|   Oppo|34000|
+---+-------+-----+
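
The same DataFrame can be filtered and sorted with column expressions, for example (a small sketch):

// phones priced above 32000, most expensive first
df.filter($"price" > 32000).orderBy($"price".desc).show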


-- read a file from HDFS --

-- copy the local file into HDFS
hadoop@hadoop:~/Downloads$ hdfs dfs -copyFromLocal yelp.json /sparkfiles/


scala> val textFile = sc.textFile("hdfs://localhost:9000/sparkfiles/yelp.json")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/sparkfiles/yelp.json MapPartitionsRDD[51] at textFile at <console>:27

scala> textFile.first
res28: String = {"business_id":"f9NumwFMBDn751xgFiRbNA","name":"The Range At Lake Norman","address":"10913 Bailey Rd","city":"Cornelius","state":"NC","postal_code":"28031","latitude":35.4627242,"longitude":-80.8526119,"stars":3.5,"review_count":36,"is_open":1,"attributes":{"BusinessAcceptsCreditCards":"True","BikeParking":"True","GoodForKids":"False","BusinessParking":"{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}","ByAppointmentOnly":"False","RestaurantsPriceRange2":"3"},"categories":"Active Life, Gun\/Rifle Ranges, Guns & Ammo, Shopping","hours":{"Monday":"10:0-18:0","Tuesday":"11:0-20:0","Wednesday":"10:0-18:0","Thursday":"11:0-20:0","Friday":"11:0-20:0","Saturday":"11:0-20:0","Sunday":"13:0-18:0"}}
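
sc.textFile only gives back raw lines; to let Spark parse the JSON and infer a nested schema (attributes, hours, etc.), the same file can also be read through the DataFrame reader, as in this sketch:

val yelpDF = spark.read.json("hdfs://localhost:9000/sparkfiles/yelp.json")
yelpDF.printSchema
yelpDF.select("name", "city", "stars").show(5, truncate = false)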



Create a JSON file and copy it to HDFS:


hadoop@hadoop:~$ cat student1.json
{"age": 20,"name": "Sam"},
{"age": 17,"name": "Mick"},
{"age": 18,"name": "Jennet"},
{"age": 19,"name": "Serena"}

hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/student1.json
Deleted /sparkfiles/student1.json

hadoop@hadoop:~$ hdfs dfs -copyFromLocal student1.json /sparkfiles/


scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/student1.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 17|  Mick|
| 18|Jennet|
| 19|Serena|
+---+------+


scala> df.select("name").show
+------+
|  name|
+------+
|   Sam|
|  Mick|
|Jennet|
|Serena|
+------+


scala> df.filter($"age" >= 18).show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 18|Jennet|
| 19|Serena|
+---+------+


scala> df.groupBy("age").count().show
+---+-----+                                                                   
|age|count|
+---+-----+
| 19|    1|
| 17|    1|
| 18|    1|
| 20|    1|
+---+-----+
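
Beyond count, other aggregates are available from org.apache.spark.sql.functions; a small sketch computing the average and maximum age:

import org.apache.spark.sql.functions._   // avg, max, min, ...
df.agg(avg("age"), max("age")).show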


scala> df.createOrReplaceTempView("student")

scala> spark.sql("select * from student").show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 17|  Mick|
| 18|Jennet|
| 19|Serena|
+---+------+

scala> spark.sql("select * from student where age >= 18").show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 18|Jennet|
| 19|Serena|
+---+------+

Input json:

people.json:
--------------
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

hadoop@hadoop:~$ cat > people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
^C

hadoop@hadoop:~$ cat people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.json /sparkfiles/

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}





scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]               

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> df.columns
res2: Array[String] = Array(age, name)

scala> df.select("age").show
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+


scala> df.select($"age").show
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+


scala> df.select($"age"+5).show
+---------+
|(age + 5)|
+---------+
|     null|
|       35|
|       24|
+---------+


scala> df.describe()
res8: org.apache.spark.sql.DataFrame = [summary: string, age: string ... 1 more field]

scala> df.describe().show
+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+
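
Michael's missing age shows up as null and is excluded from describe's count. If the nulls get in the way, the na functions can drop or fill them (a sketch):

df.na.drop().show                // keep only rows with no null columns
df.na.fill(0L, Seq("age")).show  // or substitute a default age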

Make your own schema:
Applying a user-defined schema while reading JSON:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

val mySchema = StructType(StructField("age",IntegerType)::StructField("name",StringType)::Nil)

val df_with_schema = spark.read.schema(mySchema).json("hdfs://localhost:9000/sparkfiles/people.json")

scala> df_with_schema.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)


scala> df_with_schema.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
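
On Spark 2.3 or later the same schema can also be given as a DDL string, which is a bit shorter (a sketch under that assumption):

val df2 = spark.read
  .schema("age INT, name STRING")
  .json("hdfs://localhost:9000/sparkfiles/people.json")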



scala> df.head(10).foreach(println)
[null,Michael]
[30,Andy]
[19,Justin]

scala> df.withColumn("newAge",$"age").show
+----+-------+------+
| age|   name|newAge|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+


scala> df.withColumn("newAge",$"age"+1).show
+----+-------+------+
| age|   name|newAge|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    31|
|  19| Justin|    20|
+----+-------+------+

Rename a column:
scala> val df1 = df.withColumnRenamed("age","superNewAge")
df1: org.apache.spark.sql.DataFrame = [superNewAge: bigint, name: string]

scala> df1.printSchema
root
 |-- superNewAge: long (nullable = true)
 |-- name: string (nullable = true)


scala> df1.show
+-----------+-------+
|superNewAge|   name|
+-----------+-------+
|       null|Michael|
|         30|   Andy|
|         19| Justin|
+-----------+-------+
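
In the same style, a column can be removed with drop (a small sketch):

df.drop("age").show   // keeps only the name column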


scala> df.createOrReplaceTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


Text file: people.txt

hadoop@hadoop:~$ cat > people.txt
Michale,29
Justin,19
^C


hadoop@hadoop:~$ cat people.txt
Michale,29
Justin,19


hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.txt
Michale,29
Justin,19

//add one more record
hadoop@hadoop:~$ cat >> people.txt
Andy,30
^C

// delete existing file in hdfs
hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/people.txt
Deleted /sparkfiles/people.txt

hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/


scala> val lines = sc.textFile("hdfs://localhost:9000/sparkfiles/people.txt")

scala> lines.top(3)
res26: Array[String] = Array(Michale,29, Justin,19, Andy,30)

scala>  case class Person (name:String, age:Int)
defined class Person

scala> val parts = lines.map{ x =>
     |       val l = x.split(",")
     |       val name = l(0)
     |       val age = l(1).toInt
     |       Person(name,age)
     |       }
parts: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[71] at map at <console>:30

scala> parts.foreach(println)
Person(Michale,29)
Person(Justin,19)
Person(Andy,30)

scala> val df = spark.createDataFrame(parts)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df
res28: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> df.show
+-------+---+
|   name|age|
+-------+---+
|Michale| 29|
| Justin| 19|
|   Andy| 30|
+-------+---+
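
Because Person is a case class, the same RDD can also become a typed Dataset rather than a DataFrame (a sketch, again relying on the shell's spark.implicits._):

val ds = parts.toDS()            // Dataset[Person]
ds.filter(_.age > 20).show       // typed filter on the case-class field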


Teenagers:
----------

scala> df.createOrReplaceTempView("people")

scala> val teens = spark.sql("select * from people where age >= 13 and age <= 19")
teens: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> teens.show
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+


