Friday, 28 August 2020

Pattern Matching Example in Scala

  • pattern matching is similar to switch statements in C#, Java
  • no fall-through: exactly one case is evaluated
  • no break needed; if no case matches (and there is no wildcard case), a scala.MatchError is thrown at runtime (see the sketch below)
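
A minimal sketch (values are illustrative) of the MatchError behaviour when no case matches:

object MatchErrorDemo {
  def main(args: Array[String]): Unit = {
    val day = "Holiday"
    // "Holiday" matches no case and there is no wildcard case,
    // so the match throws scala.MatchError: Holiday
    val menu = day match {
      case "Mon" => "Idly"
      case "Tue" => "Dosa"
    }
    println(menu) // never reached
  }
}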

object PatternExa {
  def PatternMatchExa(dayOfWeek: String) = {
    val MenuOfTheDay = dayOfWeek match {
      case "Mon" => "Tanjavur Idly"
      case "Tue" => "Manapparai Murukku"
      case "Wed" => "Chettinad Sweets"
      case "Thu" => "Madurai Idiyappam"
      case "Fri" => "Mysoor Bonda"
      case "Sat" => "Rava Idli"
      case "Sun" => "Salem Thattuvadai"
      case _     => "Something Special"
    }
    MenuOfTheDay
  }

  def main(args: Array[String]): Unit = {
    println(PatternMatchExa("Mon"))
    println(PatternMatchExa("Sunday"))
  }
}

Result:
Tanjavur Idly
Something Special



object PatternExa {
  def PatternMatchExa(dayOfWeek: String) = {
    val MenuOfTheDay = dayOfWeek match {
      case "Mon" | "Wed" | "Fri" => "Dindigual Chicken Biryani"
      case "Tue" | "Thu" | "Sat" => "Chettinadu Chicken Biryani"
      case "Sun" => "Hyderabadi Chicken Biryani"
      case _     => "Something Special"
    }
    MenuOfTheDay
  }

  def main(args: Array[String]): Unit = {
    println(PatternMatchExa("Mon"))
    println(PatternMatchExa("Sunday"))
  }
}


Result:
Dindigual Chicken Biryani
Something Special


String Interpolation Example in Scala

s, f and raw are the built-in string interpolators.

They let you embed variable references directly in a processed string literal.

object StringInterpolation {
  def main(args: Array[String]): Unit = {
    val name = "Sankaranarayanan"
    val height = 179.23423423
    println(s"Name : $name, Height : $height")     // s: simple substitution
    println(f"Name : $name%s, Height : $height%f") // f: printf-style formatting
    println(s"Hello\t\t\tworld!")
    println(raw"Hello\t\t\tworld")  // raw: escape sequences are not processed
  }
}



Result:

Name : Sankaranarayanan, Height : 179.23423423
Name : Sankaranarayanan, Height : 179.234234
Hello world!
Hello\t\t\tworld

Thursday, 27 August 2020

Class, Object, Companion Object, Companion Class in Scala with Examples

class AreaOfSquare {  // class
  var lengthOftheSide = 10

  private var msg = "Welcome"
  private def displayMessage() = {
    println("Message : " + msg)
  }

  def area(): Unit = {
    var ar = lengthOftheSide * lengthOftheSide
    println("Length : " + lengthOftheSide)
    print("Area Of Square : " + ar)
  }
}

object AreaOfSquare {  // object
  var lengthOftheSide = 10

  private var msg = "Welcome"
  private def displayMessage() = {
    println("Message : " + msg)
  }

  def area(): Unit = {
    var ar = lengthOftheSide * lengthOftheSide
    println("Length : " + lengthOftheSide)
    print("Area Of Square : " + ar)
  }
}

class my1st { // companion class
  var lengthOftheSide = 10

  private var msg = ""
  private def displayMessage() = {
    println("Message : " + msg)
  }

  def area(): Unit = {
    var ar = lengthOftheSide * lengthOftheSide
    println("Length : " + lengthOftheSide)
    print("Area Of Square : " + ar)
  }
}

object my1st { // companion object
  def main(args: Array[String]): Unit = {
    var obj = new AreaOfSquare() /* creating an instance of the class */
    obj.lengthOftheSide = 15
    // obj.msg = "Something" // inaccessible: msg is private to AreaOfSquare
    // obj.displayMessage()  // inaccessible: displayMessage is private to AreaOfSquare
    obj.area()

    println()

    AreaOfSquare.lengthOftheSide = 11 // calling the object directly - the static equivalent - no need to instantiate
    // AreaOfSquare.msg = "Something"  // inaccessible: msg is private to the object
    // AreaOfSquare.displayMessage()   // inaccessible: displayMessage is private to the object
    AreaOfSquare.area()

    println()

    var myobj = new my1st() // object name = class name --> companion object and companion class

    /* Companion object: a class and an object with the same name, defined in the same file.
     * A companion object is allowed to access both private methods and private fields of its class. */
    myobj.lengthOftheSide = 8
    myobj.area()

    myobj.msg = "Welcome to Companion Objects and Class" // companion object can access private fields
    myobj.displayMessage() // companion object can access private methods
  }
}


Result : 
Length : 15
Area Of Square : 225
Length : 11
Area Of Square : 121
Length : 8
Area Of Square : 64Message : Welcome to Companion Objects and Class

Wednesday, 26 August 2020

SBT build tool - Pass command line argument to Scala Main


// program expects command line arguments

object my1st {
  def main(args:Array[String]) : Unit = {
    println("Hello " + args(0))
  }
}
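
A minimal defensive variant (illustrative sketch): guard against a missing argument before indexing args:

object my1st {
  def main(args: Array[String]): Unit = {
    // args(0) throws ArrayIndexOutOfBoundsException when no argument is passed
    if (args.isEmpty) println("Usage: sbt \"runMain my1st <name>\"")
    else println("Hello " + args(0))
  }
}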


sbt package --> to build the jar file

C:\Users\sankara\IdeaProjects\my1st>sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] Compiling 1 Scala source to C:\Users\sankara\IdeaProjects\my1st\target\scala-2.11\classes ...
[success] Total time: 4 s, completed 26 Aug, 2020 1:46:21 PM


sbt "runMain objectName Argument1" --> to run the jar file with passing the command line arguments

C:\Users\sankara\IdeaProjects\my1st>sbt "runMain my1st India"
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] running my1st India
Hello India
[success] Total time: 4 s, completed 26 Aug, 2020 1:50:13 PM

How to build a jar file and run it using SBT?

// Sample program
package my1st

object my1st {
  def main(args:Array[String]) : Unit = {
    println("Hello World")
  }
}


SBT - Simple Build Tool (the standard build tool for Scala)

Download and install sbt from here : https://www.scala-sbt.org/download.html



Open the project in IntelliJ IDEA
Right-click the project name - Copy - Copy Path
Select : Absolute Path : C:\Users\sankara\IdeaProjects\my1st
Open Windows Explorer
Paste the path in the address bar
Shift + right-click on the free space
Open command prompt here


C:\Users\sankara\IdeaProjects\my1st>dir
 Volume in drive C has no label.
 Volume Serial Number is 363F-334D

 Directory of C:\Users\sankara\IdeaProjects\my1st

26-08-2020  12:44    <DIR>          .
26-08-2020  12:44    <DIR>          ..
26-08-2020  13:21    <DIR>          .idea
26-08-2020  12:42               328 build.sbt
26-08-2020  12:43    <DIR>          project
26-08-2020  12:40    <DIR>          src
26-08-2020  12:44    <DIR>          target
               1 File(s)            328 bytes
               6 Dir(s)  334,877,937,664 bytes free
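
The 328-byte build.sbt in the listing is not shown; a minimal sketch of what such a build definition typically contains (name and version are assumptions; the Scala version matches the scala-2.11 target directory in the compile logs):

// build.sbt - minimal sketch, not the actual file contents
name := "my1st"

version := "0.1"

scalaVersion := "2.11.12"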

sbt package --> to build the jar file

C:\Users\sankara\IdeaProjects\my1st>sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; sup
port was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[success] Total time: 2 s, completed 26 Aug, 2020 1:31:33 PM

sbt run --> to compile (if needed) and run the main class

C:\Users\sankara\IdeaProjects\my1st>sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] running my1st.my1st
Hello World
[success] Total time: 3 s, completed 26 Aug, 2020 1:32:34 PM

Saturday, 22 August 2020

Lead, Lag Examples in Spark SQL with Scala

// Lead Example
// LEAD(salary) fetches the NEXT row's salary value within each technology partition
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lead FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary| Lead|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|50000|  
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|40000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|20000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|12000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|50000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|45000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|45000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|20000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|40000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|38000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|25000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|50000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|40000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|25000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null|
+---+------------+-----------+-----------+----------+------+-----+


// Lag Example
// LAG(salary) fetches the PREVIOUS row's salary value within each technology partition
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lag FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary|  Lag|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|98000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|40000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|20000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|69000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|50000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|45000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|45000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|38000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|67000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|40000|
+---+------------+-----------+-----------+----------+------+-----+

// LAG(salary, 2) fetches the salary from two rows back
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary,2) OVER (PARTITION BY technology ORDER BY salary desc) as Lag FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary|  Lag|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000| null|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|98000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|40000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000| null|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|69000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|50000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000| null|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|45000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000| null|
|103|        Raja|       Mani|  Developer|     Spark| 50000|67000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|50000|
+---+------------+-----------+-----------+----------+------+-----+

// LEAD(salary, 3) fetches the salary from three rows ahead
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary,3) OVER (PARTITION BY technology ORDER BY salary desc) as Lead FROM teams").show(21)



+---+------------+-----------+-----------+----------+------+-----+
| id|       fname|      lname|designation|technology|salary| Lead|
+---+------------+-----------+-----------+----------+------+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|20000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|12000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000| null|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000| null|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|45000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|20000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000| null|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000| null|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|38000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|25000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000| null|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000| null|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|40000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|25000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000| null|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000| null|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null|
+---+------------+-----------+-----------+----------+------+-----+


// Find the difference between the current salary and the previous (LAG) salary
spark.sql("SELECT id, fname,lname, designation, technology,salary, LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lag, (salary - LAG(salary) OVER (PARTITION BY technology ORDER BY salary desc)) as diff FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+------+
| id|       fname|      lname|designation|technology|salary|  Lag|  diff|
+---+------------+-----------+-----------+----------+------+-----+------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000| null|  null|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|98000|-48000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|50000|-10000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|40000|-20000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|20000| -8000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000| null|  null|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|69000|-19000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|50000| -5000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|45000|     0|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|45000|-25000|
|115|      Donald|      Trump|    Analyst|   Android| 45000| null|  null|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|45000|     0|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|45000| -5000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|40000| -2000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|38000|-13000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000| null|  null|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|67000|-17000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|     0|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|50000|     0|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|50000|-10000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|40000|-15000|
+---+------------+-----------+-----------+----------+------+-----+------+

// Find the difference between the current salary and the next (LEAD) salary
spark.sql("SELECT id, fname,lname, designation, technology,salary, LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) as Lead, (salary - LEAD(salary) OVER (PARTITION BY technology ORDER BY salary desc) ) as diff FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+-----+-----+
| id|       fname|      lname|designation|technology|salary| Lead| diff|
+---+------------+-----------+-----------+----------+------+-----+-----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|50000|48000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|40000|10000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|20000|20000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|12000| 8000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000| null| null|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|50000|19000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|45000| 5000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|45000|    0|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|20000|25000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000| null| null|
|115|      Donald|      Trump|    Analyst|   Android| 45000|45000|    0|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|40000| 5000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|38000| 2000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|25000|13000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000| null| null|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|50000|17000|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|50000|    0|
|103|        Raja|       Mani|  Developer|     Spark| 50000|50000|    0|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|40000|10000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|25000|15000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000| null| null|
+---+------------+-----------+-----------+----------+------+-----+-----+ 
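
The same LEAD/LAG results can be produced with the DataFrame API; a minimal sketch (assuming df is the DataFrame registered as the teams view, as in the team.csv section below):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// window matching the SQL: PARTITION BY technology ORDER BY salary DESC
val w = Window.partitionBy("technology").orderBy(col("salary").desc)

df.select(col("id"), col("fname"), col("technology"), col("salary"),
    lead("salary", 1).over(w).alias("Lead"),
    lag("salary", 1).over(w).alias("Lag")).
  show(21)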



ROW_NUMBER, RANK, DENSE RANK Windowing Functions in Spark SQL

team.csv:
---------
id,fname,lname,designation,technology,salary
100,Sankara,Narayanan,Developer,Spark,50000
101,Ramesh,Kumar,Tester,Android,40000
102,Ganapathy,Govindan,Tester,Android,25000
103,Raja,Mani,Developer,Spark,50000
104,Pavithra,Lokesh,Developer,iOS,20000
105,Kamala,Kamesh,Developer,iOS,98000
106,Arun,Pandian,Tester,Spark,25000
107,Praveen,Mani,Analyst,iOS,50000
108,Anbu,Sudha,Tester,iOS,12000
109,Nagarethinam,Dhanukkodi,Tester,Node JS,20000
110,Veeraiah,Dhanukkodi,Analyst,iOS,40000
111,Arivu,Madhi,Developer,Spark,50000
112,Muthu,Kumar,Analyst,Node JS,45000
113,Sikkandar,Bhai,Developer,Spark,40000
114,Salman,Khan,Analyst,Spark,67000
115,Donald,Trump,Analyst,Android,45000
116,Anitha,Kuppusamy,Developer,Android,45000
117,Pushpavanam,Kuppusamy,Analyst,Android,38000
118,Latha,Rajinikanth,Tester,Node JS,45000
119,Aishwarya,Dhanush,Developer,Node JS,69000
120,Vijay,Kumar,Developer,Node JS,50000


/FileStore/tables/team.csv


 val df = spark.read.format("csv").option("inferSchema","true").option("header",true).load("/FileStore/tables/team.csv")
 
 df.printSchema()
 
root
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- technology: string (nullable = true)
 |-- salary: integer (nullable = true)
 
 
 df.show()
 
+---+------------+-----------+-----------+----------+------+
| id|       fname|      lname|designation|technology|salary|
+---+------------+-----------+-----------+----------+------+
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|
|115|      Donald|      Trump|    Analyst|   Android| 45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|
+---+------------+-----------+-----------+----------+------+


df.createOrReplaceTempView("teams")

spark.sql("select * from teams").show(21)

+---+------------+-----------+-----------+----------+------+
| id|       fname|      lname|designation|technology|salary|
+---+------------+-----------+-----------+----------+------+
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|
|103|        Raja|       Mani|  Developer|     Spark| 50000|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|
|115|      Donald|      Trump|    Analyst|   Android| 45000|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|
+---+------------+-----------+-----------+----------+------+


// ROW_NUMBER() -- assigns a sequential number to each row within its partition


spark.sql("SELECT id,fname,lname,technology,designation,salary, ROW_NUMBER() OVER (PARTITION BY designation ORDER BY salary desc) AS row_num FROM teams").show(21)


+---+------------+-----------+----------+-----------+------+-------+
| id|       fname|      lname|technology|designation|salary|row_num|
+---+------------+-----------+----------+-----------+------+-------+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|      1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|      2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|      3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|      4|
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|      5|
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|      6|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|      1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|      2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|      3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|      4|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|      5|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|      6|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|      7|
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|      8|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|      9|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|      1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|      2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|      3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|      4|
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|      5|
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|      6|
+---+------------+-----------+----------+-----------+------+-------+


// ROW_NUMBER()

spark.sql("SELECT id,fname,lname,designation,technology,salary, ROW_NUMBER() OVER (PARTITION BY technology ORDER BY salary desc) AS row_num FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-------+
| id|       fname|      lname|designation|technology|salary|row_num|
+---+------------+-----------+-----------+----------+------+-------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|      1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|      2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|      3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|      4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|      5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|      1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|      2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|      3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|      4|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|      5|
|115|      Donald|      Trump|    Analyst|   Android| 45000|      1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|      2|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|      3|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|      4|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|      5|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|      1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|      2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|      3|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|      4|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|      5|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|      6|
+---+------------+-----------+-----------+----------+------+-------+


 
// RANK()
spark.sql("SELECT id,fname,lname,technology,designation,salary, RANK() OVER (PARTITION BY designation ORDER BY salary desc) AS rank FROM teams").show(21)

+---+------------+-----------+----------+-----------+------+----+
| id|       fname|      lname|technology|designation|salary|rank|
+---+------------+-----------+----------+-----------+------+----+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|   1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|   2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|   3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|   3|
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|   5| -- 4 skipped
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|   6|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|   1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|   2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|   3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|   3|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|   3|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|   3|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|   7| -- 4,5,6 skipped 
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|   8|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|   9|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|   1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|   2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|   3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|   3|
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|   5| -- 4 skipped
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|   6|
+---+------------+-----------+----------+-----------+------+----+

// RANK()
spark.sql("SELECT id,fname,lname,designation,technology,salary, RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS rank FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+----+
| id|       fname|      lname|designation|technology|salary|rank|
+---+------------+-----------+-----------+----------+------+----+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|   1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|   2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|   3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|   4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|   5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|   1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|   2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|   3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|   3|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|   5|  -- 4 is skipped
|115|      Donald|      Trump|    Analyst|   Android| 45000|   1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|   1|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|   3| -- 2 is skipped
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|   4|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|   5|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|   1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|   2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|   2|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|   2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|   5| -- 3 and 4 are skipped
|106|        Arun|    Pandian|     Tester|     Spark| 25000|   6|
+---+------------+-----------+-----------+----------+------+----+



// DENSE_RANK()
spark.sql("SELECT id,fname,lname,technology,designation,salary, DENSE_RANK() OVER (PARTITION BY designation ORDER BY salary desc) AS dense_rank FROM teams").show(21)

+---+------------+-----------+----------+-----------+------+----------+
| id|       fname|      lname|technology|designation|salary|dense_rank|
+---+------------+-----------+----------+-----------+------+----------+
|118|       Latha|Rajinikanth|   Node JS|     Tester| 45000|         1|
|101|      Ramesh|      Kumar|   Android|     Tester| 40000|         2|
|102|   Ganapathy|   Govindan|   Android|     Tester| 25000|         3|
|106|        Arun|    Pandian|     Spark|     Tester| 25000|         3|
|109|Nagarethinam| Dhanukkodi|   Node JS|     Tester| 20000|         4|
|108|        Anbu|      Sudha|       iOS|     Tester| 12000|         5|
|105|      Kamala|     Kamesh|       iOS|  Developer| 98000|         1|
|119|   Aishwarya|    Dhanush|   Node JS|  Developer| 69000|         2|
|100|     Sankara|  Narayanan|     Spark|  Developer| 50000|         3|
|103|        Raja|       Mani|     Spark|  Developer| 50000|         3|
|111|       Arivu|      Madhi|     Spark|  Developer| 50000|         3|
|120|       Vijay|      Kumar|   Node JS|  Developer| 50000|         3|
|116|      Anitha|  Kuppusamy|   Android|  Developer| 45000|         4|
|113|   Sikkandar|       Bhai|     Spark|  Developer| 40000|         5|
|104|    Pavithra|     Lokesh|       iOS|  Developer| 20000|         6|
|114|      Salman|       Khan|     Spark|    Analyst| 67000|         1|
|107|     Praveen|       Mani|       iOS|    Analyst| 50000|         2|
|112|       Muthu|      Kumar|   Node JS|    Analyst| 45000|         3|
|115|      Donald|      Trump|   Android|    Analyst| 45000|         3|
|110|    Veeraiah| Dhanukkodi|       iOS|    Analyst| 40000|         4|
|117| Pushpavanam|  Kuppusamy|   Android|    Analyst| 38000|         5|
+---+------------+-----------+----------+-----------+------+----------+

// dense_rank
spark.sql("SELECT id,fname,lname,designation,technology,salary, DENSE_RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS dense_rank FROM teams").show(21)

+---+------------+-----------+-----------+----------+------+----------+
| id|       fname|      lname|designation|technology|salary|dense_rank|
+---+------------+-----------+-----------+----------+------+----------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|         1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|         2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|         3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|         4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|         5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|         1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|         2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|         3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|         3|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|         4|
|115|      Donald|      Trump|    Analyst|   Android| 45000|         1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|         1|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|         2|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|         3|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|         4|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|         1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|         2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|         2|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|         2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|         3|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|         4|
+---+------------+-----------+-----------+----------+------+----------+


 // ROW_NUMBER, RANK, DENSE_RANK all together

spark.sql("SELECT id,fname,lname,designation,technology,salary, ROW_NUMBER() OVER (PARTITION BY technology ORDER BY salary desc) AS row_num,RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS rank,DENSE_RANK() OVER (PARTITION BY technology ORDER BY salary desc) AS dense_rank FROM teams").show(21)


+---+------------+-----------+-----------+----------+------+-------+----+----------+
| id|       fname|      lname|designation|technology|salary|row_num|rank|dense_rank|
+---+------------+-----------+-----------+----------+------+-------+----+----------+
|105|      Kamala|     Kamesh|  Developer|       iOS| 98000|      1|   1|         1|
|107|     Praveen|       Mani|    Analyst|       iOS| 50000|      2|   2|         2|
|110|    Veeraiah| Dhanukkodi|    Analyst|       iOS| 40000|      3|   3|         3|
|104|    Pavithra|     Lokesh|  Developer|       iOS| 20000|      4|   4|         4|
|108|        Anbu|      Sudha|     Tester|       iOS| 12000|      5|   5|         5|
|119|   Aishwarya|    Dhanush|  Developer|   Node JS| 69000|      1|   1|         1|
|120|       Vijay|      Kumar|  Developer|   Node JS| 50000|      2|   2|         2|
|112|       Muthu|      Kumar|    Analyst|   Node JS| 45000|      3|   3|         3|
|118|       Latha|Rajinikanth|     Tester|   Node JS| 45000|      4|   3|         3|
|109|Nagarethinam| Dhanukkodi|     Tester|   Node JS| 20000|      5|   5|         4|
|115|      Donald|      Trump|    Analyst|   Android| 45000|      1|   1|         1|
|116|      Anitha|  Kuppusamy|  Developer|   Android| 45000|      2|   1|         1|
|101|      Ramesh|      Kumar|     Tester|   Android| 40000|      3|   3|         2|
|117| Pushpavanam|  Kuppusamy|    Analyst|   Android| 38000|      4|   4|         3|
|102|   Ganapathy|   Govindan|     Tester|   Android| 25000|      5|   5|         4|
|114|      Salman|       Khan|    Analyst|     Spark| 67000|      1|   1|         1|
|100|     Sankara|  Narayanan|  Developer|     Spark| 50000|      2|   2|         2|
|103|        Raja|       Mani|  Developer|     Spark| 50000|      3|   2|         2|
|111|       Arivu|      Madhi|  Developer|     Spark| 50000|      4|   2|         2|
|113|   Sikkandar|       Bhai|  Developer|     Spark| 40000|      5|   5|         3|
|106|        Arun|    Pandian|     Tester|     Spark| 25000|      6|   6|         4|
+---+------------+-----------+-----------+----------+------+-------+----+----------+
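
The same three rankings can also be computed with the DataFrame API; a minimal sketch (using the df defined above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, rank, dense_rank}

// one window definition shared by all three ranking functions
val w = Window.partitionBy("technology").orderBy(col("salary").desc)

df.withColumn("row_num", row_number().over(w)).
   withColumn("rank", rank().over(w)).
   withColumn("dense_rank", dense_rank().over(w)).
   show(21)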

Tuesday, 18 August 2020

Item, StoreSales DataSets - Spark with Scala Programming


val itemDF = spark.read.format("csv").options(Map("header"->"true", "delimiter"->"|", "inferSchema"->"true")).load("/FileStore/tables/retailer/data/item.dat")
 
itemDF.printSchema()

root
 |-- i_item_sk: integer (nullable = true)
 |-- i_item_id: string (nullable = true)
 |-- i_rec_start_date: string (nullable = true)
 |-- i_rec_end_date: string (nullable = true)
 |-- i_item_desc: string (nullable = true)
 |-- i_current_price: double (nullable = true)
 |-- i_wholesale_cost: double (nullable = true)
 |-- i_brand_id: integer (nullable = true)
 |-- i_brand: string (nullable = true)
 |-- i_class_id: integer (nullable = true)
 |-- i_class: string (nullable = true)
 |-- i_category_id: integer (nullable = true)
 |-- i_category: string (nullable = true)
 |-- i_manufact_id: integer (nullable = true)
 |-- i_manufact: string (nullable = true)
 |-- i_size: string (nullable = true)
 |-- i_formulation: string (nullable = true)
 |-- i_color: string (nullable = true)
 |-- i_units: string (nullable = true)
 |-- i_container: string (nullable = true)
 |-- i_manager_id: integer (nullable = true)
 |-- i_product_name: string (nullable = true)
 
 
import org.apache.spark.sql.functions._
itemDF.select(min("i_wholesale_cost").alias("Minn"), max("i_wholesale_cost").alias("Maxx")).show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+

Or

import org.apache.spark.sql.functions._

val minMaxDF = itemDF.select(min("i_wholesale_cost"),max("i_wholesale_cost")).withColumnRenamed("min(i_wholesale_cost)","Minn").withColumnRenamed("max(i_wholesale_cost)","Maxx")
minMaxDF.show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+


// Filter condition: display the item(s) whose wholesale cost equals the minimum (0.02)
display(itemDF.filter($"i_wholesale_cost" === 0.02))



import org.apache.spark.sql.functions._
val minWSCostDF = itemDF.select(min("i_wholesale_cost").alias("Minn"))

minWSCostDF.show()

+----+
|Minn|
+----+
|0.02|
+----+

// join against the minimum wholesale cost to keep only the cheapest item(s)
val cheapestItemDF = itemDF.join(minWSCostDF, itemDF.col("i_wholesale_cost") === minWSCostDF.col("Minn"), "inner")

cheapestItemDF.select("i_item_desc","i_current_price","i_category").show()

+--------------------+---------------+--------------------+
|         i_item_desc|i_current_price|          i_category|
+--------------------+---------------+--------------------+
|Forces can testif...|           0.09|Books            ...|
|Forces can testif...|           9.21|Sports           ...|
+--------------------+---------------+--------------------+

 

import org.apache.spark.sql.functions._
itemDF.select(max("i_current_price").alias("MaxCurrentPrice")).show()

+---------------+
|MaxCurrentPrice|
+---------------+
|          99.99|
+---------------+


val storeSalesDF = spark.read.format("csv").option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/store_sales.dat")

storeSalesDF.count()
2880404

storeSalesDF.printSchema
root
 |-- ss_sold_date_sk: integer (nullable = true)
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
 |-- ss_addr_sk: integer (nullable = true)
 |-- ss_store_sk: integer (nullable = true)
 |-- ss_promo_sk: integer (nullable = true)
 |-- ss_ticket_number: integer (nullable = true)
 |-- ss_quantity: integer (nullable = true)
 |-- ss_wholesale_cost: double (nullable = true)
 |-- ss_list_price: double (nullable = true)
 |-- ss_sales_price: double (nullable = true)
 |-- ss_ext_discount_amt: double (nullable = true)
 |-- ss_ext_sales_price: double (nullable = true)
 |-- ss_ext_wholesale_cost: double (nullable = true)
 |-- ss_ext_list_price: double (nullable = true)
 |-- ss_ext_tax: double (nullable = true)
 |-- ss_coupon_amt: double (nullable = true)
 |-- ss_net_paid: double (nullable = true)
 |-- ss_net_paid_inc_tax: double (nullable = true)
 |-- ss_net_profit: double (nullable = true)
 
 
 import org.apache.spark.sql.functions.sum
 
 storeSalesDF.select(sum("ss_net_paid_inc_tax").alias("NetIncomeTaxPaid"), sum("ss_net_profit").alias("NetProfit")).show()
 
+-------------------+--------------------+
|   NetIncomeTaxPaid|           NetProfit|
+-------------------+--------------------+
|4.954755825529975E9|-2.276100670919993E9|
+-------------------+--------------------+

import org.apache.spark.sql.functions.{sum,sumDistinct}

storeSalesDF.select(sumDistinct("ss_quantity"),sum("ss_quantity")).show()

+-------------------------+----------------+
|sum(DISTINCT ss_quantity)|sum(ss_quantity)|
+-------------------------+----------------+
|                     5050|       138943711|
+-------------------------+----------------+



import org.apache.spark.sql.functions.{avg,mean}
storeSalesDF.select(avg("ss_quantity"),mean("ss_quantity")).show()

+-----------------+-----------------+
| avg(ss_quantity)| avg(ss_quantity)|
+-----------------+-----------------+
|50.51749085953793|50.51749085953793|
+-----------------+-----------------+

import org.apache.spark.sql.functions.{avg, mean, min, max, sum, count} // sum and count are needed below

storeSalesDF.select(
avg("ss_quantity").as("Average_Purchases"),
mean("ss_quantity").as("Mean_Purchases"),
sum("ss_quantity") / count("ss_quantity"),
min("ss_quantity").as("Min_Purchases"),
max("ss_quantity").alias("Max_Purchases")).show()
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|Average_Purchases|   Mean_Purchases|(sum(ss_quantity) / count(ss_quantity))|Min_Purchases|Max_Purchases|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|50.51749085953793|50.51749085953793|                      50.51749085953793|            1|          100|
+-----------------+-----------------+---------------------------------------+-------------+-------------+

Customer, CustomerAddress, IncomeBand - DataSets in Spark with Scala

val df = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/FileStore/tables/retailer/data/customer.csv")

df.printSchema

root
 |-- c_customer_sk: integer (nullable = true)
 |-- c_customer_id: string (nullable = true)
 |-- c_current_cdemo_sk: integer (nullable = true)
 |-- c_current_hdemo_sk: integer (nullable = true)
 |-- c_current_addr_sk: integer (nullable = true)
 |-- c_first_shipto_date_sk: integer (nullable = true)
 |-- c_first_sales_date_sk: integer (nullable = true)
 |-- c_salutation: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_country: string (nullable = true)
 |-- c_login: string (nullable = true)
 |-- c_email_address: string (nullable = true)
 |-- c_last_review_date: double (nullable = true)
 
 df.columns.foreach(println)
 
 
c_customer_sk
c_customer_id
c_current_cdemo_sk
c_current_hdemo_sk
c_current_addr_sk
c_first_shipto_date_sk
c_first_sales_date_sk
c_salutation
c_first_name
c_last_name
c_preferred_cust_flag
c_birth_day
c_birth_month
c_birth_year
c_birth_country
c_login
c_email_address
c_last_review_date

df.columns.length
18

df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3,truncate=false)


+----------------+--------------------+------------------------------+------------+-------------+-----------+
|c_customer_id   |c_first_name        |c_last_name                   |c_birth_year|c_birth_month|c_birth_day|
+----------------+--------------------+------------------------------+------------+-------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis                         |1936        |12           |9          |
|AAAAAAAACAAAAAAA|Amy                 |Moses                         |1966        |4            |9          |
|AAAAAAAADAAAAAAA|Latisha             |Hamilton                      |1979        |9            |18         |
+----------------+--------------------+------------------------------+------------+-------------+-----------+



df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3)


// col, column and $ are three equivalent ways to reference a column
import org.apache.spark.sql.functions.{col, column}

df.select(col("c_last_name"), column("c_first_name"), $"c_birth_year").show(3)


+--------------------+--------------------+------------+
|         c_last_name|        c_first_name|c_birth_year|
+--------------------+--------------------+------------+
|Lewis            ...|Javier              |        1936|
|Moses            ...|Amy                 |        1966|
|Hamilton         ...|Latisha             |        1979|
+--------------------+--------------------+------------+


val customerWithBday = df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day")


customerWithBday.printSchema()

root
 |-- c_customer_id: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 
 
 customerWithBday.schema
 
 
 res21: org.apache.spark.sql.types.StructType = StructType(
StructField(c_customer_id,StringType,true), 
StructField(c_first_name,StringType,true), 
StructField(c_last_name,StringType,true), 
StructField(c_birth_year,IntegerType,true), 
StructField(c_birth_month,IntegerType,true), 
StructField(c_birth_day,IntegerType,true))

%fs head /FileStore/tables/retailer/data/customer_address.dat

ca_address_sk|ca_address_id|ca_street_number|ca_street_name|ca_street_type|ca_suite_number|ca_city|ca_county|ca_state|ca_zip|ca_country|ca_gmt_offset|ca_location_type
1|AAAAAAAABAAAAAAA|18        |Jackson |Parkway        |Suite 280 |Fairfield|Maricopa County|AZ|86192     |United States|-7.00|condo               
2|AAAAAAAACAAAAAAA|362       |Washington 6th|RD             |Suite 80  |Fairview|Taos County|NM|85709     |United States|-7.00|condo  

 
 
 /FileStore/tables/retailer/data/customer_address.dat
 
 
val custAddDF = spark.read.format("csv").option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")
 
 custAddDF.printSchema()
 
 root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)  -- wrong: street numbers should be strings
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)  -- wrong: zip codes should be strings (leading zeros would be lost)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true) -- change it into decimal(5,2)
 |-- ca_location_type: string (nullable = true)
 
  
 
val custAddSchemaDDL = "ca_address_sk long, ca_address_id string, ca_street_number string, ca_street_name string, " +
"ca_street_type string, ca_suite_number string, ca_city string, " +
"ca_county string, ca_state string, ca_zip string, ca_country string, ca_gmt_offset decimal(5,2), ca_location_type string"

// inferSchema is no longer needed (it is ignored once an explicit schema is supplied)
val custAddDF = spark.read.format("csv").schema(custAddSchemaDDL).option("header","true").option("sep","|").load("/FileStore/tables/retailer/data/customer_address.dat")

custAddDF.printSchema()

root
 |-- ca_address_sk: long (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: string (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: string (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: decimal(5,2) (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custAddDF.schema.toDDL
 
 res38: String = `ca_address_sk` BIGINT,`ca_address_id` STRING,`ca_street_number` STRING,`ca_street_name` STRING,`ca_street_type` STRING,`ca_suite_number` STRING,`ca_city` STRING,`ca_county` STRING,`ca_state` STRING,`ca_zip` STRING,`ca_country` STRING,`ca_gmt_offset` DECIMAL(5,2),`ca_location_type` STRING



custAddDF.schema

res39: org.apache.spark.sql.types.StructType = StructType(StructField(ca_address_sk,LongType,true), StructField(ca_address_id,StringType,true), StructField(ca_street_number,StringType,true), StructField(ca_street_name,StringType,true), StructField(ca_street_type,StringType,true), StructField(ca_suite_number,StringType,true), StructField(ca_city,StringType,true), StructField(ca_county,StringType,true), StructField(ca_state,StringType,true), StructField(ca_zip,StringType,true), StructField(ca_country,StringType,true), StructField(ca_gmt_offset,DecimalType(5,2),true), StructField(ca_location_type,StringType,true))
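
The same schema can be constructed programmatically instead of as a DDL string; a minimal sketch mirroring the StructType printed above:

import org.apache.spark.sql.types._

val custAddSchema = StructType(Seq(
  StructField("ca_address_sk", LongType, nullable = true),
  StructField("ca_address_id", StringType, nullable = true),
  StructField("ca_street_number", StringType, nullable = true),
  StructField("ca_street_name", StringType, nullable = true),
  StructField("ca_street_type", StringType, nullable = true),
  StructField("ca_suite_number", StringType, nullable = true),
  StructField("ca_city", StringType, nullable = true),
  StructField("ca_county", StringType, nullable = true),
  StructField("ca_state", StringType, nullable = true),
  StructField("ca_zip", StringType, nullable = true),
  StructField("ca_country", StringType, nullable = true),
  StructField("ca_gmt_offset", DecimalType(5, 2), nullable = true),
  StructField("ca_location_type", StringType, nullable = true)
))

// spark.read.format("csv").schema(custAddSchema)... loads with exactly this schema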


val incomeBandDF = spark.read.format("csv").schema("ib_lower_band_sk long, ib_lower_bound int, ib_upper_bound int").option("sep","|").load("/FileStore/tables/retailer/data/income_band.dat")

incomeBandDF.printSchema()

root
 |-- ib_lower_band_sk: long (nullable = true)
 |-- ib_lower_bound: integer (nullable = true)
 |-- ib_upper_bound: integer (nullable = true)
 
 
 incomeBandDF.show(5)
 
 +----------------+--------------+--------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|
+----------------+--------------+--------------+
|               1|             0|         10000|
|               2|         10001|         20000|
|               3|         20001|         30000|
|               4|         30001|         40000|
|               5|         40001|         50000|
+----------------+--------------+--------------+

// create a new column 
val incomeBandwithGroups = incomeBandDF.withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000)

incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|
+----------------+--------------+--------------+------------------+
|               1|             0|         10000|              true|
|               2|         10001|         20000|              true|
|               3|         20001|         30000|              true|
|               4|         30001|         40000|              true|
|               5|         40001|         50000|              true|
+----------------+--------------+--------------+------------------+


// to create a static column with a constant value we need to use lit
import org.apache.spark.sql.functions.{col, lit, when}

val incomeBandwithGroups = incomeBandDF.
withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000).
withColumn("isSecondIncomeGroup",incomeBandDF("ib_upper_bound") > 60000 and incomeBandDF("ib_upper_bound") <= 12000).
withColumn("isThirdIncomeGroup",incomeBandDF("ib_upper_bound") > 12000 and incomeBandDF("ib_upper_bound") < 200000).
withColumn("demo",lit("demoValue"))
incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|isSecondIncomeGroup|isThirdIncomeGroup|     demo|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|               1|             0|         10000|              true|              false|             false|demoValue|
|               2|         10001|         20000|              true|              false|              true|demoValue|
|               3|         20001|         30000|              true|              false|              true|demoValue|
|               4|         30001|         40000|              true|              false|              true|demoValue|
|               5|         40001|         50000|              true|              false|              true|demoValue|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
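
The when function imported above is not used in this snippet; a minimal sketch of how it could collapse the three boolean flags into a single label column (labels and thresholds are illustrative):

import org.apache.spark.sql.functions.{col, lit, when}

val labeledBands = incomeBandDF.withColumn("incomeClass",
  when(col("ib_upper_bound") <= 60000, lit("standard")).
  when(col("ib_upper_bound") <= 120000, lit("medium")).
  otherwise(lit("high")))

labeledBands.show(5)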
// Rename existing columns

val incomeClasses = incomeBandwithGroups.
withColumnRenamed("isThirdIncomeGroup","isHighIncomeClass").
withColumnRenamed("isFirstIncomeGroup","isStandardIncomeClass").
withColumnRenamed("isSecondIncomeGroup","isMediumIncomeClass")
incomeClasses.show(5)

+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isStandardIncomeClass|isMediumIncomeClass|isHighIncomeClass|     demo|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|               1|             0|         10000|                 true|              false|            false|demoValue|
|               2|         10001|         20000|                 true|              false|             true|demoValue|
|               3|         20001|         30000|                 true|              false|             true|demoValue|
|               4|         30001|         40000|                 true|              false|             true|demoValue|
|               5|         40001|         50000|                 true|              false|             true|demoValue|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
only showing top 5 rows


// Remove columns
val onlyMediumDF = incomeClasses.drop("demo","isStandardIncomeClass", "isHighIncomeClass")

onlyMediumDF.show(5)


+----------------+--------------+--------------+-------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isMediumIncomeClass|
+----------------+--------------+--------------+-------------------+
|               1|             0|         10000|              false|
|               2|         10001|         20000|              false|
|               3|         20001|         30000|              false|
|               4|         30001|         40000|              false|
|               5|         40001|         50000|              false|
+----------------+--------------+--------------+-------------------+


val custDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

custDF.count()
res75: Long = 100000  -- total records 


// validate the day-of-month range
custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31).count()

res77: Long = 96539  -- valid records


// validate days, months and years together
val validRecordsDF = custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 and $"c_birth_month" > 0 and $"c_birth_month" <= 12 and $"c_birth_year" > 0)

validRecordsDF.count()
94791


validRecordsDF.select("c_customer_id", "c_first_name","c_birth_day", "c_birth_month", "c_birth_year").show(5)

+----------------+--------------------+-----------+-------------+------------+
|   c_customer_id|        c_first_name|c_birth_day|c_birth_month|c_birth_year|
+----------------+--------------------+-----------+-------------+------------+
|AAAAAAAABAAAAAAA|Javier              |          9|           12|        1936|
|AAAAAAAACAAAAAAA|Amy                 |          9|            4|        1966|
|AAAAAAAADAAAAAAA|Latisha             |         18|            9|        1979|
|AAAAAAAAEAAAAAAA|Michael             |          7|            6|        1983|
|AAAAAAAAFAAAAAAA|Robert              |          8|            5|        1956|
+----------------+--------------------+-----------+-------------+------------+


Keys:
customer.c_current_addr_sk  
customer_address.ca_address_sk


val custDF = spark.read.format("csv").option("inferSchema",true).
              option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

val custAddDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").
              load("/FileStore/tables/retailer/data/customer_address.dat")

val joinExpression = custDF.col("c_current_addr_sk") === custAddDF.col("ca_address_sk")
val custwithAddDF  = custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","ca_address_sk","c_first_name","c_last_name")




 custwithAddDF.show(5)
 +----------------+-------------+--------------------+--------------------+
|   c_customer_id|ca_address_sk|        c_first_name|         c_last_name|
+----------------+-------------+--------------------+--------------------+
|AAAAAAAABAAAAAAA|        32946|Javier              |Lewis            ...|
|AAAAAAAACAAAAAAA|        31655|Amy                 |Moses            ...|
|AAAAAAAADAAAAAAA|        48572|Latisha             |Hamilton         ...|
|AAAAAAAAEAAAAAAA|        39558|Michael             |White            ...|
|AAAAAAAAFAAAAAAA|        36368|Robert              |Moran            ...|
+----------------+-------------+--------------------+--------------------+



custAddDF.printSchema

root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","c_first_name","c_last_name", "ca_street_name","ca_city").show(5)
 
 +----------------+--------------------+--------------------+--------------+-----------+
|   c_customer_id|        c_first_name|         c_last_name|ca_street_name|    ca_city|
+----------------+--------------------+--------------------+--------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis            ...| Chestnut Main|Spring Hill|
|AAAAAAAACAAAAAAA|Amy                 |Moses            ...|       8th Oak|    Antioch|
|AAAAAAAADAAAAAAA|Latisha             |Hamilton         ...|Park Jefferson|Cedar Grove|
|AAAAAAAAEAAAAAAA|Michael             |White            ...|Cedar Sycamore|   Lakewood|
|AAAAAAAAFAAAAAAA|Robert              |Moran            ...|   Miller Main|   Waterloo|
+----------------+--------------------+--------------------+--------------+-----------+
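
The joins above are inner joins; a minimal sketch of a left_anti join to list customers that have no matching address row (reusing the joinExpression defined above):

// customers whose c_current_addr_sk has no match in customer_address
val custWithoutAddDF = custDF.join(custAddDF, joinExpression, "left_anti")
custWithoutAddDF.select("c_customer_id", "c_first_name").show(5)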



custDF.count()
res89: Long = 100000


import org.apache.spark.sql.functions.count
custDF.select(count("*")).show()

100000



custDF.select(count("c_first_name")).show()
96508 -- null values ignored




custDF.filter($"c_first_name".isNotNull).count()
96508 -- same as count("c_first_name"): nulls are excluded

// Count the rows where c_first_name is NULL
custDF.filter($"c_first_name".isNull).count()
3492


//countDistinct

import org.apache.spark.sql.functions.countDistinct

custDF.select(countDistinct("c_first_name")).show()

4131


custDF.select("c_first_name").distinct().count()
res98: Long = 4132 -- one more than countDistinct, because distinct() keeps NULL as a value
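
For a large table an approximate distinct count is often sufficient and much cheaper; a minimal sketch using approx_count_distinct (the second argument is the maximum estimation error allowed):

import org.apache.spark.sql.functions.approx_count_distinct

// HyperLogLog-based estimate of the number of distinct first names
custDF.select(approx_count_distinct("c_first_name", 0.05)).show()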

Flume - Simple Demo

// Create a folder in HDFS:
$ hdfs dfs -mkdir /user/flumeExa

// Create a shell script which generates : Hadoop in real world <n>...