Showing posts with label spark. Show all posts

Saturday, 22 August 2020

Lead, Lag Examples in Spark SQL with Scala

// LEAD Example
// LEAD returns the next row's salary value within each partition
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lead FROM teams").show(21)

// Lead will fetch next row's salary value 
+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary| Lead|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|50000|  
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|40000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|20000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|12000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|50000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|45000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|45000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|20000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|40000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|38000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|25000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|50000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|40000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|25000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null|
+---+------------+-----------+-----------+----------+------+-----+


// Lag Example
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lag FROM teams").show(21)

// Lag will fetch previous row's salary value 

+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary|  Lag|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|98000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|40000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|20000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|69000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|50000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|45000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|45000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|38000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|67000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|40000|
+---+------------+-----------+-----------+----------+------+-----+

// LAG with offset 2: fetches the salary value from 2 rows before the current row
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary,2) OVER (PARTITION BY technology ORDER BY salary desc) as Lag FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary|  Lag|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000| null|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|98000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|40000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000| null|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|69000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|50000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000| null|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|45000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000| null|
|103|        Raja|       Mani|  Developer|     Spark| 50000|67000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|50000|
+---+------------+-----------+-----------+----------+------+-----+

// LEAD with offset 3: fetches the salary value from 3 rows ahead of the current row
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary,3) OVER (PARTITION BY technology ORDER BY salary desc) as Lead FROM teams").show(21)



+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary| Lead|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|20000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|12000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000| null|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000| null|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|45000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|20000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000| null|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000| null|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|38000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|25000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000| null|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000| null|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|40000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|25000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000| null|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000| null|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null|
+---+------------+-----------+-----------+----------+------+-----+


// Find the salary difference between the current row and the previous row (current salary - LAG value)
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lag, (salary - LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc)) as diff FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+------+
| id|       fname|      lname|designation|technology|salary|  Lag|  diff|
+---+------------+-----------+-----------+----------+------+-----+------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|  null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|98000|-48000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|50000|-10000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|40000|-20000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|20000| -8000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|  null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|69000|-19000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|50000| -5000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|45000|     0|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|-25000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|  null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|45000|     0|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000| -5000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|40000| -2000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|38000|-13000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|  null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|67000|-17000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|     0|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|     0|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|-10000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|40000|-15000|
+---+------------+-----------+-----------+----------+------+-----+------+

// Find the salary difference between the current row and the next row (current salary - LEAD value)
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lead, (salary - LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) ) as diff FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+-----+
| id|       fname|      lname|designation|technology|salary| Lead| diff|
+---+------------+-----------+-----------+----------+------+-----+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|50000|48000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|40000|10000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|20000|20000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|12000| 8000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|50000|19000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|45000| 5000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|45000|    0|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|20000|25000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|45000|    0|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|40000| 5000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|38000| 2000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|25000|13000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|17000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|50000|    0|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|    0|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|40000|10000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|25000|15000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null| null|
+---+------------+-----------+-----------+----------+------+-----+-----+ 



ROW_NUMBER, RANK, DENSE_RANK Windowing Functions in Spark SQL

team.csv:
---------
id,fname,lname,designation,technology,salary
100,Sankara,Narayanan,Developer,Spark,50000
101,Ramesh,Kumar,Tester,Android,40000
102,Ganapathy,Govindan,Tester,Android,25000
103,Raja,Mani,Developer,Spark,50000
104,Pavithra,Lokesh,Developer,iOS,20000
105,Kamala,Kamesh,Developer,iOS,98000
106,Arun,Pandian,Tester,Spark,25000
107,Praveen,Mani,Analyst,iOS,50000
108,Anbu,Sudha,Tester,iOS,12000
109,Nagarethinam,Dhanukkodi,Tester,Node JS,20000
110,Veeraiah,Dhanukkodi,Analyst,iOS,40000
111,Arivu,Madhi,Developer,Spark,50000
112,Muthu,Kumar,Analyst,Node JS,45000
113,Sikkandar,Bhai,Developer,Spark,40000
114,Salman,Khan,Analyst,Spark,67000
115,Donald,Trump,Analyst,Android,45000
116,Anitha,Kuppusamy,Developer,Android,45000
117,Pushpavanam,Kuppusamy,Analyst,Android,38000
118,Latha,Rajinikanth,Tester,Node JS,45000
119,Aishwarya,Dhanush,Developer,Node JS,69000
120,Vijay,Kumar,Developer,Node JS,50000


/FileStore/tables/team.csv


 val df = spark.read.format("csv").option("inferSchema","true").option("header",true).load("/FileStore/tables/team.csv")
 
 df.printSchema()
 
root
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- technology: string (nullable = true)
 |-- salary: integer (nullable = true)
 
 
 df.show()
 
+---+------------+-----------+-----------+----------+------+
| id|       fname|      lname|designation|technology|salary|
+---+------------+-----------+-----------+----------+------+
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|
|115|      Donald|      Trump|    Analyst|   Android| 45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|
+---+------------+-----------+-----------+----------+------+


df.createOrReplaceTempView("teams")

spark.sql("select * from teams").show(21)

+---+------------+-----------+-----------+----------+------+
| id|       fname|      lname|designation|technology|salary|
+---+------------+-----------+-----------+----------+------+
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|
|115|      Donald|      Trump|    Analyst|   Android| 45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|
+---+------------+-----------+-----------+----------+------+


// ROW_NUMBER -- assigns a sequential row number to each row within its partition


spark.sql("SELECT id,fname,lname,technology,designation,salary, ROW_NUMBER() OVER (PARTITION BY designation ORDER BY salary desc) AS row_num FROM teams").show(21)


+---+------------+-----------+----------+-----------+------+-------+
| id|       fname|      lname|technology|designation|salary|row_num|
+---+------------+-----------+----------+-----------+------+-------+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|      1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|      2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|      3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|      4|
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|      5|
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|      6|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|      1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|      2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|      3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|      4|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|      5|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|      6|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|      7|
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|      8|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|      9|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|      1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|      2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|      3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|      4|
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|      5|
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|      6|
+---+------------+-----------+----------+-----------+------+-------+


// ROW_NUMBER()

spark.sql("SELECT id,fname,lname,designation,technology,salary, ROW_NUMBER() OVER (PARTITION BY technology ORDER BY salary desc) AS row_num FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-------+
| id|       fname|      lname|designation|technology|salary|row_num|
+---+------------+-----------+-----------+----------+------+-------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|      1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|      2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|      3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|      4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|      5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|      1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|      2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|      3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|      4|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|      5|
|115|      Donald|      Trump|    Analyst|   Android| 45000|      1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|      2|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|      3|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|      4|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|      5|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|      1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|      2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|      3|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|      4|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|      5|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|      6|
+---+------------+-----------+-----------+----------+------+-------+


 
// RANK()
spark.sql("SELECT id,fname,lname,technology,designation,salary, RANK() OVER (PARTITION BY designation ORDER BY salary desc) AS rank FROM teams").show(21)

+---+------------+-----------+----------+-----------+------+----+
| id|       fname|      lname|technology|designation|salary|rank|
+---+------------+-----------+----------+-----------+------+----+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|   1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|   2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|   3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|   3| -- 4 skipped 
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|   5|
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|   6|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|   1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|   2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|   3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|   3|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|   3|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|   3|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|   7| -- 4,5,6 skipped 
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|   8|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|   9|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|   1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|   2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|   3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|   3| -- 4 skipped 
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|   5|
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|   6|
+---+------------+-----------+----------+-----------+------+----+

// RANK()
spark.sql("SELECT id,fname,lname,designation,technology,salary, RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS rank FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+----+
| id|       fname|      lname|designation|technology|salary|rank|
+---+------------+-----------+-----------+----------+------+----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|   1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|   2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|   3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|   4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|   5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|   1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|   2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|   3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|   3|  -- 4 is skipped
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|   5|
|115|      Donald|      Trump|    Analyst|   Android| 45000|   1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|   1| -- 2 is skipped 
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|   3|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|   4|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|   5|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|   1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|   2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|   2| -- three rows tie at rank 2, so 3 and 4 are skipped
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|   2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|   5|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|   6|
+---+------------+-----------+-----------+----------+------+----+



// DENSE_RANK()
spark.sql("SELECT id,fname,lname,technology,designation,salary, DENSE_RANK() OVER (PARTITION BY designation ORDER BY salary desc) AS dense_rank FROM teams").show(21)

+---+------------+-----------+----------+-----------+------+----------+
| id|       fname|      lname|technology|designation|salary|dense_rank|
+---+------------+-----------+----------+-----------+------+----------+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|         1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|         2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|         3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|         3|
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|         4|
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|         5|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|         1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|         2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|         3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|         3|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|         3|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|         3|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|         4|
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|         5|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|         6|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|         1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|         2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|         3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|         3|
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|         4|
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|         5|
+---+------------+-----------+----------+-----------+------+----------+

// dense_rank
spark.sql("SELECT id,fname,lname,designation,technology,salary, DENSE_RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS dense_rank FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+----------+
| id|       fname|      lname|designation|technology|salary|dense_rank|
+---+------------+-----------+-----------+----------+------+----------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|         1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|         2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|         3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|         4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|         5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|         1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|         2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|         3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|         3|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|         4|
|115|      Donald|      Trump|    Analyst|   Android| 45000|         1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|         1|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|         2|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|         3|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|         4|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|         1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|         2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|         2|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|         2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|         3|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|         4|
+---+------------+-----------+-----------+----------+------+----------+


 // ROW_NUMBER, RANK, DENSE_RANK all together

spark.sql("SELECT id,fname,lname,designation,technology,salary, ROW_NUMBER() OVER (PARTITION BY technology ORDER BY salary desc) AS row_num,RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS rank,DENSE_RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS dense_rank FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-------+----+----------+
| id|       fname|      lname|designation|technology|salary|row_num|rank|dense_rank|
+---+------------+-----------+-----------+----------+------+-------+----+----------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|      1|   1|         1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|      2|   2|         2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|      3|   3|         3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|      4|   4|         4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|      5|   5|         5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|      1|   1|         1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|      2|   2|         2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|      3|   3|         3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|      4|   3|         3|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|      5|   5|         4|
|115|      Donald|      Trump|    Analyst|   Android| 45000|      1|   1|         1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|      2|   1|         1|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|      3|   3|         2|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|      4|   4|         3|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|      5|   5|         4|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|      1|   1|         1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|      2|   2|         2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|      3|   2|         2|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|      4|   2|         2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|      5|   5|         3|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|      6|   6|         4|
+---+------------+-----------+-----------+----------+------+-------+----+----------+

Tuesday, 18 August 2020

Item, StoreSales DataSets - Spark with Scala Programming


val itemDF = spark.read.format("csv").options(Map("header"->"true", "delimiter"->"|", "inferSchema"->"true")).load("/FileStore/tables/retailer/data/item.dat")
 
itemDF.printSchema()

root
 |-- i_item_sk: integer (nullable = true)
 |-- i_item_id: string (nullable = true)
 |-- i_rec_start_date: string (nullable = true)
 |-- i_rec_end_date: string (nullable = true)
 |-- i_item_desc: string (nullable = true)
 |-- i_current_price: double (nullable = true)
 |-- i_wholesale_cost: double (nullable = true)
 |-- i_brand_id: integer (nullable = true)
 |-- i_brand: string (nullable = true)
 |-- i_class_id: integer (nullable = true)
 |-- i_class: string (nullable = true)
 |-- i_category_id: integer (nullable = true)
 |-- i_category: string (nullable = true)
 |-- i_manufact_id: integer (nullable = true)
 |-- i_manufact: string (nullable = true)
 |-- i_size: string (nullable = true)
 |-- i_formulation: string (nullable = true)
 |-- i_color: string (nullable = true)
 |-- i_units: string (nullable = true)
 |-- i_container: string (nullable = true)
 |-- i_manager_id: integer (nullable = true)
 |-- i_product_name: string (nullable = true)
 
 
import org.apache.spark.sql.functions._
itemDF.select(min("i_wholesale_cost").alias("Minn"), max("i_wholesale_cost").alias("Maxx")).show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+

Or

import org.apache.spark.sql.functions._

val minMaxDF = itemDF.select(min("i_wholesale_cost"),max("i_wholesale_cost")).withColumnRenamed("min(i_wholesale_cost)","Minn").withColumnRenamed("max(i_wholesale_cost)","Maxx")
minMaxDF.show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+


// Filter condition
display(itemDF.filter($"i_wholesale_cost" === 0.02))



import org.apache.spark.sql.functions._
val minWSCostDF = itemDF.select(min("i_wholesale_cost").alias("Minn"))

minWSCostDF.show()

+----+
|Minn|
+----+
|0.02|
+----+

val cheapestItemDF = itemDF.join(minWSCostDF,itemDF.col("i_wholesale_cost") === minWSCostDF.col("Minn"),"inner")

cheapestItemDF.select("i_item_desc","i_current_price","i_category").show()

+--------------------+---------------+--------------------+
|         i_item_desc|i_current_price|          i_category|
+--------------------+---------------+--------------------+
|Forces can testif...|           0.09|Books            ...|
|Forces can testif...|           9.21|Sports           ...|
+--------------------+---------------+--------------------+

 

import org.apache.spark.sql.functions._
itemDF.select(max("i_current_price").alias("MaxCurrentPrice")).show()

+---------------+
|MaxCurrentPrice|
+---------------+
|          99.99|
+---------------+


val storeSalesDF = spark.read.format("csv").option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/store_sales.dat")

storeSalesDF.count()
2880404

storeSalesDF.printSchema


storeSalesDF.printSchema
root
 |-- ss_sold_date_sk: integer (nullable = true)
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
 |-- ss_addr_sk: integer (nullable = true)
 |-- ss_store_sk: integer (nullable = true)
 |-- ss_promo_sk: integer (nullable = true)
 |-- ss_ticket_number: integer (nullable = true)
 |-- ss_quantity: integer (nullable = true)
 |-- ss_wholesale_cost: double (nullable = true)
 |-- ss_list_price: double (nullable = true)
 |-- ss_sales_price: double (nullable = true)
 |-- ss_ext_discount_amt: double (nullable = true)
 |-- ss_ext_sales_price: double (nullable = true)
 |-- ss_ext_wholesale_cost: double (nullable = true)
 |-- ss_ext_list_price: double (nullable = true)
 |-- ss_ext_tax: double (nullable = true)
 |-- ss_coupon_amt: double (nullable = true)
 |-- ss_net_paid: double (nullable = true)
 |-- ss_net_paid_inc_tax: double (nullable = true)
 |-- ss_net_profit: double (nullable = true)
 
 
 import org.apache.spark.sql.functions.sum
 
 storeSalesDF.select(sum("ss_net_paid_inc_tax").alias("NetIncomeTaxPaid"), sum("ss_net_profit").alias("NetProfit")).show()
 
+-------------------+--------------------+
|   NetIncomeTaxPaid|           NetProfit|
+-------------------+--------------------+
|4.954755825529975E9|-2.276100670919993E9|
+-------------------+--------------------+

import org.apache.spark.sql.functions.{sum,sumDistinct}

storeSalesDF.select(sumDistinct("ss_quantity"),sum("ss_quantity")).show()

+-------------------------+----------------+
|sum(DISTINCT ss_quantity)|sum(ss_quantity)|
+-------------------------+----------------+
|                     5050|       138943711|
+-------------------------+----------------+



import org.apache.spark.sql.functions.{avg,mean}
storeSalesDF.select(avg("ss_quantity"),mean("ss_quantity")).show()

+-----------------+-----------------+
| avg(ss_quantity)| avg(ss_quantity)|
+-----------------+-----------------+
|50.51749085953793|50.51749085953793|
+-----------------+-----------------+

import org.apache.spark.sql.functions.{avg,mean,min,max}

storeSalesDF.select(
avg("ss_quantity").as("Average_Purchases"),
mean("ss_quantity").as("Mean_Purchases"),
sum("ss_quantity") / count("ss_quantity"),
min("ss_quantity").as("Min_Purchases"),
max("ss_quantity").alias("Max_Purchases")).show()
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|Average_Purchases|   Mean_Purchases|(sum(ss_quantity) / count(ss_quantity))|Min_Purchases|Max_Purchases|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|50.51749085953793|50.51749085953793|                      50.51749085953793|            1|          100|
+-----------------+-----------------+---------------------------------------+-------------+-------------+

Customer, CustomerAddress, IncomeBand - DataSets in Spark with Scala

val df = spark.read.format("csv").option("inferSchema","True").option("header","true").load("/FileStore/tables/retailer/data/customer.csv") 

df.printSchema

root
 |-- c_customer_sk: integer (nullable = true)
 |-- c_customer_id: string (nullable = true)
 |-- c_current_cdemo_sk: integer (nullable = true)
 |-- c_current_hdemo_sk: integer (nullable = true)
 |-- c_current_addr_sk: integer (nullable = true)
 |-- c_first_shipto_date_sk: integer (nullable = true)
 |-- c_first_sales_date_sk: integer (nullable = true)
 |-- c_salutation: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_country: string (nullable = true)
 |-- c_login: string (nullable = true)
 |-- c_email_address: string (nullable = true)
 |-- c_last_review_date: double (nullable = true)
 
 df.columns.foreach(println)
 
 
c_customer_sk
c_customer_id
c_current_cdemo_sk
c_current_hdemo_sk
c_current_addr_sk
c_first_shipto_date_sk
c_first_sales_date_sk
c_salutation
c_first_name
c_last_name
c_preferred_cust_flag
c_birth_day
c_birth_month
c_birth_year
c_birth_country
c_login
c_email_address
c_last_review_date

df.columns.length
18

df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3,truncate=false)


+----------------+--------------------+------------------------------+------------+-------------+-----------+
|c_customer_id   |c_first_name        |c_last_name                   |c_birth_year|c_birth_month|c_birth_day|
+----------------+--------------------+------------------------------+------------+-------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis                         |1936        |12           |9          |
|AAAAAAAACAAAAAAA|Amy                 |Moses                         |1966        |4            |9          |
|AAAAAAAADAAAAAAA|Latisha             |Hamilton                      |1979        |9            |18         |
+----------------+--------------------+------------------------------+------------+-------------+-----------+



df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3)


import org.apache.spark.sql.functions.{col, column}

df.select(col("c_last_name"),column("c_first_name"), $"c_birth_year").show(3)


+--------------------+--------------------+------------+
|         c_last_name|        c_first_name|c_birth_year|
+--------------------+--------------------+------------+
|Lewis            ...|Javier              |        1936|
|Moses            ...|Amy                 |        1966|
|Hamilton         ...|Latisha             |        1979|
+--------------------+--------------------+------------+


val customerWithBday = df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day")


customerWithBday.printSchema()

root
 |-- c_customer_id: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 
 
 customerWithBday.schema
 
 
 res21: org.apache.spark.sql.types.StructType = StructType(
StructField(c_customer_id,StringType,true), 
StructField(c_first_name,StringType,true), 
StructField(c_last_name,StringType,true), 
StructField(c_birth_year,IntegerType,true), 
StructField(c_birth_month,IntegerType,true), 
StructField(c_birth_day,IntegerType,true))
%fs head /FileStore/tables/retailer/data/customer_address.dat

ca_address_sk|ca_address_id|ca_street_number|ca_street_name|ca_street_type|ca_suite_number|ca_city|ca_county|ca_state|ca_zip|ca_country|ca_gmt_offset|ca_location_type
1|AAAAAAAABAAAAAAA|18        |Jackson |Parkway        |Suite 280 |Fairfield|Maricopa County|AZ|86192     |United States|-7.00|condo               
2|AAAAAAAACAAAAAAA|362       |Washington 6th|RD             |Suite 80  |Fairview|Taos County|NM|85709     |United States|-7.00|condo  

 
 
 /FileStore/tables/retailer/data/customer_address.dat
 
 
 val custAddDF = spark.read.format("csv"). option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")
 
 custAddDF.printSchema()
 
 root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)  -- wrong: inferred as double, should be string
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true) -- change it into decimal (5,2)
 |-- ca_location_type: string (nullable = true)
 
  
 
val custAddSchemaDDL  = "ca_address_sk long, ca_address_id string, ca_street_number string, ca_street_name string, "  +
"ca_street_type string, ca_suite_number string, ca_city string, " +
"ca_county string, ca_state string, ca_zip string, ca_country string, ca_gmt_offset decimal(5,2), ca_location_type string"

val custAddDF = spark.read.format("csv").schema(custAddSchemaDDL).option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")

custAddDF.printSchema()

root
 |-- ca_address_sk: long (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: string (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: string (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: decimal(5,2) (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custAddDF.schema.toDDL
 
 res38: String = `ca_address_sk` BIGINT,`ca_address_id` STRING,`ca_street_number` STRING,`ca_street_name` STRING,`ca_street_type` STRING,`ca_suite_number` STRING,`ca_city` STRING,`ca_county` STRING,`ca_state` STRING,`ca_zip` STRING,`ca_country` STRING,`ca_gmt_offset` DECIMAL(5,2),`ca_location_type` STRING



custAddDF.schema

res39: org.apache.spark.sql.types.StructType = StructType(StructField(ca_address_sk,LongType,true), StructField(ca_address_id,StringType,true), StructField(ca_street_number,StringType,true), StructField(ca_street_name,StringType,true), StructField(ca_street_type,StringType,true), StructField(ca_suite_number,StringType,true), StructField(ca_city,StringType,true), StructField(ca_county,StringType,true), StructField(ca_state,StringType,true), StructField(ca_zip,StringType,true), StructField(ca_country,StringType,true), StructField(ca_gmt_offset,DecimalType(5,2),true), StructField(ca_location_type,StringType,true))


val incomeBandDF = spark.read.format("csv").schema("ib_lower_band_sk long, ib_lower_bound int, ib_upper_bound int").option("sep","|").load("/FileStore/tables/retailer/data/income_band.dat")

incomeBandDF.printSchema()

root
 |-- ib_lower_band_sk: long (nullable = true)
 |-- ib_lower_bound: integer (nullable = true)
 |-- ib_upper_bound: integer (nullable = true)
 
 
 incomeBandDF.show(5)
 
 +----------------+--------------+--------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|
+----------------+--------------+--------------+
|               1|             0|         10000|
|               2|         10001|         20000|
|               3|         20001|         30000|
|               4|         30001|         40000|
|               5|         40001|         50000|
+----------------+--------------+--------------+

// create a new column 
val incomeBandwithGroups = incomeBandDF.withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000)

incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|
+----------------+--------------+--------------+------------------+
|               1|             0|         10000|              true|
|               2|         10001|         20000|              true|
|               3|         20001|         30000|              true|
|               4|         30001|         40000|              true|
|               5|         40001|         50000|              true|
+----------------+--------------+--------------+------------------+


// create static column with constant value we need to use lit
import org.apache.spark.sql.functions.{col, lit, when}

val incomeBandwithGroups = incomeBandDF.
withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000).
withColumn("isSecodIncomeGroup",incomeBandDF("ib_upper_bound") > 60000 and incomeBandDF("ib_upper_bound") <= 12000). // NOTE(review): "<= 12000" can never hold together with "> 60000" — likely a typo for 120000, which is why this column is always false in the output below
withColumn("isThirdIncomeGroup",incomeBandDF("ib_upper_bound") > 12000 and incomeBandDF("ib_upper_bound") < 200000).
withColumn("demo",lit("demoValue"))
incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+------------------+------------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|isSecodIncomeGroup|isThirdIncomeGroup|     demo|
+----------------+--------------+--------------+------------------+------------------+------------------+---------+
|               1|             0|         10000|              true|             false|             false|demoValue|
|               2|         10001|         20000|              true|             false|              true|demoValue|
|               3|         20001|         30000|              true|             false|              true|demoValue|
|               4|         30001|         40000|              true|             false|              true|demoValue|
|               5|         40001|         50000|              true|             false|              true|demoValue|
+----------------+--------------+--------------+------------------+------------------+------------------+---------+
// Rename an existing columns

val incomeClasses = incomeBandwithGroups.
withColumnRenamed("isThirdIncomeGroup","isHighIncomeClass").
withColumnRenamed("isFirstIncomeGroup","isStandardIncomeClass").
withColumnRenamed("isSecodIncomeGroup","isMediumIncomeClass")
incomeClasses.show(5)

+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isStandardIncomeClass|isMediumIncomeClass|isHighIncomeClass|     demo|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|               1|             0|         10000|                 true|              false|            false|demoValue|
|               2|         10001|         20000|                 true|              false|             true|demoValue|
|               3|         20001|         30000|                 true|              false|             true|demoValue|
|               4|         30001|         40000|                 true|              false|             true|demoValue|
|               5|         40001|         50000|                 true|              false|             true|demoValue|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
only showing top 5 rows


// Remove columns
val onlyMediumDF = incomeClasses.drop("demo","isStandardIncomeClass", "isHighIncomeClass")

onlyMediumDF.show(5)


+----------------+--------------+--------------+-------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isMediumIncomeClass|
+----------------+--------------+--------------+-------------------+
|               1|             0|         10000|              false|
|               2|         10001|         20000|              false|
|               3|         20001|         30000|              false|
|               4|         30001|         40000|              false|
|               5|         40001|         50000|              false|
+----------------+--------------+--------------+-------------------+


val custDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

custDF.count()
res75: Long = 100000  -- total records 


// validate date range 
custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 ).count()

res77: Long = 96539  -- valid records 



validRecordsDF.select("c_customer_id", "c_first_name","c_birth_day", "c_birth_month", "c_birth_year").show(5)


+----------------+--------------------+-----------+-------------+------------+
|   c_customer_id|        c_first_name|c_birth_day|c_birth_month|c_birth_year|
+----------------+--------------------+-----------+-------------+------------+
|AAAAAAAABAAAAAAA|Javier              |          9|           12|        1936|
|AAAAAAAACAAAAAAA|Amy                 |          9|            4|        1966|
|AAAAAAAADAAAAAAA|Latisha             |         18|            9|        1979|
|AAAAAAAAEAAAAAAA|Michael             |          7|            6|        1983|
|AAAAAAAAFAAAAAAA|Robert              |          8|            5|        1956|
+----------------+--------------------+-----------+-------------+------------+


// validate dates and months 
val validRecordsDF = custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 and $"c_birth_month" > 0 and $"c_birth_month" <= 12 and $"c_birth_year" > 0)

validRecordsDF.count()
94791

validRecordsDF.count()
94791


Keys:
customer.c_current_addr_sk  
customer_address.ca_address_sk


val custDF = spark.read.format("csv").option("inferSchema",true).
              option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

val custAddDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").
              load("/FileStore/tables/retailer/data/customer_address.dat")

val joinExpression = custDF.col("c_current_addr_sk") === custAddDF.col("ca_address_sk")
val custwithAddDF  = custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","ca_address_sk","c_first_name","c_last_name")




 custwithAddDF.show(5)
 +----------------+-------------+--------------------+--------------------+
|   c_customer_id|ca_address_sk|        c_first_name|         c_last_name|
+----------------+-------------+--------------------+--------------------+
|AAAAAAAABAAAAAAA|        32946|Javier              |Lewis            ...|
|AAAAAAAACAAAAAAA|        31655|Amy                 |Moses            ...|
|AAAAAAAADAAAAAAA|        48572|Latisha             |Hamilton         ...|
|AAAAAAAAEAAAAAAA|        39558|Michael             |White            ...|
|AAAAAAAAFAAAAAAA|        36368|Robert              |Moran            ...|
+----------------+-------------+--------------------+--------------------+



custAddDF.printSchema

root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","c_first_name","c_last_name", "ca_street_name","ca_city").show(5)
 
 +----------------+--------------------+--------------------+--------------+-----------+
|   c_customer_id|        c_first_name|         c_last_name|ca_street_name|    ca_city|
+----------------+--------------------+--------------------+--------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis            ...| Chestnut Main|Spring Hill|
|AAAAAAAACAAAAAAA|Amy                 |Moses            ...|       8th Oak|    Antioch|
|AAAAAAAADAAAAAAA|Latisha             |Hamilton         ...|Park Jefferson|Cedar Grove|
|AAAAAAAAEAAAAAAA|Michael             |White            ...|Cedar Sycamore|   Lakewood|
|AAAAAAAAFAAAAAAA|Robert              |Moran            ...|   Miller Main|   Waterloo|
+----------------+--------------------+--------------------+--------------+-----------+



custDF.count()
res89: Long = 100000


import org.apache.spark.sql.functions.count
custDF.select(count("*")).show()

100000



custDF.select(count("c_first_name")).show()
96508 -- null values ignored




custDF.filter($"c_first_name".isNotNull).count()
96508 -- null values ignored

// Display the count of all c_first_name is NULL
custDF.filter($"c_first_name".isNull).count()
3492


//countDistinct

import org.apache.spark.sql.functions.countDistinct

custDF.select(countDistinct("c_first_name")).show()

4131


custDF.select("c_first_name").distinct().count()
res98: Long = 4132

Saturday, 15 August 2020

FIFA 2019 DataSet Analytics with Spark and Scala

// FIFA 2019 Dataset Analytics with Spark and Scala

scala> val df = spark.read.format("csv").option("header","True").option("inferSchema","True").load("D:\\Ex\\FIFA\\data.csv")
df: org.apache.spark.sql.DataFrame = [ID: int, Name: int ... 86 more fields]

scala> df.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Name: integer (nullable = true)
 |-- Age: string (nullable = true)
 |-- Photo: integer (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: string (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: integer (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: string (nullable = true)
 |-- Preferred Foot: integer (nullable = true)
 |-- International Reputation: string (nullable = true)
 |-- Weak Foot: integer (nullable = true)
 |-- Skill Moves: integer (nullable = true)
 |-- Work Rate: integer (nullable = true)
 |-- Body Type: string (nullable = true)
 |-- Real Face: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Jersey Number: string (nullable = true)
 |-- Joined: integer (nullable = true)
 |-- Loaned From: string (nullable = true)
 |-- Contract Valid Until: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- LS: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- RS: string (nullable = true)
 |-- LW: string (nullable = true)
 |-- LF: string (nullable = true)
 |-- CF: string (nullable = true)
 |-- RF: string (nullable = true)
 |-- RW: string (nullable = true)
 |-- LAM: string (nullable = true)
 |-- CAM: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- LM: string (nullable = true)
 |-- LCM: string (nullable = true)
 |-- CM: string (nullable = true)
 |-- RCM: string (nullable = true)
 |-- RM: string (nullable = true)
 |-- LWB: string (nullable = true)
 |-- LDM: string (nullable = true)
 |-- CDM: string (nullable = true)
 |-- RDM: string (nullable = true)
 |-- RWB: string (nullable = true)
 |-- LB: string (nullable = true)
 |-- LCB: string (nullable = true)
 |-- CB: string (nullable = true)
 |-- RCB: string (nullable = true)
 |-- RB: string (nullable = true)
 |-- Crossing: string (nullable = true)
 |-- Finishing: integer (nullable = true)
 |-- HeadingAccuracy: integer (nullable = true)
 |-- ShortPassing: integer (nullable = true)
 |-- Volleys: integer (nullable = true)
 |-- Dribbling: integer (nullable = true)
 |-- Curve: integer (nullable = true)
 |-- FKAccuracy: integer (nullable = true)
 |-- LongPassing: integer (nullable = true)
 |-- BallControl: integer (nullable = true)
 |-- Acceleration: integer (nullable = true)
 |-- SprintSpeed: integer (nullable = true)
 |-- Agility: integer (nullable = true)
 |-- Reactions: integer (nullable = true)
 |-- Balance: integer (nullable = true)
 |-- ShotPower: integer (nullable = true)
 |-- Jumping: integer (nullable = true)
 |-- Stamina: integer (nullable = true)
 |-- Strength: integer (nullable = true)
 |-- LongShots: integer (nullable = true)
 |-- Aggression: integer (nullable = true)
 |-- Interceptions: integer (nullable = true)
 |-- Positioning: integer (nullable = true)
 |-- Vision: integer (nullable = true)
 |-- Penalties: integer (nullable = true)
 |-- Composure: integer (nullable = true)
 |-- Marking: integer (nullable = true)
 |-- StandingTackle: integer (nullable = true)
 |-- SlidingTackle: integer (nullable = true)
 |-- GKDiving: integer (nullable = true)
 |-- GKHandling: integer (nullable = true)
 |-- GKKicking: integer (nullable = true)
 |-- GKPositioning: integer (nullable = true)
 |-- GKReflexes: integer (nullable = true)
 |-- Release Clause: integer (nullable = true)
 
// Total Row Count
scala> df.count()
res1: Long = 18207


// Display all the column names 
scala> df.columns.foreach(println)
_c0
ID
Name
Age
Photo
Nationality
Flag
Overall
Potential
Club
Club Logo
Value
Wage
Special
Preferred Foot
International Reputation
Weak Foot
Skill Moves
Work Rate
Body Type
Real Face
Position
Jersey Number
Joined
Loaned From
Contract Valid Until
Height
Weight
LS
ST
RS
LW
LF
CF
RF
RW
LAM
CAM
RAM
LM
LCM
CM
RCM
RM
LWB
LDM
CDM
RDM
RWB
LB
LCB
CB
RCB
RB
Crossing
Finishing
HeadingAccuracy
ShortPassing
Volleys
Dribbling
Curve
FKAccuracy
LongPassing
BallControl
Acceleration
SprintSpeed
Agility
Reactions
Balance
ShotPower
Jumping
Stamina
Strength
LongShots
Aggression
Interceptions
Positioning
Vision
Penalties
Composure
Marking
StandingTackle
SlidingTackle
GKDiving
GKHandling
GKKicking
GKPositioning
GKReflexes
Release Clause


// Total number of columns
scala> df.columns.length
res4: Int = 88

scala> df.columns.size
res5: Int = 88



scala> df.describe("Value").show()
+-------+-----+
|summary|Value|
+-------+-----+
|  count|18207|
|   mean| null|
| stddev| null|
|    min|   €0|
|    max|  €9M|
+-------+-----+

// Display Name and Nationality
scala> df.select("Name","Nationality").show(25)
+-----------------+-----------+
|             Name|Nationality|
+-----------------+-----------+
|         L. Messi|  Argentina|
|Cristiano Ronaldo|   Portugal|
|        Neymar Jr|     Brazil|
|           De Gea|      Spain|
|     K. De Bruyne|    Belgium|
|        E. Hazard|    Belgium|
|        L. Modrić|    Croatia|
|        L. Suárez|    Uruguay|
|     Sergio Ramos|      Spain|
|         J. Oblak|   Slovenia|
|   R. Lewandowski|     Poland|
|         T. Kroos|    Germany|
|         D. Godín|    Uruguay|
|      David Silva|      Spain|
|         N. Kanté|     France|
|        P. Dybala|  Argentina|
|          H. Kane|    England|
|     A. Griezmann|     France|
|    M. ter Stegen|    Germany|
|      T. Courtois|    Belgium|
|  Sergio Busquets|      Spain|
|        E. Cavani|    Uruguay|
|         M. Neuer|    Germany|
|        S. Agüero|  Argentina|
|     G. Chiellini|      Italy|
+-----------------+-----------+
only showing top 25 rows


// Display the total players count for each Nationality 
scala> df.groupBy("Nationality").agg(count("Name").alias("Total Player")).show(2000)
+--------------------+------------+
|         Nationality|Total Player|
+--------------------+-----------+
|                Chad|          2|
|              Russia|         79|
|            Paraguay|         85|
|             Senegal|        130|
|              Sweden|        397|
|              Guyana|          3|
|         Philippines|          2|
|             Eritrea|          2|
|                Fiji|          1|
|              Turkey|        303|
|                Iraq|          7|
|             Germany|       1198|
|      St Kitts Nevis|          3|
|             Comoros|          6|
|         Afghanistan|          4|
|         Ivory Coast|        100|
|              Jordan|          1|
|              Rwanda|          1|
|               Sudan|          3|
|              France|        914|
|              Greece|        102|
|              Kosovo|         33|
|Central African Rep.|          3|
|            DR Congo|         52|
|          Montserrat|          4|
|             Algeria|         60|
|                Togo|         12|
|   Equatorial Guinea|          5|
|            Slovakia|         54|
|           Argentina|        937|
|               Wales|        129|
|             Belgium|        260|
|              Angola|         15|
|            St Lucia|          1|
|             Ecuador|         43|
|               Qatar|          1|
|             Albania|         40|
|          Madagascar|         12|
|             Finland|         67|
|       New Caledonia|          1|
|               Ghana|        114|
|           Nicaragua|          2|
|                Peru|         37|
|               Benin|         15|
|        Sierra Leone|          6|
|       United States|        353|
|             Curacao|         14|
|               India|         30|
|             Belarus|          4|
|              Kuwait|          1|
|               Malta|          1|
|               Chile|        391|
|         Puerto Rico|          1|
|             Croatia|        126|
|             Burundi|          3|
|             Nigeria|        121|
|             Bolivia|         30|
|             Andorra|          1|
|               Gabon|         15|
|      Korea Republic|        335|
|               Italy|        702|
|            Suriname|          4|
|           Lithuania|          8|
|              Norway|        341|
|               Spain|       1072|
|                Cuba|          4|
|          Mauritania|          4|
|             Denmark|        336|
|               Niger|          3|
|            Barbados|          3|
|                Iran|         17|
|               Congo|         25|
|       Liechtenstein|          3|
|            Thailand|          5|
|             Morocco|         85|
|          Cape Verde|         19|
|              Panama|         15|
|           Hong Kong|          2|
|           Korea DPR|          4|
|             Ukraine|         73|
|           Venezuela|         67|
|             Iceland|         47|
|              Israel|         14|
|  Bosnia Herzegovina|         61|
|                Oman|          1|
|              Cyprus|          8|
|           Palestine|          1|
|             Uruguay|        149|
|              Mexico|        366|
|       FYR Macedonia|         20|
|          Montenegro|         23|
|            Zimbabwe|         13|
|             Estonia|         13|
|             Georgia|         26|
|           Indonesia|          1|
|           Guatemala|          3|
|                Guam|          1|
|               Libya|          4|
|          Azerbaijan|          5|
|             Grenada|          1|
|             Armenia|         10|
|             Tunisia|         32|
|             Liberia|          1|
|               Syria|          9|
|            Honduras|         16|
|        Saudi Arabia|        340|
|              Uganda|          6|
|             Namibia|          3|
|         Switzerland|        220|
|              Zambia|          9|
|            Ethiopia|          1|
|             Jamaica|         32|
|              Latvia|          6|
|United Arab Emirates|          1|
|         South Sudan|          1|
|              Guinea|         31|
|              Canada|         64|
|          Uzbekistan|          2|
|       Faroe Islands|          6|
|      Czech Republic|        100|
|          Mozambique|          4|
|              Brazil|        827|
|              Belize|          1|
|               Kenya|         10|
|              Gambia|         15|
|             Lebanon|          1|
|            Slovenia|         55|
|  Dominican Republic|          2|
|               Japan|        478|
|            Tanzania|          3|
|            Botswana|          1|
| Republic of Ireland|        368|
|          Luxembourg|          8|
|         New Zealand|         44|
|             England|       1662|
|   Trinidad & Tobago|          4|
|            China PR|        392|
|               Haiti|         10|
|              Poland|        350|
|            Portugal|        322|
|            Cameroon|         90|
|           Australia|        236|
|             Romania|         54|
|            Bulgaria|         32|
|             Austria|        298|
|               Egypt|         31|
|          Costa Rica|         30|
|         El Salvador|          5|
|          Kazakhstan|          4|
|              Serbia|        126|
|        Burkina Faso|         16|
|        South Africa|         71|
|             Bermuda|          2|
|            Scotland|        286|
|            Colombia|        618|
|    Northern Ireland|         80|
|             Hungary|         38|
|       Guinea Bissau|         15|
|   Antigua & Barbuda|          4|
|           Mauritius|          1|
|             Moldova|          5|
|         Netherlands|        453|
|                Mali|         43|
| São Tomé & Príncipe|          1|
+--------------------+-----------+


// Top 20 Nationalities in ASCENDING order 
scala> df.groupBy("Nationality").agg(count("Name").alias("Total Player")).orderBy("Nationality").show(20)
+------------------+------------+
|       Nationality|Total Player|
+------------------+------------+
|       Afghanistan|           4|
|           Albania|          40|
|           Algeria|          60|
|           Andorra|           1|
|            Angola|          15|
| Antigua & Barbuda|           4|
|         Argentina|         937|
|           Armenia|          10|
|         Australia|         236|
|           Austria|         298|
|        Azerbaijan|           5|
|          Barbados|           3|
|           Belarus|           4|
|           Belgium|         260|
|            Belize|           1|
|             Benin|          15|
|           Bermuda|           2|
|           Bolivia|          30|
|Bosnia Herzegovina|          61|
|          Botswana|           1|
+------------------+------------+


// Nationality wise descending order 
scala> df.groupBy("Nationality").agg(count("Name").alias("Total Player")).orderBy(desc("Nationality")).show(20)
+--------------------+------------+
|         Nationality|Total Player|
+--------------------+------------+
|            Zimbabwe|          13|
|              Zambia|           9|
|               Wales|         129|
|           Venezuela|          67|
|          Uzbekistan|           2|
|             Uruguay|         149|
|       United States|         353|
|United Arab Emirates|           1|
|             Ukraine|          73|
|              Uganda|           6|
|              Turkey|         303|
|             Tunisia|          32|
|   Trinidad & Tobago|           4|
|                Togo|          12|
|            Thailand|           5|
|            Tanzania|           3|
| São Tomé & Príncipe|          1|
|               Syria|           9|
|         Switzerland|         220|
|              Sweden|         397|
+--------------------+------------+
only showing top 20 rows

// Total Players for each Nationality in DESCENDING Order
scala> df.groupBy("Nationality").agg(count("Name").alias("TotalPlayer")).orderBy(desc("TotalPlayer")).show(20)
+-------------------+-----------+
|        Nationality|TotalPlayer|
+-------------------+-----------+
|            England|       1662|
|            Germany|       1198|
|              Spain|       1072|
|          Argentina|        937|
|             France|        914|
|             Brazil|        827|
|              Italy|        702|
|           Colombia|        618|
|              Japan|        478|
|        Netherlands|        453|
|             Sweden|        397|
|           China PR|        392|
|              Chile|        391|
|Republic of Ireland|        368|
|             Mexico|        366|
|      United States|        353|
|             Poland|        350|
|             Norway|        341|
|       Saudi Arabia|        340|
|            Denmark|        336|
+-------------------+-----------+
only showing top 20 rows


// Display Player Name, Nationality, Club informations

scala> df.select("Name","Nationality","Club").show()
+-----------------+-----------+-------------------+
|             Name|Nationality|               Club|
+-----------------+-----------+-------------------+
|         L. Messi|  Argentina|       FC Barcelona|
|Cristiano Ronaldo|   Portugal|           Juventus|
|        Neymar Jr|     Brazil|Paris Saint-Germain|
|           De Gea|      Spain|  Manchester United|
|     K. De Bruyne|    Belgium|    Manchester City|
|        E. Hazard|    Belgium|            Chelsea|
|        L. Modrić|    Croatia|        Real Madrid|
|        L. Suárez|    Uruguay|       FC Barcelona|
|     Sergio Ramos|      Spain|        Real Madrid|
|         J. Oblak|   Slovenia|    Atlético Madrid|
|   R. Lewandowski|     Poland|  FC Bayern München|
|         T. Kroos|    Germany|        Real Madrid|
|         D. Godín|    Uruguay|    Atlético Madrid|
|      David Silva|      Spain|    Manchester City|
|         N. Kanté|     France|            Chelsea|
|        P. Dybala|  Argentina|           Juventus|
|          H. Kane|    England|  Tottenham Hotspur|
|     A. Griezmann|     France|    Atlético Madrid|
|    M. ter Stegen|    Germany|       FC Barcelona|
|      T. Courtois|    Belgium|        Real Madrid|
+-----------------+-----------+-------------------+
only showing top 20 rows


// Age is Lesser than 30 
scala> df.select("ID","Name","Age","Nationality","Potential","Jersey Number","Club").filter("Age < 30").show()
+------+---------------+---+-----------+---------+-------------+-------------------+
|    ID|           Name|Age|Nationality|Potential|Jersey Number|               Club|
+------+---------------+---+-----------+---------+-------------+-------------------+
|190871|      Neymar Jr| 26|     Brazil|       93|           10|Paris Saint-Germain|
|193080|         De Gea| 27|      Spain|       93|            1|  Manchester United|
|192985|   K. De Bruyne| 27|    Belgium|       92|            7|    Manchester City|
|183277|      E. Hazard| 27|    Belgium|       91|           10|            Chelsea|
|200389|       J. Oblak| 25|   Slovenia|       93|            1|    Atlético Madrid|
|188545| R. Lewandowski| 29|     Poland|       90|            9|  FC Bayern München|
|182521|       T. Kroos| 28|    Germany|       90|            8|        Real Madrid|
|215914|       N. Kanté| 27|     France|       90|           13|            Chelsea|
|211110|      P. Dybala| 24|  Argentina|       94|           21|           Juventus|
|202126|        H. Kane| 24|    England|       91|            9|  Tottenham Hotspur|
|194765|   A. Griezmann| 27|     France|       90|            7|    Atlético Madrid|
|192448|  M. ter Stegen| 26|    Germany|       92|           22|       FC Barcelona|
|192119|    T. Courtois| 26|    Belgium|       90|            1|        Real Madrid|
|189511|Sergio Busquets| 29|      Spain|       89|            5|       FC Barcelona|
|231747|      K. Mbappé| 19|     France|       95|           10|Paris Saint-Germain|
|209331|       M. Salah| 26|      Egypt|       89|           10|          Liverpool|
|200145|       Casemiro| 26|     Brazil|       90|           14|        Real Madrid|
|198710|   J. Rodríguez| 26|   Colombia|       89|           10|  FC Bayern München|
|198219|     L. Insigne| 27|      Italy|       88|           10|             Napoli|
|197781|           Isco| 26|      Spain|       91|           22|        Real Madrid|
+------+---------------+---+-----------+---------+-------------+-------------------+
only showing top 20 rows

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...