Tuesday, 11 August 2020

Playing with Spark DataFrames

scala> val df = spark.sql("select * from ohm.products")
df: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int ... 4 more fields]

scala> df.printSchema()
root
 |-- product_id: integer (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_image: string (nullable = true)
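
// Side note: a sketch of an equivalent way to load the same Hive table,
// using spark.table instead of a SQL query (dfSame is just an illustrative name):

scala> val dfSame = spark.table("ohm.products")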


scala> df.show(5)
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|                   |        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|                   |       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|                   |        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|                   |        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|                   |       199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows



scala> df.select("product_id","product_name","product_price").show(5,false)
+----------+---------------------------------------------+-------------+
|product_id|product_name                                 |product_price|
+----------+---------------------------------------------+-------------+
|1         |Quest Q64 10 FT. x 10 FT. Slant Leg Instant U|59.98        |
|2         |Under Armour Men's Highlight MC Football Clea|129.99       |
|3         |Under Armour Men's Renegade D Mid Football Cl|89.99        |
|4         |Under Armour Men's Renegade D Mid Football Cl|89.99        |
|5         |Riddell Youth Revolution Speed Custom Footbal|199.99       |
+----------+---------------------------------------------+-------------+
only showing top 5 rows

scala> val df1 = df.select("product_id","product_name","product_price")
df1: org.apache.spark.sql.DataFrame = [product_id: int, product_name: string ... 1 more field]

scala> df1.show(5)
+----------+--------------------+-------------+
|product_id|        product_name|product_price|
+----------+--------------------+-------------+
|         1|Quest Q64 10 FT. ...|        59.98|
|         2|Under Armour Men'...|       129.99|
|         3|Under Armour Men'...|        89.99|
|         4|Under Armour Men'...|        89.99|
|         5|Riddell Youth Rev...|       199.99|
+----------+--------------------+-------------+
only showing top 5 rows
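
// Derived columns can be added with withColumn; a minimal sketch
// (the 10% discount and the column name are just illustrative):

scala> df1.withColumn("discounted_price", $"product_price" * 0.9).show(5)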


// sort by price in ascending order

scala> df1.sort("product_price").show(10)
+----------+--------------------+-------------+
|product_id|        product_name|product_price|
+----------+--------------------+-------------+
|       388|Nike Men's Hyperv...|          0.0|
|       517|Nike Men's Hyperv...|          0.0|
|       934|Callaway X Hot Dr...|          0.0|
|      1284|Nike Men's Hyperv...|          0.0|
|       414|Nike Men's Hyperv...|          0.0|
|       547|Nike Men's Hyperv...|          0.0|
|        38|Nike Men's Hyperv...|          0.0|
|       815|Zero Friction Pra...|         4.99|
|       624|adidas Batting He...|         4.99|
|       336|Nike Swoosh Headb...|          5.0|
+----------+--------------------+-------------+
only showing top 10 rows

scala> df1.orderBy("product_price").show(10)
+----------+--------------------+-------------+
|product_id|        product_name|product_price|
+----------+--------------------+-------------+
|       388|Nike Men's Hyperv...|          0.0|
|       517|Nike Men's Hyperv...|          0.0|
|       934|Callaway X Hot Dr...|          0.0|
|      1284|Nike Men's Hyperv...|          0.0|
|       414|Nike Men's Hyperv...|          0.0|
|       547|Nike Men's Hyperv...|          0.0|
|        38|Nike Men's Hyperv...|          0.0|
|       815|Zero Friction Pra...|         4.99|
|       624|adidas Batting He...|         4.99|
|       336|Nike Swoosh Headb...|          5.0|
+----------+--------------------+-------------+
only showing top 10 rows


// sort by price in descending order
scala> df1.sort(desc("product_price")).show(10)
+----------+--------------------+-------------+
|product_id|        product_name|product_price|
+----------+--------------------+-------------+
|       208| SOLE E35 Elliptical|      1999.99|
|       199|  SOLE F85 Treadmill|      1799.99|
|        66|  SOLE F85 Treadmill|      1799.99|
|       496|  SOLE F85 Treadmill|      1799.99|
|      1048|Spalding Beast 60...|      1099.99|
|        60| SOLE E25 Elliptical|       999.99|
|       197| SOLE E25 Elliptical|       999.99|
|       694|Callaway Women's ...|       999.99|
|       488| SOLE E25 Elliptical|       999.99|
|       695|Callaway Women's ...|       999.99|
+----------+--------------------+-------------+
only showing top 10 rows


scala> df1.orderBy(desc("product_price")).show(10)
+----------+--------------------+-------------+
|product_id|        product_name|product_price|
+----------+--------------------+-------------+
|       208| SOLE E35 Elliptical|      1999.99|
|       199|  SOLE F85 Treadmill|      1799.99|
|        66|  SOLE F85 Treadmill|      1799.99|
|       496|  SOLE F85 Treadmill|      1799.99|
|      1048|Spalding Beast 60...|      1099.99|
|        60| SOLE E25 Elliptical|       999.99|
|       197| SOLE E25 Elliptical|       999.99|
|       694|Callaway Women's ...|       999.99|
|       488| SOLE E25 Elliptical|       999.99|
|       695|Callaway Women's ...|       999.99|
+----------+--------------------+-------------+
only showing top 10 rows
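
// sort and orderBy are aliases, and directions can be mixed per column.
// A sketch: name ascending, then price descending within each name:

scala> df1.orderBy(asc("product_name"), desc("product_price")).show(5)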


scala> df.select("product_name","product_price").sort("product_name","product_price").show()
+--------------------+-------------+
|        product_name|product_price|
+--------------------+-------------+
|       $10 Gift Card|         10.0|
| AB Roller Evolution|        29.99|
|ASICS Men's GEL-B...|       139.99|
|ASICS Women's GEL...|        89.99|
|ASICS Women's GEL...|        89.99|
|ASICS Women's GEL...|       119.99|
|ASICS Women's GEL...|       119.99|
|ASICS Women's GEL...|       119.99|
|ASICS Women's GEL...|       139.99|
|ASICS Women's GEL...|       139.99|
|ASICS Women's GEL...|       139.99|
|Adams Golf Women'...|       499.99|
|Antigua Women's 2...|        54.99|
|BOSU Balance Trainer|       119.99|
|BOSU Balance Trainer|       119.99|
|   BOSU Ballast Ball|        59.99|
|Bag Boy Beverage ...|        24.99|
|Bag Boy C3 Push Cart|       199.99|
|   Bag Boy Cart Seat|        49.99|
|Bag Boy Express D...|       159.99|
+--------------------+-------------+
only showing top 20 rows


scala> df.select("product_name").distinct().count()
res22: Long = 750                                                               

scala> df.count()
res23: Long = 1345
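
// The same distinct count can also be expressed as an aggregation;
// a minimal sketch using countDistinct:

scala> df.agg(countDistinct("product_name")).show()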


// product names that occur more than 10 times (i.e. more than 10 variants)
scala> df.groupBy("product_name").agg(count("product_name")).where (count("product_name") > 10).show()
+--------------------+-------------------+                                      
|        product_name|count(product_name)|
+--------------------+-------------------+
|Nike Women's Pro ...|                 12|
|Fitbit Flex Wirel...|                 13|
|   Nike+ Fuelband SE|                 15|
|Nike Kids' Grade ...|                 11|
|Nike Women's Temp...|                 12|
|Nike Women's Pro ...|                 11|
|Under Armour Men'...|                 20|
+--------------------+-------------------+
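
// The same "having" logic reads more cleanly with an alias on the count;
// a sketch (the alias "cnt" is just illustrative):

scala> df.groupBy("product_name").agg(count("product_name") as "cnt").filter($"cnt" > 10).orderBy(desc("cnt")).show()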


scala> val df = spark.sql("select * from ohm.customers")
df: org.apache.spark.sql.DataFrame = [customer_id: int, customer_fname: string ... 7 more fields]

scala> df.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)


scala> df.select("customer_fname").show(5)
+--------------+
|customer_fname|
+--------------+
|       Richard|
|          Mary|
|           Ann|
|          Mary|
|        Robert|
+--------------+
only showing top 5 rows
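
// Filters work the same way on the customer columns; a sketch
// ("TX" is only an example value, not checked against this table):

scala> df.filter($"customer_state" === "TX").select("customer_fname", "customer_city").show(5)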




scala> val df = Seq( 
(101,"Ravi","Chennai",1000), 
(102,"Siva","Bangalore",1500),
(103,"Lara","Mumbai",1350), 
(104,"Kalai","Chennai",1560), 
(105, "Nila","Bangalore",1650)).toDF("id","name","city","salary")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]


scala> df.printSchema()
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = false)


scala> df.show()
+---+-----+---------+------+
| id| name|     city|salary|
+---+-----+---------+------+
|101| Ravi|  Chennai|  1000|
|102| Siva|Bangalore|  1500|
|103| Lara|   Mumbai|  1350|
|104|Kalai|  Chennai|  1560|
|105| Nila|Bangalore|  1650|
+---+-----+---------+------+
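
// A local Seq like this can also be turned into a typed Dataset via a
// case class; a minimal sketch (the Employee name is just illustrative):

scala> case class Employee(id: Int, name: String, city: String, salary: Int)

scala> val ds = df.as[Employee]

scala> ds.filter(_.salary > 1500).show()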


scala> df.groupBy("city").agg(sum("salary")).show()
+---------+-----------+
|     city|sum(salary)|
+---------+-----------+
|Bangalore|       3150|
|  Chennai|       2560|
|   Mumbai|       1350|
+---------+-----------+


scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx").show()
+---------+----+----+
|     city|Minn|Maxx|
+---------+----+----+
|Bangalore|1500|1650|
|  Chennai|1000|1560|
|   Mumbai|1350|1350|
+---------+----+----+


scala> df.sort("city","name","salary").select("city","name","salary").show()
+---------+-----+------+
|     city| name|salary|
+---------+-----+------+
|Bangalore| Nila|  1650|
|Bangalore| Siva|  1500|
|  Chennai|Kalai|  1560|
|  Chennai| Ravi|  1000|
|   Mumbai| Lara|  1350|
+---------+-----+------+


scala> df.groupBy("city").agg(count("city") as "Countt").show()
+---------+------+
|     city|Countt|
+---------+------+
|Bangalore|     2|
|  Chennai|     2|
|   Mumbai|     1|
+---------+------+



scala> df.groupBy("city").agg(avg("salary") as "Avgsalary").show()
+---------+---------+
|     city|Avgsalary|
+---------+---------+
|Bangalore|   1575.0|
|  Chennai|   1280.0|
|   Mumbai|   1350.0|
+---------+---------+

scala> df.select("city","name","salary").orderBy("city").show()
+---------+-----+------+
|     city| name|salary|
+---------+-----+------+
|Bangalore| Siva|  1500|
|Bangalore| Nila|  1650|
|  Chennai| Ravi|  1000|
|  Chennai|Kalai|  1560|
|   Mumbai| Lara|  1350|
+---------+-----+------+


scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx", avg("salary") as "Avg", sum("salary") as "Summ").show()
+---------+----+----+------+----+                                               
|     city|Minn|Maxx|   Avg|Summ|
+---------+----+----+------+----+
|Bangalore|1500|1650|1575.0|3150|
|  Chennai|1000|1560|1280.0|2560|
|   Mumbai|1350|1350|1350.0|1350|
+---------+----+----+------+----+

scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx", avg("salary") as "Avg", sum("salary") as "Summ").orderBy(desc("city")).show()
+---------+----+----+------+----+                                               
|     city|Minn|Maxx|   Avg|Summ|
+---------+----+----+------+----+
|   Mumbai|1350|1350|1350.0|1350|
|  Chennai|1000|1560|1280.0|2560|
|Bangalore|1500|1650|1575.0|3150|
+---------+----+----+------+----+


scala> df.filter($"city" === "Bangalore").show()
+---+----+---------+------+
| id|name|     city|salary|
+---+----+---------+------+
|102|Siva|Bangalore|  1500|
|105|Nila|Bangalore|  1650|
+---+----+---------+------+


scala> df.filter($"city" !== "Bangalore").show()
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----+-------+------+
| id| name|   city|salary|
+---+-----+-------+------+
|101| Ravi|Chennai|  1000|
|103| Lara| Mumbai|  1350|
|104|Kalai|Chennai|  1560|
+---+-----+-------+------+
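
// Note: !== is deprecated (hence the warning above); =!= is the
// preferred inequality operator and gives the same result:

scala> df.filter($"city" =!= "Bangalore").show()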


scala> val dfBangalore = df.filter($"city" === "Bangalore")
dfBangalore: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> val dfChennai = df.filter($"city" === "Chennai")
dfChennai: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> val dfMumbai = df.filter($"city" === "Mumbai")
dfMumbai: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]


scala> val dfAll = dfBangalore.unionAll(dfChennai).unionAll(dfMumbai)
warning: there were two deprecation warnings; re-run with -deprecation for details
dfAll: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
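
// Note: unionAll is deprecated (hence the warnings above); union behaves
// the same for DataFrames (positional columns, duplicates kept) and is
// the current API. A sketch, with an illustrative variable name:

scala> val dfAllUnion = dfBangalore.union(dfChennai).union(dfMumbai)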




scala> dfAll.select("city").distinct().show()
+---------+
|     city|
+---------+
|Bangalore|
|  Chennai|
|   Mumbai|
+---------+

scala> dfAll.show()
+---+-----+---------+------+
| id| name|     city|salary|
+---+-----+---------+------+
|102| Siva|Bangalore|  1500|
|105| Nila|Bangalore|  1650|
|101| Ravi|  Chennai|  1000|
|104|Kalai|  Chennai|  1560|
|103| Lara|   Mumbai|  1350|
+---+-----+---------+------+

scala> dfBangalore.show()
+---+----+---------+------+
| id|name|     city|salary|
+---+----+---------+------+
|102|Siva|Bangalore|  1500|
|105|Nila|Bangalore|  1650|
+---+----+---------+------+


// All minus Bangalore: exclude the Bangalore rows from dfAll

scala> val dfBangaloreExcluded = dfAll.except(dfBangalore)
dfBangaloreExcluded: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> dfBangaloreExcluded.show()
+---+-----+-------+------+
| id| name|   city|salary|
+---+-----+-------+------+
|104|Kalai|Chennai|  1560|
|103| Lara| Mumbai|  1350|
|101| Ravi|Chennai|  1000|
+---+-----+-------+------+


// common rows between two different DataFrames
scala> dfAll.intersect(dfBangalore).show()
+---+----+---------+------+                                                     
| id|name|     city|salary|
+---+----+---------+------+
|105|Nila|Bangalore|  1650|
|102|Siva|Bangalore|  1500|
+---+----+---------+------+


scala> val first3DF = dfAll.limit(3)
first3DF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> first3DF.show()
+---+----+---------+------+
| id|name|     city|salary|
+---+----+---------+------+
|102|Siva|Bangalore|  1500|
|105|Nila|Bangalore|  1650|
|101|Ravi|  Chennai|  1000|
+---+----+---------+------+
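
// limit returns a new (still distributed) DataFrame; to pull rows to the
// driver as a local Array[Row] instead, use take:

scala> dfAll.take(3)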

// remove a column
scala> val idRemoved = first3DF.drop($"id")
idRemoved: org.apache.spark.sql.DataFrame = [name: string, city: string ... 1 more field]

scala> idRemoved.show()
+----+---------+------+
|name|     city|salary|
+----+---------+------+
|Siva|Bangalore|  1500|
|Nila|Bangalore|  1650|
|Ravi|  Chennai|  1000|
+----+---------+------+


scala> val cityOnly = idRemoved.select("city")
cityOnly: org.apache.spark.sql.DataFrame = [city: string]

scala> cityOnly.show()
+---------+
|     city|
+---------+
|Bangalore|
|Bangalore|
|  Chennai|
+---------+


// remove duplicates
scala> cityOnly.dropDuplicates().show()
+---------+                                                                     
|     city|
+---------+
|Bangalore|
|  Chennai|
+---------+


// we have some duplicates in the DataFrame
scala> val df = Seq( ("Ravi","Bangalore"), ("Raja","Bangalore"),("Ravi","Bangalore"),("Raja","Bangalore"),("Kumar","Chennai")).toDF("name","city")
df: org.apache.spark.sql.DataFrame = [name: string, city: string]

scala> df.show()
+-----+---------+
| name|     city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar|  Chennai|
+-----+---------+


// count including duplicates
scala> df.count()
res94: Long = 5

// count after removing duplicates 
scala> df.dropDuplicates().count()
res96: Long = 3

scala> df.dropDuplicates().show()
+-----+---------+
| name|     city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar|  Chennai|
+-----+---------+

scala> df.distinct().show()
+-----+---------+
| name|     city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar|  Chennai|
+-----+---------+
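
// dropDuplicates can also deduplicate on a subset of columns; a sketch
// keeping one (arbitrary) row per name:

scala> df.dropDuplicates("name").show()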




scala> val df = spark.sql("Select * from ohm.products")
df: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int ... 4 more fields]

scala> df.createOrReplaceTempView("products")

scala> spark.sql("select * from products").show(5)
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|                   |        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|                   |       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|                   |        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|                   |        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|                   |       199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows


scala> spark.sql("select distinct(product_name) from products").show(5)
+--------------------+
|        product_name|
+--------------------+
|Nike Women's Pro ...|
|Fitness Gear Heav...|
|Callaway X Hot La...|
|adidas Men's 2014...|
|Top Flite Kids' 2...|
+--------------------+
only showing top 5 rows


scala> spark.sql("select product_name, count(product_name) from products group by product_name having count(product_name) > 12").show(5)
+--------------------+-------------------+                                      
|        product_name|count(product_name)|
+--------------------+-------------------+
|Fitbit Flex Wirel...|                 13|
|   Nike+ Fuelband SE|                 15|
|Under Armour Men'...|                 20|
+--------------------+-------------------+
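
// A temp view lets SQL and the DataFrame API mix freely; a sketch
// (the 100 price cutoff is just an illustrative value):

scala> spark.sql("select product_name, product_price from products where product_price > 100").orderBy(desc("product_price")).show(5)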




