Tuesday, 18 August 2020

Customer, CustomerAddress, IncomeBand - DataSets in Spark with Scala

val df = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/FileStore/tables/retailer/data/customer.csv")

df.printSchema

root
 |-- c_customer_sk: integer (nullable = true)
 |-- c_customer_id: string (nullable = true)
 |-- c_current_cdemo_sk: integer (nullable = true)
 |-- c_current_hdemo_sk: integer (nullable = true)
 |-- c_current_addr_sk: integer (nullable = true)
 |-- c_first_shipto_date_sk: integer (nullable = true)
 |-- c_first_sales_date_sk: integer (nullable = true)
 |-- c_salutation: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_country: string (nullable = true)
 |-- c_login: string (nullable = true)
 |-- c_email_address: string (nullable = true)
 |-- c_last_review_date: double (nullable = true)
 
 df.columns.foreach(println)
 
 
c_customer_sk
c_customer_id
c_current_cdemo_sk
c_current_hdemo_sk
c_current_addr_sk
c_first_shipto_date_sk
c_first_sales_date_sk
c_salutation
c_first_name
c_last_name
c_preferred_cust_flag
c_birth_day
c_birth_month
c_birth_year
c_birth_country
c_login
c_email_address
c_last_review_date

df.columns.length
18

df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3,truncate=false)


+----------------+--------------------+------------------------------+------------+-------------+-----------+
|c_customer_id   |c_first_name        |c_last_name                   |c_birth_year|c_birth_month|c_birth_day|
+----------------+--------------------+------------------------------+------------+-------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis                         |1936        |12           |9          |
|AAAAAAAACAAAAAAA|Amy                 |Moses                         |1966        |4            |9          |
|AAAAAAAADAAAAAAA|Latisha             |Hamilton                      |1979        |9            |18         |
+----------------+--------------------+------------------------------+------------+-------------+-----------+



df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3)


import org.apache.spark.sql.functions.{col, column}

df.select(col("c_last_name"),column("c_first_name"), $"c_birth_year").show(3)


+--------------------+--------------------+------------+
|         c_last_name|        c_first_name|c_birth_year|
+--------------------+--------------------+------------+
|Lewis            ...|Javier              |        1936|
|Moses            ...|Amy                 |        1966|
|Hamilton         ...|Latisha             |        1979|
+--------------------+--------------------+------------+
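
Note: the $"..." column syntax relies on the SparkSession implicits, which Databricks notebooks import automatically; in a standalone application they must be brought in explicitly. A minimal sketch:

import spark.implicits._   // enables $"colName" column references

df.select($"c_last_name", $"c_first_name", $"c_birth_year").show(3)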


val customerWithBday = df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day")


customerWithBday.printSchema()

root
 |-- c_customer_id: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_birth_year: integer (nullable = true)
 |-- c_birth_month: integer (nullable = true)
 |-- c_birth_day: integer (nullable = true)
 
 
 customerWithBday.schema
 
 
 res21: org.apache.spark.sql.types.StructType = StructType(
StructField(c_customer_id,StringType,true), 
StructField(c_first_name,StringType,true), 
StructField(c_last_name,StringType,true), 
StructField(c_birth_year,IntegerType,true), 
StructField(c_birth_month,IntegerType,true), 
StructField(c_birth_day,IntegerType,true))
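
Instead of relying on inferSchema, the same schema can be declared up front with StructType/StructField and passed to the reader. A sketch mirroring the fields above (bdaySchema is an illustrative name):

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bdaySchema = StructType(Seq(
  StructField("c_customer_id", StringType, nullable = true),
  StructField("c_first_name", StringType, nullable = true),
  StructField("c_last_name", StringType, nullable = true),
  StructField("c_birth_year", IntegerType, nullable = true),
  StructField("c_birth_month", IntegerType, nullable = true),
  StructField("c_birth_day", IntegerType, nullable = true)))
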
%fs head /FileStore/tables/retailer/data/customer_address.dat

ca_address_sk|ca_address_id|ca_street_number|ca_street_name|ca_street_type|ca_suite_number|ca_city|ca_county|ca_state|ca_zip|ca_country|ca_gmt_offset|ca_location_type
1|AAAAAAAABAAAAAAA|18        |Jackson |Parkway        |Suite 280 |Fairfield|Maricopa County|AZ|86192     |United States|-7.00|condo               
2|AAAAAAAACAAAAAAA|362       |Washington 6th|RD             |Suite 80  |Fairview|Taos County|NM|85709     |United States|-7.00|condo  

 
 
 
 
 val custAddDF = spark.read.format("csv").option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")
 
 custAddDF.printSchema()
 
 root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)  -- wrong: street numbers like "18" are identifiers and should stay strings
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)  -- wrong: zip codes should stay strings
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true) -- change it to decimal(5,2)
 |-- ca_location_type: string (nullable = true)
 
  
 
val custAddSchemaDDL  = "ca_address_sk long, ca_address_id string, ca_street_number string, ca_street_name string, "  +
"ca_street_type string, ca_suite_number string, ca_city string, " +
"ca_county string, ca_state string, ca_zip string, ca_country string, ca_gmt_offset decimal(5,2), ca_location_type string"

val custAddDF = spark.read.format("csv").schema(custAddSchemaDDL).option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")

custAddDF.printSchema()

root
 |-- ca_address_sk: long (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: string (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: string (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: decimal(5,2) (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custAddDF.schema.toDDL
 
 res38: String = `ca_address_sk` BIGINT,`ca_address_id` STRING,`ca_street_number` STRING,`ca_street_name` STRING,`ca_street_type` STRING,`ca_suite_number` STRING,`ca_city` STRING,`ca_county` STRING,`ca_state` STRING,`ca_zip` STRING,`ca_country` STRING,`ca_gmt_offset` DECIMAL(5,2),`ca_location_type` STRING



custAddDF.schema

res39: org.apache.spark.sql.types.StructType = StructType(StructField(ca_address_sk,LongType,true), StructField(ca_address_id,StringType,true), StructField(ca_street_number,StringType,true), StructField(ca_street_name,StringType,true), StructField(ca_street_type,StringType,true), StructField(ca_suite_number,StringType,true), StructField(ca_city,StringType,true), StructField(ca_county,StringType,true), StructField(ca_state,StringType,true), StructField(ca_zip,StringType,true), StructField(ca_country,StringType,true), StructField(ca_gmt_offset,DecimalType(5,2),true), StructField(ca_location_type,StringType,true))
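
.schema() accepts a StructType as well as a DDL string, so a schema captured from one DataFrame can be reused to read another file with the same layout. A sketch (custAddDF2 is an illustrative name):

val custAddDF2 = spark.read.format("csv").
  schema(custAddDF.schema).
  option("header","true").option("sep","|").
  load("/FileStore/tables/retailer/data/customer_address.dat")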


val incomeBandDF = spark.read.format("csv").schema("ib_lower_band_sk long, ib_lower_bound int, ib_upper_bound int").option("sep","|").load("/FileStore/tables/retailer/data/income_band.dat")

incomeBandDF.printSchema()

root
 |-- ib_lower_band_sk: long (nullable = true)
 |-- ib_lower_bound: integer (nullable = true)
 |-- ib_upper_bound: integer (nullable = true)
 
 
 incomeBandDF.show(5)
 
 +----------------+--------------+--------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|
+----------------+--------------+--------------+
|               1|             0|         10000|
|               2|         10001|         20000|
|               3|         20001|         30000|
|               4|         30001|         40000|
|               5|         40001|         50000|
+----------------+--------------+--------------+

// Create a new boolean column from a comparison
val incomeBandwithGroups = incomeBandDF.withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000)

incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|
+----------------+--------------+--------------+------------------+
|               1|             0|         10000|              true|
|               2|         10001|         20000|              true|
|               3|         20001|         30000|              true|
|               4|         30001|         40000|              true|
|               5|         40001|         50000|              true|
+----------------+--------------+--------------+------------------+


// To create a static column with a constant value, use lit
import org.apache.spark.sql.functions.{col, lit, when}

val incomeBandwithGroups = incomeBandDF.
withColumn("isFirstIncomeGroup", incomeBandDF.col("ib_upper_bound") <= 60000).
withColumn("isSecondIncomeGroup", incomeBandDF("ib_upper_bound") > 60000 and incomeBandDF("ib_upper_bound") <= 120000).
withColumn("isThirdIncomeGroup", incomeBandDF("ib_upper_bound") > 120000 and incomeBandDF("ib_upper_bound") < 200000).
withColumn("demo", lit("demoValue"))
incomeBandwithGroups.show(5)

+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|isSecondIncomeGroup|isThirdIncomeGroup|     demo|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|               1|             0|         10000|              true|              false|             false|demoValue|
|               2|         10001|         20000|              true|              false|             false|demoValue|
|               3|         20001|         30000|              true|              false|             false|demoValue|
|               4|         30001|         40000|              true|              false|             false|demoValue|
|               5|         40001|         50000|              true|              false|             false|demoValue|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
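
The when function imported above can fold these three boolean flags into a single label column. A sketch (the incomeClass name and the labels are illustrative):

val incomeLabeled = incomeBandDF.withColumn("incomeClass",
  when(col("ib_upper_bound") <= 60000, "standard").
  when(col("ib_upper_bound") <= 120000, "medium").
  otherwise("high"))
incomeLabeled.show(5)
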
// Rename existing columns

val incomeClasses = incomeBandwithGroups.
withColumnRenamed("isThirdIncomeGroup","isHighIncomeClass").
withColumnRenamed("isFirstIncomeGroup","isStandardIncomeClass").
withColumnRenamed("isSecodIncomeGroup","isMediumIncomeClass")
incomeClasses.show(5)

+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isStandardIncomeClass|isMediumIncomeClass|isHighIncomeClass|     demo|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|               1|             0|         10000|                 true|              false|            false|demoValue|
|               2|         10001|         20000|                 true|              false|            false|demoValue|
|               3|         20001|         30000|                 true|              false|            false|demoValue|
|               4|         30001|         40000|                 true|              false|            false|demoValue|
|               5|         40001|         50000|                 true|              false|            false|demoValue|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
only showing top 5 rows


// Remove columns
val onlyMediumDF = incomeClasses.drop("demo","isStandardIncomeClass", "isHighIncomeClass")

onlyMediumDF.show(5)


+----------------+--------------+--------------+-------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isMediumIncomeClass|
+----------------+--------------+--------------+-------------------+
|               1|             0|         10000|              false|
|               2|         10001|         20000|              false|
|               3|         20001|         30000|              false|
|               4|         30001|         40000|              false|
|               5|         40001|         50000|              false|
+----------------+--------------+--------------+-------------------+


val custDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

custDF.count()
res75: Long = 100000  -- total records 


// validate the day-of-month range; keep the valid rows for reuse below
val validRecordsDF = custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31)
validRecordsDF.count()

res77: Long = 96539  -- valid records 



validRecordsDF.select("c_customer_id", "c_first_name","c_birth_day", "c_birth_month", "c_birth_year").show(5)


+----------------+--------------------+-----------+-------------+------------+
|   c_customer_id|        c_first_name|c_birth_day|c_birth_month|c_birth_year|
+----------------+--------------------+-----------+-------------+------------+
|AAAAAAAABAAAAAAA|Javier              |          9|           12|        1936|
|AAAAAAAACAAAAAAA|Amy                 |          9|            4|        1966|
|AAAAAAAADAAAAAAA|Latisha             |         18|            9|        1979|
|AAAAAAAAEAAAAAAA|Michael             |          7|            6|        1983|
|AAAAAAAAFAAAAAAA|Robert              |          8|            5|        1956|
+----------------+--------------------+-----------+-------------+------------+


// validate days, months, and years together
val validRecordsDF = custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 and $"c_birth_month" > 0 and $"c_birth_month" <= 12 and $"c_birth_year" > 0)

validRecordsDF.count()
94791

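
The same range checks can be written with Column.between, and negating the whole predicate surfaces the invalid rows. A sketch (note that rows with NULL in any of these columns satisfy neither the predicate nor its negation, so the two counts need not add up to the total):

custDF.filter(!($"c_birth_day".between(1, 31) and
  $"c_birth_month".between(1, 12) and
  $"c_birth_year" > 0)).count()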


Join keys:
customer.c_current_addr_sk  
customer_address.ca_address_sk


val custDF = spark.read.format("csv").option("inferSchema",true).
              option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")

val custAddDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").
              load("/FileStore/tables/retailer/data/customer_address.dat")

val joinExpression = custDF.col("c_current_addr_sk") === custAddDF.col("ca_address_sk")
val custwithAddDF  = custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","ca_address_sk","c_first_name","c_last_name")




 custwithAddDF.show(5)
 +----------------+-------------+--------------------+--------------------+
|   c_customer_id|ca_address_sk|        c_first_name|         c_last_name|
+----------------+-------------+--------------------+--------------------+
|AAAAAAAABAAAAAAA|        32946|Javier              |Lewis            ...|
|AAAAAAAACAAAAAAA|        31655|Amy                 |Moses            ...|
|AAAAAAAADAAAAAAA|        48572|Latisha             |Hamilton         ...|
|AAAAAAAAEAAAAAAA|        39558|Michael             |White            ...|
|AAAAAAAAFAAAAAAA|        36368|Robert              |Moran            ...|
+----------------+-------------+--------------------+--------------------+
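
The trailing ... after the last names hints that the .dat file pads strings with spaces; trim from org.apache.spark.sql.functions can clean them up at select time. A sketch:

import org.apache.spark.sql.functions.trim

custwithAddDF.select($"c_customer_id",
  trim($"c_first_name").as("c_first_name"),
  trim($"c_last_name").as("c_last_name")).show(5)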



custAddDF.printSchema

root
 |-- ca_address_sk: integer (nullable = true)
 |-- ca_address_id: string (nullable = true)
 |-- ca_street_number: double (nullable = true)
 |-- ca_street_name: string (nullable = true)
 |-- ca_street_type: string (nullable = true)
 |-- ca_suite_number: string (nullable = true)
 |-- ca_city: string (nullable = true)
 |-- ca_county: string (nullable = true)
 |-- ca_state: string (nullable = true)
 |-- ca_zip: double (nullable = true)
 |-- ca_country: string (nullable = true)
 |-- ca_gmt_offset: double (nullable = true)
 |-- ca_location_type: string (nullable = true)
 
 
 custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","c_first_name","c_last_name", "ca_street_name","ca_city").show(5)
 
 +----------------+--------------------+--------------------+--------------+-----------+
|   c_customer_id|        c_first_name|         c_last_name|ca_street_name|    ca_city|
+----------------+--------------------+--------------------+--------------+-----------+
|AAAAAAAABAAAAAAA|Javier              |Lewis            ...| Chestnut Main|Spring Hill|
|AAAAAAAACAAAAAAA|Amy                 |Moses            ...|       8th Oak|    Antioch|
|AAAAAAAADAAAAAAA|Latisha             |Hamilton         ...|Park Jefferson|Cedar Grove|
|AAAAAAAAEAAAAAAA|Michael             |White            ...|Cedar Sycamore|   Lakewood|
|AAAAAAAAFAAAAAAA|Robert              |Moran            ...|   Miller Main|   Waterloo|
+----------------+--------------------+--------------------+--------------+-----------+



custDF.count()
res89: Long = 100000


import org.apache.spark.sql.functions.count
custDF.select(count("*")).show()

100000



custDF.select(count("c_first_name")).show()
96508 -- null values ignored




custDF.filter($"c_first_name".isNotNull).count()
96508 -- null values ignored

// Count the rows where c_first_name is NULL
custDF.filter($"c_first_name".isNull).count()
3492
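
count combined with when can report nulls for several columns in a single pass; since count ignores the NULLs produced when the condition is false, each aggregate counts only matching rows. A sketch for two columns:

import org.apache.spark.sql.functions.{count, when, col}

custDF.select(
  count(when(col("c_first_name").isNull, 1)).as("null_first_name"),   // should match the 3492 above
  count(when(col("c_last_name").isNull, 1)).as("null_last_name")).show()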


// countDistinct ignores NULL values

import org.apache.spark.sql.functions.countDistinct

custDF.select(countDistinct("c_first_name")).show()

4131


custDF.select("c_first_name").distinct().count()
res98: Long = 4132
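
Filtering out the NULLs first makes distinct() agree with countDistinct, which should return the countDistinct figure of 4131. A sketch:

custDF.filter($"c_first_name".isNotNull).select("c_first_name").distinct().count()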
