scala> val df = spark.sql("select * from ohm.products")
df: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int ... 4 more fields]
scala> df.printSchema()
root
|-- product_id: integer (nullable = true)
|-- product_category_id: integer (nullable = true)
|-- product_name: string (nullable = true)
|-- product_description: string (nullable = true)
|-- product_price: float (nullable = true)
|-- product_image: string (nullable = true)
scala> df.show(5)
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id| product_name|product_description|product_price| product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
| 1| 2|Quest Q64 10 FT. ...| | 59.98|http://images.acm...|
| 2| 2|Under Armour Men'...| | 129.99|http://images.acm...|
| 3| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 4| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 5| 2|Riddell Youth Rev...| | 199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows
scala> df.select("product_id","product_name","product_price").show(5,false)
+----------+---------------------------------------------+-------------+
|product_id|product_name |product_price|
+----------+---------------------------------------------+-------------+
|1 |Quest Q64 10 FT. x 10 FT. Slant Leg Instant U|59.98 |
|2 |Under Armour Men's Highlight MC Football Clea|129.99 |
|3 |Under Armour Men's Renegade D Mid Football Cl|89.99 |
|4 |Under Armour Men's Renegade D Mid Football Cl|89.99 |
|5 |Riddell Youth Revolution Speed Custom Footbal|199.99 |
+----------+---------------------------------------------+-------------+
only showing top 5 rows
scala> val df1 = df.select("product_id","product_name","product_price")
df1: org.apache.spark.sql.DataFrame = [product_id: int, product_name: string ... 1 more field]
scala> df1.show(5)
+----------+--------------------+-------------+
|product_id| product_name|product_price|
+----------+--------------------+-------------+
| 1|Quest Q64 10 FT. ...| 59.98|
| 2|Under Armour Men'...| 129.99|
| 3|Under Armour Men'...| 89.99|
| 4|Under Armour Men'...| 89.99|
| 5|Riddell Youth Rev...| 199.99|
+----------+--------------------+-------------+
only showing top 5 rows
// sort by price ascending order
scala> df1.sort("product_price").show(10)
+----------+--------------------+-------------+
|product_id| product_name|product_price|
+----------+--------------------+-------------+
| 388|Nike Men's Hyperv...| 0.0|
| 517|Nike Men's Hyperv...| 0.0|
| 934|Callaway X Hot Dr...| 0.0|
| 1284|Nike Men's Hyperv...| 0.0|
| 414|Nike Men's Hyperv...| 0.0|
| 547|Nike Men's Hyperv...| 0.0|
| 38|Nike Men's Hyperv...| 0.0|
| 815|Zero Friction Pra...| 4.99|
| 624|adidas Batting He...| 4.99|
| 336|Nike Swoosh Headb...| 5.0|
+----------+--------------------+-------------+
only showing top 10 rows
scala> df1.orderBy("product_price").show(10)
+----------+--------------------+-------------+
|product_id| product_name|product_price|
+----------+--------------------+-------------+
| 388|Nike Men's Hyperv...| 0.0|
| 517|Nike Men's Hyperv...| 0.0|
| 934|Callaway X Hot Dr...| 0.0|
| 1284|Nike Men's Hyperv...| 0.0|
| 414|Nike Men's Hyperv...| 0.0|
| 547|Nike Men's Hyperv...| 0.0|
| 38|Nike Men's Hyperv...| 0.0|
| 815|Zero Friction Pra...| 4.99|
| 624|adidas Batting He...| 4.99|
| 336|Nike Swoosh Headb...| 5.0|
+----------+--------------------+-------------+
only showing top 10 rows
// sort by price - descending order
scala> df1.sort(desc("product_price")).show(10)
+----------+--------------------+-------------+
|product_id| product_name|product_price|
+----------+--------------------+-------------+
| 208| SOLE E35 Elliptical| 1999.99|
| 199| SOLE F85 Treadmill| 1799.99|
| 66| SOLE F85 Treadmill| 1799.99|
| 496| SOLE F85 Treadmill| 1799.99|
| 1048|Spalding Beast 60...| 1099.99|
| 60| SOLE E25 Elliptical| 999.99|
| 197| SOLE E25 Elliptical| 999.99|
| 694|Callaway Women's ...| 999.99|
| 488| SOLE E25 Elliptical| 999.99|
| 695|Callaway Women's ...| 999.99|
+----------+--------------------+-------------+
only showing top 10 rows
scala> df1.orderBy(desc("product_price")).show(10)
+----------+--------------------+-------------+
|product_id| product_name|product_price|
+----------+--------------------+-------------+
| 208| SOLE E35 Elliptical| 1999.99|
| 199| SOLE F85 Treadmill| 1799.99|
| 66| SOLE F85 Treadmill| 1799.99|
| 496| SOLE F85 Treadmill| 1799.99|
| 1048|Spalding Beast 60...| 1099.99|
| 60| SOLE E25 Elliptical| 999.99|
| 197| SOLE E25 Elliptical| 999.99|
| 694|Callaway Women's ...| 999.99|
| 488| SOLE E25 Elliptical| 999.99|
| 695|Callaway Women's ...| 999.99|
+----------+--------------------+-------------+
only showing top 10 rows
scala> df.select("product_name","product_price").sort("product_name","product_price").show()
+--------------------+-------------+
| product_name|product_price|
+--------------------+-------------+
| $10 Gift Card| 10.0|
| AB Roller Evolution| 29.99|
|ASICS Men's GEL-B...| 139.99|
|ASICS Women's GEL...| 89.99|
|ASICS Women's GEL...| 89.99|
|ASICS Women's GEL...| 119.99|
|ASICS Women's GEL...| 119.99|
|ASICS Women's GEL...| 119.99|
|ASICS Women's GEL...| 139.99|
|ASICS Women's GEL...| 139.99|
|ASICS Women's GEL...| 139.99|
|Adams Golf Women'...| 499.99|
|Antigua Women's 2...| 54.99|
|BOSU Balance Trainer| 119.99|
|BOSU Balance Trainer| 119.99|
| BOSU Ballast Ball| 59.99|
|Bag Boy Beverage ...| 24.99|
|Bag Boy C3 Push Cart| 199.99|
| Bag Boy Cart Seat| 49.99|
|Bag Boy Express D...| 159.99|
+--------------------+-------------+
only showing top 20 rows
scala> df.select("product_name").distinct().count()
res22: Long = 750
scala> df.count()
res23: Long = 1345
// same product_name with > 10 variants
scala> df.groupBy("product_name").agg(count("product_name")).where (count("product_name") > 10).show()
+--------------------+-------------------+
| product_name|count(product_name)|
+--------------------+-------------------+
|Nike Women's Pro ...| 12|
|Fitbit Flex Wirel...| 13|
| Nike+ Fuelband SE| 15|
|Nike Kids' Grade ...| 11|
|Nike Women's Temp...| 12|
|Nike Women's Pro ...| 11|
|Under Armour Men'...| 20|
+--------------------+-------------------+
scala> val df = spark.sql("select * from ohm.customers")
df: org.apache.spark.sql.DataFrame = [customer_id: int, customer_fname: string ... 7 more fields]
scala> df.printSchema()
root
|-- customer_id: integer (nullable = true)
|-- customer_fname: string (nullable = true)
|-- customer_lname: string (nullable = true)
|-- customer_email: string (nullable = true)
|-- customer_password: string (nullable = true)
|-- customer_street: string (nullable = true)
|-- customer_city: string (nullable = true)
|-- customer_state: string (nullable = true)
|-- customer_zipcode: string (nullable = true)
scala> df.select("customer_fname").show(5)
+--------------+
|customer_fname|
+--------------+
| Richard|
| Mary|
| Ann|
| Mary|
| Robert|
+--------------+
only showing top 5 rows
scala> val df = Seq(
(101,"Ravi","Chennai",1000),
(102,"Siva","Bangalore",1500),
(103,"Lara","Mumbai",1350),
(104,"Kalai","Chennai",1560),
(105, "Nila","Bangalore",1650)).toDF("id","name","city","salary")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> df.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- salary: integer (nullable = false)
scala> df.show()
+---+-----+---------+------+
| id| name| city|salary|
+---+-----+---------+------+
|101| Ravi| Chennai| 1000|
|102| Siva|Bangalore| 1500|
|103| Lara| Mumbai| 1350|
|104|Kalai| Chennai| 1560|
|105| Nila|Bangalore| 1650|
+---+-----+---------+------+
scala> df.groupBy("city").agg(sum("salary")).show()
+---------+-----------+
| city|sum(salary)|
+---------+-----------+
|Bangalore| 3150|
| Chennai| 2560|
| Mumbai| 1350|
+---------+-----------+
scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx").show()
+---------+----+----+
| city|Minn|Maxx|
+---------+----+----+
|Bangalore|1500|1650|
| Chennai|1000|1560|
| Mumbai|1350|1350|
+---------+----+----+
scala> df.sort("city","name","salary").select("city","name","salary").show()
+---------+-----+------+
| city| name|salary|
+---------+-----+------+
|Bangalore| Nila| 1650|
|Bangalore| Siva| 1500|
| Chennai|Kalai| 1560|
| Chennai| Ravi| 1000|
| Mumbai| Lara| 1350|
+---------+-----+------+
scala> df.groupBy("city").agg(count("city") as "Countt").show()
+---------+------+
| city|Countt|
+---------+------+
|Bangalore| 2|
| Chennai| 2|
| Mumbai| 1|
+---------+------+
scala> df.groupBy("city").agg(avg("salary") as "Avgsalary").show()
+---------+---------+
| city|Avgsalary|
+---------+---------+
|Bangalore| 1575.0|
| Chennai| 1280.0|
| Mumbai| 1350.0|
+---------+---------+
scala> df.select("city","name","salary").orderBy("city").show()
+---------+-----+------+
| city| name|salary|
+---------+-----+------+
|Bangalore| Siva| 1500|
|Bangalore| Nila| 1650|
| Chennai| Ravi| 1000|
| Chennai|Kalai| 1560|
| Mumbai| Lara| 1350|
+---------+-----+------+
scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx", avg("salary") as "Avg", sum("salary") as "Summ").show()
+---------+----+----+------+----+
| city|Minn|Maxx| Avg|Summ|
+---------+----+----+------+----+
|Bangalore|1500|1650|1575.0|3150|
| Chennai|1000|1560|1280.0|2560|
| Mumbai|1350|1350|1350.0|1350|
+---------+----+----+------+----+
scala> df.groupBy("city").agg(min("salary") as "Minn", max("salary") as "Maxx", avg("salary") as "Avg", sum("salary") as "Summ").orderBy(desc("city")).show()
+---------+----+----+------+----+
| city|Minn|Maxx| Avg|Summ|
+---------+----+----+------+----+
| Mumbai|1350|1350|1350.0|1350|
| Chennai|1000|1560|1280.0|2560|
|Bangalore|1500|1650|1575.0|3150|
+---------+----+----+------+----+
scala> df.filter($"city" === "Bangalore").show()
+---+----+---------+------+
| id|name| city|salary|
+---+----+---------+------+
|102|Siva|Bangalore| 1500|
|105|Nila|Bangalore| 1650|
+---+----+---------+------+
scala> df.filter($"city" !== "Bangalore").show()
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----+-------+------+
| id| name| city|salary|
+---+-----+-------+------+
|101| Ravi|Chennai| 1000|
|103| Lara| Mumbai| 1350|
|104|Kalai|Chennai| 1560|
+---+-----+-------+------+
scala> val dfBangalore = df.filter($"city" === "Bangalore")
dfBangalore: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> val dfChennai = df.filter($"city" === "Chennai")
dfChennai: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> val dfMumbai = df.filter($"city" === "Mumbai")
dfMumbai: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> val dfAll = dfBangalore.unionAll(dfChennai).unionAll(dfMumbai)
warning: there were two deprecation warnings; re-run with -deprecation for details
dfAll: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> dfAll.select("city").distinct().show()
+---------+
| city|
+---------+
|Bangalore|
| Chennai|
| Mumbai|
+---------+
scala> dfAll.show()
+---+-----+---------+------+
| id| name| city|salary|
+---+-----+---------+------+
|102| Siva|Bangalore| 1500|
|105| Nila|Bangalore| 1650|
|101| Ravi| Chennai| 1000|
|104|Kalai| Chennai| 1560|
|103| Lara| Mumbai| 1350|
+---+-----+---------+------+
scala> dfBangalore.show()
+---+----+---------+------+
| id|name| city|salary|
+---+----+---------+------+
|102|Siva|Bangalore| 1500|
|105|Nila|Bangalore| 1650|
+---+----+---------+------+
// All - Bangalore // Exclude Bangalore from All
scala> val dfBangaloreExcluded = dfAll.except(dfBangalore)
dfBangaloreExcluded: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> dfBangaloreExcluded.show()
+---+-----+-------+------+
| id| name| city|salary|
+---+-----+-------+------+
|104|Kalai|Chennai| 1560|
|103| Lara| Mumbai| 1350|
|101| Ravi|Chennai| 1000|
+---+-----+-------+------+
// common rows between 2 different dataframes
scala> dfAll.intersect(dfBangalore).show()
+---+----+---------+------+
| id|name| city|salary|
+---+----+---------+------+
|105|Nila|Bangalore| 1650|
|102|Siva|Bangalore| 1500|
+---+----+---------+------+
scala> val first3DF = dfAll.limit(3)
first3DF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> first3DF.show()
+---+----+---------+------+
| id|name| city|salary|
+---+----+---------+------+
|102|Siva|Bangalore| 1500|
|105|Nila|Bangalore| 1650|
|101|Ravi| Chennai| 1000|
+---+----+---------+------+
// remove a column
scala> val idRemoved = first3DF.drop($"id")
idRemoved: org.apache.spark.sql.DataFrame = [name: string, city: string ... 1 more field]
scala> idRemoved.show()
+----+---------+------+
|name| city|salary|
+----+---------+------+
|Siva|Bangalore| 1500|
|Nila|Bangalore| 1650|
|Ravi| Chennai| 1000|
+----+---------+------+
scala> val cityOnly = idRemoved.select("city")
cityOnly: org.apache.spark.sql.DataFrame = [city: string]
scala> cityOnly.show()
+---------+
| city|
+---------+
|Bangalore|
|Bangalore|
| Chennai|
+---------+
// remove duplicates
scala> cityOnly.dropDuplicates().show()
+---------+
| city|
+---------+
|Bangalore|
| Chennai|
+---------+
// we have some duplicates in the dataframe
scala> val df = Seq( ("Ravi","Bangalore"), ("Raja","Bangalore"),("Ravi","Bangalore"),("Raja","Bangalore"),("Kumar","Chennai")).toDF("name","city")
df: org.apache.spark.sql.DataFrame = [name: string, city: string]
scala> df.show()
+-----+---------+
| name| city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar| Chennai|
+-----+---------+
// with duplicates count
scala> df.count()
res94: Long = 5
// count after removing duplicates
scala> df.dropDuplicates().count()
res96: Long = 3
scala> df.dropDuplicates().show()
+-----+---------+
| name| city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar| Chennai|
+-----+---------+
scala> df.distinct().show()
+-----+---------+
| name| city|
+-----+---------+
| Ravi|Bangalore|
| Raja|Bangalore|
|Kumar| Chennai|
+-----+---------+
scala> val df = spark.sql("Select * from ohm.products")
df: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int ... 4 more fields]
scala> df.createOrReplaceTempView("products")
scala> spark.sql("select * from products").show(5)
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id| product_name|product_description|product_price| product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
| 1| 2|Quest Q64 10 FT. ...| | 59.98|http://images.acm...|
| 2| 2|Under Armour Men'...| | 129.99|http://images.acm...|
| 3| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 4| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 5| 2|Riddell Youth Rev...| | 199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows
scala> spark.sql("select distinct(product_name) from products").show(5)
+--------------------+
| product_name|
+--------------------+
|Nike Women's Pro ...|
|Fitness Gear Heav...|
|Callaway X Hot La...|
|adidas Men's 2014...|
|Top Flite Kids' 2...|
+--------------------+
only showing top 5 rows
scala> spark.sql("select product_name, count(product_name) from products group by product_name having count(product_name) > 12").show(5)
+--------------------+-------------------+
| product_name|count(product_name)|
+--------------------+-------------------+
|Fitbit Flex Wirel...| 13|
| Nike+ Fuelband SE| 15|
|Under Armour Men'...| 20|
+--------------------+-------------------+
No comments:
Post a Comment