Tuesday, 18 August 2020

Item and StoreSales Datasets - Spark with Scala Programming


// Read the pipe-delimited item file, inferring column types from the data
val itemDF = spark.read.format("csv")
  .options(Map("header" -> "true", "delimiter" -> "|", "inferSchema" -> "true"))
  .load("/FileStore/tables/retailer/data/item.dat")
 
itemDF.printSchema()

root
 |-- i_item_sk: integer (nullable = true)
 |-- i_item_id: string (nullable = true)
 |-- i_rec_start_date: string (nullable = true)
 |-- i_rec_end_date: string (nullable = true)
 |-- i_item_desc: string (nullable = true)
 |-- i_current_price: double (nullable = true)
 |-- i_wholesale_cost: double (nullable = true)
 |-- i_brand_id: integer (nullable = true)
 |-- i_brand: string (nullable = true)
 |-- i_class_id: integer (nullable = true)
 |-- i_class: string (nullable = true)
 |-- i_category_id: integer (nullable = true)
 |-- i_category: string (nullable = true)
 |-- i_manufact_id: integer (nullable = true)
 |-- i_manufact: string (nullable = true)
 |-- i_size: string (nullable = true)
 |-- i_formulation: string (nullable = true)
 |-- i_color: string (nullable = true)
 |-- i_units: string (nullable = true)
 |-- i_container: string (nullable = true)
 |-- i_manager_id: integer (nullable = true)
 |-- i_product_name: string (nullable = true)
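Since inferSchema makes an extra pass over the file to guess types, a hand-written schema is often faster on large inputs. A minimal sketch with the first few columns (the remaining fields follow the printSchema output above):

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// Declaring the schema up front skips the type-inference pass
val itemSchema = StructType(Seq(
  StructField("i_item_sk", IntegerType),
  StructField("i_item_id", StringType),
  StructField("i_rec_start_date", StringType)
  // ... remaining fields as listed above
))
val itemTypedDF = spark.read.format("csv")
  .options(Map("header" -> "true", "delimiter" -> "|"))
  .schema(itemSchema)
  .load("/FileStore/tables/retailer/data/item.dat")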
 
 
import org.apache.spark.sql.functions._
itemDF.select(min("i_wholesale_cost").alias("Minn"), max("i_wholesale_cost").alias("Maxx")).show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+

Or, renaming the auto-generated columns afterwards:

import org.apache.spark.sql.functions._

val minMaxDF = itemDF.select(min("i_wholesale_cost"), max("i_wholesale_cost"))
  .withColumnRenamed("min(i_wholesale_cost)", "Minn")
  .withColumnRenamed("max(i_wholesale_cost)", "Maxx")
minMaxDF.show()

+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+
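The same pair of aggregates can also be written with agg, which keeps the aliases inline; a minimal sketch:

import org.apache.spark.sql.functions.{min, max}

// agg lets each aggregate carry its alias directly
val minMaxAggDF = itemDF.agg(
  min("i_wholesale_cost").alias("Minn"),
  max("i_wholesale_cost").alias("Maxx"))
minMaxAggDF.show()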


// Filter: which items carry that minimum wholesale cost?
display(itemDF.filter($"i_wholesale_cost" === 0.02))
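Outside a Databricks notebook, display() is unavailable and $"..." needs import spark.implicits._; the same filter in portable form, as a sketch:

import org.apache.spark.sql.functions.col

// Column-object form of the same predicate
itemDF.filter(col("i_wholesale_cost") === 0.02).show()
// ...or as a SQL-style predicate string
itemDF.filter("i_wholesale_cost = 0.02").show()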



import org.apache.spark.sql.functions._
val minWSCostDF = itemDF.select(min("i_wholesale_cost").alias("Minn"))

minWSCostDF.show()

+----+
|Minn|
+----+
|0.02|
+----+

// Join the one-row minimum back onto item to pick out the matching rows
val cheapestItemDF = itemDF.join(minWSCostDF,
  itemDF.col("i_wholesale_cost") === minWSCostDF.col("Minn"), "inner")

cheapestItemDF.select("i_item_desc","i_current_price","i_category").show()

+--------------------+---------------+--------------------+
|         i_item_desc|i_current_price|          i_category|
+--------------------+---------------+--------------------+
|Forces can testif...|           0.09|Books            ...|
|Forces can testif...|           9.21|Sports           ...|
+--------------------+---------------+--------------------+
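Because the minimum is a single value, an alternative to the join is to collect it to the driver and filter on the literal; a sketch:

import org.apache.spark.sql.functions.{col, min}

// Pull the minimum back to the driver, then filter on that value
val minCost: Double = itemDF.agg(min("i_wholesale_cost")).first().getDouble(0)
itemDF.filter(col("i_wholesale_cost") === minCost)
  .select("i_item_desc", "i_current_price", "i_category")
  .show()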

 

import org.apache.spark.sql.functions._
itemDF.select(max("i_current_price").alias("MaxCurrentPrice")).show()

+---------------+
|MaxCurrentPrice|
+---------------+
|          99.99|
+---------------+
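The join pattern used above for the cheapest wholesale cost works the same way for the items at this maximum price; a sketch:

import org.apache.spark.sql.functions.max

val maxPriceDF = itemDF.select(max("i_current_price").alias("Maxx"))
val priciestItemDF = itemDF.join(maxPriceDF,
  itemDF.col("i_current_price") === maxPriceDF.col("Maxx"), "inner")
priciestItemDF.select("i_item_desc", "i_current_price", "i_category").show()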


val storeSalesDF = spark.read.format("csv")
  .option("header", "true")
  .option("sep", "|")
  .option("inferSchema", "true")
  .load("/FileStore/tables/retailer/data/store_sales.dat")

storeSalesDF.count()
2880404

storeSalesDF.printSchema
root
 |-- ss_sold_date_sk: integer (nullable = true)
 |-- ss_sold_time_sk: integer (nullable = true)
 |-- ss_item_sk: integer (nullable = true)
 |-- ss_customer_sk: integer (nullable = true)
 |-- ss_cdemo_sk: integer (nullable = true)
 |-- ss_hdemo_sk: integer (nullable = true)
 |-- ss_addr_sk: integer (nullable = true)
 |-- ss_store_sk: integer (nullable = true)
 |-- ss_promo_sk: integer (nullable = true)
 |-- ss_ticket_number: integer (nullable = true)
 |-- ss_quantity: integer (nullable = true)
 |-- ss_wholesale_cost: double (nullable = true)
 |-- ss_list_price: double (nullable = true)
 |-- ss_sales_price: double (nullable = true)
 |-- ss_ext_discount_amt: double (nullable = true)
 |-- ss_ext_sales_price: double (nullable = true)
 |-- ss_ext_wholesale_cost: double (nullable = true)
 |-- ss_ext_list_price: double (nullable = true)
 |-- ss_ext_tax: double (nullable = true)
 |-- ss_coupon_amt: double (nullable = true)
 |-- ss_net_paid: double (nullable = true)
 |-- ss_net_paid_inc_tax: double (nullable = true)
 |-- ss_net_profit: double (nullable = true)
 
 
import org.apache.spark.sql.functions.sum

// Total net paid (including tax) and total net profit across all sales
storeSalesDF.select(sum("ss_net_paid_inc_tax").alias("NetPaidIncTax"), sum("ss_net_profit").alias("NetProfit")).show()

+-------------------+--------------------+
|      NetPaidIncTax|           NetProfit|
+-------------------+--------------------+
|4.954755825529975E9|-2.276100670919993E9|
+-------------------+--------------------+
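Summing doubles over 2.8 million rows accumulates floating-point error; casting the money columns to a decimal type before summing keeps the totals exact. A sketch, assuming two decimal places suffice:

import org.apache.spark.sql.functions.{col, sum}

// Cast to decimal first so the money columns add exactly
storeSalesDF.select(
  sum(col("ss_net_paid_inc_tax").cast("decimal(20,2)")).alias("NetPaidIncTax"),
  sum(col("ss_net_profit").cast("decimal(20,2)")).alias("NetProfit")).show()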

import org.apache.spark.sql.functions.{sum, sumDistinct}

storeSalesDF.select(sumDistinct("ss_quantity"), sum("ss_quantity")).show()

+-------------------------+----------------+
|sum(DISTINCT ss_quantity)|sum(ss_quantity)|
+-------------------------+----------------+
|                     5050|       138943711|
+-------------------------+----------------+
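The distinct family also covers counting: countDistinct is exact, while approx_count_distinct trades a small error bound for speed (it uses HyperLogLog). A sketch:

import org.apache.spark.sql.functions.{countDistinct, approx_count_distinct}

storeSalesDF.select(
  countDistinct("ss_quantity").alias("DistinctQuantities"),
  approx_count_distinct("ss_quantity").alias("ApproxDistinct")).show()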



import org.apache.spark.sql.functions.{avg, mean}
storeSalesDF.select(avg("ss_quantity"), mean("ss_quantity")).show()

+-----------------+-----------------+
| avg(ss_quantity)| avg(ss_quantity)|
+-----------------+-----------------+
|50.51749085953793|50.51749085953793|
+-----------------+-----------------+
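mean is simply an alias for avg, which is why both columns print as avg(ss_quantity). The same package also provides the spread statistics; a sketch:

import org.apache.spark.sql.functions.{var_samp, stddev_samp}

storeSalesDF.select(
  var_samp("ss_quantity").alias("QuantityVariance"),
  stddev_samp("ss_quantity").alias("QuantityStdDev")).show()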

import org.apache.spark.sql.functions.{avg, mean, min, max, sum, count}

storeSalesDF.select(
  avg("ss_quantity").as("Average_Purchases"),
  mean("ss_quantity").as("Mean_Purchases"),
  sum("ss_quantity") / count("ss_quantity"),
  min("ss_quantity").as("Min_Purchases"),
  max("ss_quantity").alias("Max_Purchases")).show()
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|Average_Purchases|   Mean_Purchases|(sum(ss_quantity) / count(ss_quantity))|Min_Purchases|Max_Purchases|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|50.51749085953793|50.51749085953793|                      50.51749085953793|            1|          100|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
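For a one-call overview, describe computes count, mean, stddev, min, and max for the named columns; a sketch:

// count, mean, stddev, min, max in a single call
storeSalesDF.describe("ss_quantity").show()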

Flume - Simple Demo

// Create a folder in HDFS: $ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates: Hadoop in real world <n>...