val itemDF = spark.read.format("csv").options(Map("header"->"true", "delimiter"->"|", "inferSchema"->"true")).load("/FileStore/tables/retailer/data/item.dat")
itemDF.printSchema()
root
|-- i_item_sk: integer (nullable = true)
|-- i_item_id: string (nullable = true)
|-- i_rec_start_date: string (nullable = true)
|-- i_rec_end_date: string (nullable = true)
|-- i_item_desc: string (nullable = true)
|-- i_current_price: double (nullable = true)
|-- i_wholesale_cost: double (nullable = true)
|-- i_brand_id: integer (nullable = true)
|-- i_brand: string (nullable = true)
|-- i_class_id: integer (nullable = true)
|-- i_class: string (nullable = true)
|-- i_category_id: integer (nullable = true)
|-- i_category: string (nullable = true)
|-- i_manufact_id: integer (nullable = true)
|-- i_manufact: string (nullable = true)
|-- i_size: string (nullable = true)
|-- i_formulation: string (nullable = true)
|-- i_color: string (nullable = true)
|-- i_units: string (nullable = true)
|-- i_container: string (nullable = true)
|-- i_manager_id: integer (nullable = true)
|-- i_product_name: string (nullable = true)
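Note that inferSchema costs an extra pass over the file. When the column types are already known, an explicit schema can be supplied instead; a minimal sketch (truncated to three columns for brevity, a real schema must declare all 22 fields shown above):

import org.apache.spark.sql.types._
// Hypothetical truncated schema; in practice every column in the file must be listed.
val itemSchema = StructType(Seq(
  StructField("i_item_sk", IntegerType, nullable = true),
  StructField("i_item_id", StringType, nullable = true),
  StructField("i_rec_start_date", StringType, nullable = true)))
val itemWithSchemaDF = spark.read.format("csv").options(Map("header"->"true", "delimiter"->"|")).schema(itemSchema).load("/FileStore/tables/retailer/data/item.dat")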
import org.apache.spark.sql.functions._
itemDF.select(min("i_wholesale_cost").alias("Minn"), max("i_wholesale_cost").alias("Maxx")).show()
+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+
Alternatively, the default aggregate column names can be renamed after the fact with withColumnRenamed:
import org.apache.spark.sql.functions._
val minMaxDF = itemDF.select(min("i_wholesale_cost"),max("i_wholesale_cost")).withColumnRenamed("min(i_wholesale_cost)","Minn").withColumnRenamed("max(i_wholesale_cost)","Maxx")
minMaxDF.show()
+----+-----+
|Minn| Maxx|
+----+-----+
|0.02|87.36|
+----+-----+
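A third option is agg, which produces the same result here and carries over naturally once a groupBy is involved:

val minMaxAggDF = itemDF.agg(min("i_wholesale_cost").alias("Minn"), max("i_wholesale_cost").alias("Maxx"))
minMaxAggDF.show()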
// Filter condition
display(itemDF.filter($"i_wholesale_cost" === 0.02))
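Note that display() is Databricks-specific (as is the pre-imported spark.implicits._ that enables the $ column syntax); in a plain Spark shell the same result can be shown with show(), and the predicate can be written in several equivalent ways:

itemDF.filter(col("i_wholesale_cost") === 0.02).show()   // column-function syntax
itemDF.filter("i_wholesale_cost = 0.02").show()          // SQL expression string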
import org.apache.spark.sql.functions._
val minWSCostDF = itemDF.select(min("i_wholesale_cost").alias("Minn"))
minWSCostDF.show()
+----+
|Minn|
+----+
|0.02|
+----+
val cheapestItemDF = itemDF.join(minWSCostDF,itemDF.col("i_wholesale_cost") === minWSCostDF.col("Minn"),"inner")
cheapestItemDF.select("i_item_desc","i_current_price","i_category").show()
+--------------------+---------------+--------------------+
| i_item_desc|i_current_price| i_category|
+--------------------+---------------+--------------------+
|Forces can testif...| 0.09|Books ...|
|Forces can testif...| 9.21|Sports ...|
+--------------------+---------------+--------------------+
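Since minWSCostDF holds a single value, the join can also be avoided by collecting the minimum to the driver first; a sketch of that alternative:

val minCost = minWSCostDF.first().getDouble(0)  // pull the one-row result to the driver
itemDF.filter($"i_wholesale_cost" === minCost).select("i_item_desc","i_current_price","i_category").show()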
import org.apache.spark.sql.functions._
itemDF.select(max("i_current_price").alias("MaxCurrentPrice")).show()
+---------------+
|MaxCurrentPrice|
+---------------+
| 99.99|
+---------------+
val storeSalesDF = spark.read.format("csv").option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/store_sales.dat")
storeSalesDF.count()
2880404
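The aggregations below all scan storeSalesDF, so caching it saves rereading and reparsing the 2.8-million-row file on every action (assuming the cluster has memory to spare):

storeSalesDF.cache()  // materialized lazily, on the next action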
storeSalesDF.printSchema()
root
|-- ss_sold_date_sk: integer (nullable = true)
|-- ss_sold_time_sk: integer (nullable = true)
|-- ss_item_sk: integer (nullable = true)
|-- ss_customer_sk: integer (nullable = true)
|-- ss_cdemo_sk: integer (nullable = true)
|-- ss_hdemo_sk: integer (nullable = true)
|-- ss_addr_sk: integer (nullable = true)
|-- ss_store_sk: integer (nullable = true)
|-- ss_promo_sk: integer (nullable = true)
|-- ss_ticket_number: integer (nullable = true)
|-- ss_quantity: integer (nullable = true)
|-- ss_wholesale_cost: double (nullable = true)
|-- ss_list_price: double (nullable = true)
|-- ss_sales_price: double (nullable = true)
|-- ss_ext_discount_amt: double (nullable = true)
|-- ss_ext_sales_price: double (nullable = true)
|-- ss_ext_wholesale_cost: double (nullable = true)
|-- ss_ext_list_price: double (nullable = true)
|-- ss_ext_tax: double (nullable = true)
|-- ss_coupon_amt: double (nullable = true)
|-- ss_net_paid: double (nullable = true)
|-- ss_net_paid_inc_tax: double (nullable = true)
|-- ss_net_profit: double (nullable = true)
import org.apache.spark.sql.functions.sum
storeSalesDF.select(sum("ss_net_paid_inc_tax").alias("NetIncomeTaxPaid"), sum("ss_net_profit").alias("NetProfit")).show()
+-------------------+--------------------+
| NetIncomeTaxPaid| NetProfit|
+-------------------+--------------------+
|4.954755825529975E9|-2.276100670919993E9|
+-------------------+--------------------+
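The totals come back in scientific notation because the sums are doubles; if plain decimal output is preferred, format_number can be applied to the aggregates:

import org.apache.spark.sql.functions.{sum, format_number}
storeSalesDF.select(format_number(sum("ss_net_paid_inc_tax"), 2).alias("NetIncomeTaxPaid"), format_number(sum("ss_net_profit"), 2).alias("NetProfit")).show(false)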
import org.apache.spark.sql.functions.{sum,sumDistinct}
storeSalesDF.select(sumDistinct("ss_quantity"),sum("ss_quantity")).show()
+-------------------------+----------------+
|sum(DISTINCT ss_quantity)|sum(ss_quantity)|
+-------------------------+----------------+
| 5050| 138943711|
+-------------------------+----------------+
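sumDistinct adds each distinct quantity only once, which is why its total is exactly 5050 (the quantities run from 1 to 100, and 1 + 2 + ... + 100 = 5050). In Spark 3.2 and later the function is named sum_distinct; sumDistinct still works but is deprecated:

import org.apache.spark.sql.functions.{col, sum, sum_distinct}
storeSalesDF.select(sum_distinct(col("ss_quantity")), sum(col("ss_quantity"))).show()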
import org.apache.spark.sql.functions.{avg,mean}
storeSalesDF.select(avg("ss_quantity"),mean("ss_quantity")).show()
+-----------------+-----------------+
| avg(ss_quantity)| avg(ss_quantity)|
+-----------------+-----------------+
|50.51749085953793|50.51749085953793|
+-----------------+-----------------+
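Both columns are identical, and even share the name avg(ss_quantity), because mean is simply an alias for avg. The next snippet disambiguates the output with explicit aliases.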
import org.apache.spark.sql.functions.{avg,mean,min,max,sum,count}
storeSalesDF.select(
avg("ss_quantity").as("Average_Purchases"),
mean("ss_quantity").as("Mean_Purchases"),
sum("ss_quantity") / count("ss_quantity"),
min("ss_quantity").as("Min_Purchases"),
max("ss_quantity").alias("Max_Purchases")).show()
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|Average_Purchases| Mean_Purchases|(sum(ss_quantity) / count(ss_quantity))|Min_Purchases|Max_Purchases|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
|50.51749085953793|50.51749085953793| 50.51749085953793| 1| 100|
+-----------------+-----------------+---------------------------------------+-------------+-------------+
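For a quick look at the same statistics without spelling out each aggregate, Dataset.summary can compute a chosen set of them in one call; a sketch:

// summary() accepts statistic names such as "count", "mean", "min", "max"
storeSalesDF.select("ss_quantity").summary("count", "mean", "min", "max").show()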