val df = spark.read.format("csv").option("inferSchema","True").option("header","true").load("/FileStore/tables/retailer/data/customer.csv")
df.printSchema
root
|-- c_customer_sk: integer (nullable = true)
|-- c_customer_id: string (nullable = true)
|-- c_current_cdemo_sk: integer (nullable = true)
|-- c_current_hdemo_sk: integer (nullable = true)
|-- c_current_addr_sk: integer (nullable = true)
|-- c_first_shipto_date_sk: integer (nullable = true)
|-- c_first_sales_date_sk: integer (nullable = true)
|-- c_salutation: string (nullable = true)
|-- c_first_name: string (nullable = true)
|-- c_last_name: string (nullable = true)
|-- c_preferred_cust_flag: string (nullable = true)
|-- c_birth_day: integer (nullable = true)
|-- c_birth_month: integer (nullable = true)
|-- c_birth_year: integer (nullable = true)
|-- c_birth_country: string (nullable = true)
|-- c_login: string (nullable = true)
|-- c_email_address: string (nullable = true)
|-- c_last_review_date: double (nullable = true)
df.columns.foreach(println)
c_customer_sk
c_customer_id
c_current_cdemo_sk
c_current_hdemo_sk
c_current_addr_sk
c_first_shipto_date_sk
c_first_sales_date_sk
c_salutation
c_first_name
c_last_name
c_preferred_cust_flag
c_birth_day
c_birth_month
c_birth_year
c_birth_country
c_login
c_email_address
c_last_review_date
df.columns.length
18
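// The schema object itself is also iterable, which yields the same column listing but with types; a quick sketch:
df.schema.fields.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))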
df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3,truncate=false)
+----------------+--------------------+------------------------------+------------+-------------+-----------+
|c_customer_id |c_first_name |c_last_name |c_birth_year|c_birth_month|c_birth_day|
+----------------+--------------------+------------------------------+------------+-------------+-----------+
|AAAAAAAABAAAAAAA|Javier |Lewis |1936 |12 |9 |
|AAAAAAAACAAAAAAA|Amy |Moses |1966 |4 |9 |
|AAAAAAAADAAAAAAA|Latisha |Hamilton |1979 |9 |18 |
+----------------+--------------------+------------------------------+------------+-------------+-----------+
df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day").show(3)
import org.apache.spark.sql.functions.{col, column}
import spark.implicits._ // required for the $"colName" syntax below
df.select(col("c_last_name"), column("c_first_name"), $"c_birth_year").show(3)
+--------------------+--------------------+------------+
| c_last_name| c_first_name|c_birth_year|
+--------------------+--------------------+------------+
|Lewis ...|Javier | 1936|
|Moses ...|Amy | 1966|
|Hamilton ...|Latisha | 1979|
+--------------------+--------------------+------------+
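// expr() is yet another way to reference columns, and it can parse full SQL expressions; a small sketch (the derived column name is illustrative):
import org.apache.spark.sql.functions.expr
df.select(expr("c_last_name"), expr("c_birth_year + 1 AS year_after_birth")).show(3)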
val customerWithBday = df.select("c_customer_id","c_first_name","c_last_name","c_birth_year","c_birth_month","c_birth_day")
customerWithBday.printSchema()
root
|-- c_customer_id: string (nullable = true)
|-- c_first_name: string (nullable = true)
|-- c_last_name: string (nullable = true)
|-- c_birth_year: integer (nullable = true)
|-- c_birth_month: integer (nullable = true)
|-- c_birth_day: integer (nullable = true)
customerWithBday.schema
res21: org.apache.spark.sql.types.StructType = StructType(
StructField(c_customer_id,StringType,true),
StructField(c_first_name,StringType,true),
StructField(c_last_name,StringType,true),
StructField(c_birth_year,IntegerType,true),
StructField(c_birth_month,IntegerType,true),
StructField(c_birth_day,IntegerType,true))
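// A captured StructType can be handed back to the reader to skip inference on another file with the same layout; a sketch reusing the full customer schema:
val dfTyped = spark.read.format("csv").option("header","true").schema(df.schema).load("/FileStore/tables/retailer/data/customer.csv")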
%fs head /FileStore/tables/retailer/data/customer_address.dat
ca_address_sk|ca_address_id|ca_street_number|ca_street_name|ca_street_type|ca_suite_number|ca_city|ca_county|ca_state|ca_zip|ca_country|ca_gmt_offset|ca_location_type
1|AAAAAAAABAAAAAAA|18 |Jackson |Parkway |Suite 280 |Fairfield|Maricopa County|AZ|86192 |United States|-7.00|condo
2|AAAAAAAACAAAAAAA|362 |Washington 6th|RD |Suite 80 |Fairview|Taos County|NM|85709 |United States|-7.00|condo
val custAddDF = spark.read.format("csv"). option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")
custAddDF.printSchema()
root
|-- ca_address_sk: integer (nullable = true)
|-- ca_address_id: string (nullable = true)
|-- ca_street_number: double (nullable = true) -- wrong: should be string
|-- ca_street_name: string (nullable = true)
|-- ca_street_type: string (nullable = true)
|-- ca_suite_number: string (nullable = true)
|-- ca_city: string (nullable = true)
|-- ca_county: string (nullable = true)
|-- ca_state: string (nullable = true)
|-- ca_zip: double (nullable = true)
|-- ca_country: string (nullable = true)
|-- ca_gmt_offset: double (nullable = true) -- should be decimal(5,2)
|-- ca_location_type: string (nullable = true)
val custAddSchemaDDL = "ca_address_sk long, ca_address_id string, ca_street_number string, ca_street_name string, " +
"ca_street_type string, ca_suite_number string, ca_city string, " +
"ca_county string, ca_state string, ca_zip string, ca_country string, ca_gmt_offset decimal(5,2), ca_location_type string"
val custAddDF = spark.read.format("csv").schema(custAddSchemaDDL).option("header","true").option("sep","|").option("inferSchema","true").load("/FileStore/tables/retailer/data/customer_address.dat")
custAddDF.printSchema()
root
|-- ca_address_sk: long (nullable = true)
|-- ca_address_id: string (nullable = true)
|-- ca_street_number: string (nullable = true)
|-- ca_street_name: string (nullable = true)
|-- ca_street_type: string (nullable = true)
|-- ca_suite_number: string (nullable = true)
|-- ca_city: string (nullable = true)
|-- ca_county: string (nullable = true)
|-- ca_state: string (nullable = true)
|-- ca_zip: string (nullable = true)
|-- ca_country: string (nullable = true)
|-- ca_gmt_offset: decimal(5,2) (nullable = true)
|-- ca_location_type: string (nullable = true)
custAddDF.schema.toDDL
res38: String = `ca_address_sk` BIGINT,`ca_address_id` STRING,`ca_street_number` STRING,`ca_street_name` STRING,`ca_street_type` STRING,`ca_suite_number` STRING,`ca_city` STRING,`ca_county` STRING,`ca_state` STRING,`ca_zip` STRING,`ca_country` STRING,`ca_gmt_offset` DECIMAL(5,2),`ca_location_type` STRING
custAddDF.schema
res39: org.apache.spark.sql.types.StructType = StructType(StructField(ca_address_sk,LongType,true), StructField(ca_address_id,StringType,true), StructField(ca_street_number,StringType,true), StructField(ca_street_name,StringType,true), StructField(ca_street_type,StringType,true), StructField(ca_suite_number,StringType,true), StructField(ca_city,StringType,true), StructField(ca_county,StringType,true), StructField(ca_state,StringType,true), StructField(ca_zip,StringType,true), StructField(ca_country,StringType,true), StructField(ca_gmt_offset,DecimalType(5,2),true), StructField(ca_location_type,StringType,true))
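// The same schema can also be built programmatically with StructType/StructField instead of a DDL string; an equivalent sketch:
import org.apache.spark.sql.types._
val custAddSchema = StructType(Seq(
  StructField("ca_address_sk", LongType, nullable = true),
  StructField("ca_address_id", StringType, nullable = true),
  StructField("ca_street_number", StringType, nullable = true),
  StructField("ca_street_name", StringType, nullable = true),
  StructField("ca_street_type", StringType, nullable = true),
  StructField("ca_suite_number", StringType, nullable = true),
  StructField("ca_city", StringType, nullable = true),
  StructField("ca_county", StringType, nullable = true),
  StructField("ca_state", StringType, nullable = true),
  StructField("ca_zip", StringType, nullable = true),
  StructField("ca_country", StringType, nullable = true),
  StructField("ca_gmt_offset", DecimalType(5, 2), nullable = true),
  StructField("ca_location_type", StringType, nullable = true)))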
val incomeBandDF = spark.read.format("csv").schema("ib_lower_band_sk long, ib_lower_bound int, ib_upper_bound int").option("sep","|").load("/FileStore/tables/retailer/data/income_band.dat")
incomeBandDF.printSchema()
root
|-- ib_lower_band_sk: long (nullable = true)
|-- ib_lower_bound: integer (nullable = true)
|-- ib_upper_bound: integer (nullable = true)
incomeBandDF.show(5)
+----------------+--------------+--------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|
+----------------+--------------+--------------+
| 1| 0| 10000|
| 2| 10001| 20000|
| 3| 20001| 30000|
| 4| 30001| 40000|
| 5| 40001| 50000|
+----------------+--------------+--------------+
// create a new column
val incomeBandwithGroups = incomeBandDF.withColumn("isFirstIncomeGroup",incomeBandDF.col("ib_upper_bound") <= 60000)
incomeBandwithGroups.show(5)
+----------------+--------------+--------------+------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|
+----------------+--------------+--------------+------------------+
| 1| 0| 10000| true|
| 2| 10001| 20000| true|
| 3| 20001| 30000| true|
| 4| 30001| 40000| true|
| 5| 40001| 50000| true|
+----------------+--------------+--------------+------------------+
// To create a static column with a constant value, use lit
import org.apache.spark.sql.functions.{col, lit, when}
val incomeBandwithGroups = incomeBandDF.
withColumn("isFirstIncomeGroup", incomeBandDF.col("ib_upper_bound") <= 60000).
withColumn("isSecondIncomeGroup", incomeBandDF("ib_upper_bound") > 60000 and incomeBandDF("ib_upper_bound") <= 120000).
withColumn("isThirdIncomeGroup", incomeBandDF("ib_upper_bound") > 120000 and incomeBandDF("ib_upper_bound") < 200000).
withColumn("demo", lit("demoValue"))
incomeBandwithGroups.show(5)
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isFirstIncomeGroup|isSecondIncomeGroup|isThirdIncomeGroup|     demo|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
|               1|             0|         10000|              true|              false|             false|demoValue|
|               2|         10001|         20000|              true|              false|             false|demoValue|
|               3|         20001|         30000|              true|              false|             false|demoValue|
|               4|         30001|         40000|              true|              false|             false|demoValue|
|               5|         40001|         50000|              true|              false|             false|demoValue|
+----------------+--------------+--------------+------------------+-------------------+------------------+---------+
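// when/otherwise (when is already imported above) can fold such boolean flags into a single label column; a sketch with illustrative cutoffs:
val incomeLabelDF = incomeBandDF.withColumn("incomeClass",
  when(col("ib_upper_bound") <= 60000, "standard").
    when(col("ib_upper_bound") <= 120000, "medium").
    otherwise("high"))
incomeLabelDF.show(5)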
// Rename existing columns
val incomeClasses = incomeBandwithGroups.
withColumnRenamed("isThirdIncomeGroup","isHighIncomeClass").
withColumnRenamed("isFirstIncomeGroup","isStandardIncomeClass").
withColumnRenamed("isSecondIncomeGroup","isMediumIncomeClass")
incomeClasses.show(5)
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isStandardIncomeClass|isMediumIncomeClass|isHighIncomeClass|     demo|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
|               1|             0|         10000|                 true|              false|            false|demoValue|
|               2|         10001|         20000|                 true|              false|            false|demoValue|
|               3|         20001|         30000|                 true|              false|            false|demoValue|
|               4|         30001|         40000|                 true|              false|            false|demoValue|
|               5|         40001|         50000|                 true|              false|            false|demoValue|
+----------------+--------------+--------------+---------------------+-------------------+-----------------+---------+
only showing top 5 rows
// Remove columns
val onlyMediumDF = incomeClasses.drop("demo","isStandardIncomeClass", "isHighIncomeClass")
onlyMediumDF.show(5)
+----------------+--------------+--------------+-------------------+
|ib_lower_band_sk|ib_lower_bound|ib_upper_bound|isMediumIncomeClass|
+----------------+--------------+--------------+-------------------+
| 1| 0| 10000| false|
| 2| 10001| 20000| false|
| 3| 20001| 30000| false|
| 4| 30001| 40000| false|
| 5| 40001| 50000| false|
+----------------+--------------+--------------+-------------------+
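// Note that drop is forgiving: naming a column that does not exist is silently ignored, so the call below is a no-op:
onlyMediumDF.drop("no_such_column") // returns the DataFrame unchanged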
val custDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")
custDF.count()
res75: Long = 100000 -- total records
// validate the birth-day range
custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 ).count()
res77: Long = 96539 -- valid records
validRecordsDF.select("c_customer_id", "c_first_name","c_birth_day", "c_birth_month", "c_birth_year").show(5)
+----------------+--------------------+-----------+-------------+------------+
| c_customer_id| c_first_name|c_birth_day|c_birth_month|c_birth_year|
+----------------+--------------------+-----------+-------------+------------+
|AAAAAAAABAAAAAAA|Javier | 9| 12| 1936|
|AAAAAAAACAAAAAAA|Amy | 9| 4| 1966|
|AAAAAAAADAAAAAAA|Latisha | 18| 9| 1979|
|AAAAAAAAEAAAAAAA|Michael | 7| 6| 1983|
|AAAAAAAAFAAAAAAA|Robert | 8| 5| 1956|
+----------------+--------------------+-----------+-------------+------------+
// validate days, months, and years together
val validRecordsDF = custDF.filter($"c_birth_day" > 0 and $"c_birth_day" <= 31 and $"c_birth_month" > 0 and $"c_birth_month" <= 12 and $"c_birth_year" > 0)
validRecordsDF.count()
94791
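// Column.between (inclusive on both ends) expresses the same range checks more tersely; this sketch should return the same 94791 rows:
val validViaBetween = custDF.filter(
  $"c_birth_day".between(1, 31) and
  $"c_birth_month".between(1, 12) and
  $"c_birth_year" > 0)
validViaBetween.count()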
// Join keys:
//   customer.c_current_addr_sk
//   customer_address.ca_address_sk
val custDF = spark.read.format("csv").option("inferSchema",true).
option("header",true).option("sep","|").load("/FileStore/tables/retailer/data/customer.dat")
val custAddDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("sep","|").
load("/FileStore/tables/retailer/data/customer_address.dat")
val joinExpression = custDF.col("c_current_addr_sk") === custAddDF.col("ca_address_sk")
val custwithAddDF = custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","ca_address_sk","c_first_name","c_last_name")
custwithAddDF.show(5)
+----------------+-------------+--------------------+--------------------+
| c_customer_id|ca_address_sk| c_first_name| c_last_name|
+----------------+-------------+--------------------+--------------------+
|AAAAAAAABAAAAAAA| 32946|Javier |Lewis ...|
|AAAAAAAACAAAAAAA| 31655|Amy |Moses ...|
|AAAAAAAADAAAAAAA| 48572|Latisha |Hamilton ...|
|AAAAAAAAEAAAAAAA| 39558|Michael |White ...|
|AAAAAAAAFAAAAAAA| 36368|Robert |Moran ...|
+----------------+-------------+--------------------+--------------------+
custAddDF.printSchema
root
|-- ca_address_sk: integer (nullable = true)
|-- ca_address_id: string (nullable = true)
|-- ca_street_number: double (nullable = true)
|-- ca_street_name: string (nullable = true)
|-- ca_street_type: string (nullable = true)
|-- ca_suite_number: string (nullable = true)
|-- ca_city: string (nullable = true)
|-- ca_county: string (nullable = true)
|-- ca_state: string (nullable = true)
|-- ca_zip: double (nullable = true)
|-- ca_country: string (nullable = true)
|-- ca_gmt_offset: double (nullable = true)
|-- ca_location_type: string (nullable = true)
custDF.join(custAddDF,joinExpression,"inner").select("c_customer_id","c_first_name","c_last_name", "ca_street_name","ca_city").show(5)
+----------------+--------------------+--------------------+--------------+-----------+
| c_customer_id| c_first_name| c_last_name|ca_street_name| ca_city|
+----------------+--------------------+--------------------+--------------+-----------+
|AAAAAAAABAAAAAAA|Javier |Lewis ...| Chestnut Main|Spring Hill|
|AAAAAAAACAAAAAAA|Amy |Moses ...| 8th Oak| Antioch|
|AAAAAAAADAAAAAAA|Latisha |Hamilton ...|Park Jefferson|Cedar Grove|
|AAAAAAAAEAAAAAAA|Michael |White ...|Cedar Sycamore| Lakewood|
|AAAAAAAAFAAAAAAA|Robert |Moran ...| Miller Main| Waterloo|
+----------------+--------------------+--------------------+--------------+-----------+
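// The same join expression works with other join types; for example, a left_anti join keeps only the customers whose address key finds no match:
custDF.join(custAddDF, joinExpression, "left_anti").select("c_customer_id", "c_current_addr_sk").show(5)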
custDF.count()
res89: Long = 100000
import org.apache.spark.sql.functions.count
custDF.select(count("*")).show()
100000
custDF.select(count("c_first_name")).show()
96508 -- null values ignored
custDF.filter($"c_first_name".isNotNull).count()
96508 -- same result: only non-null rows are counted
// Count the rows where c_first_name is NULL
custDF.filter($"c_first_name".isNull).count()
3492
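// Related: DataFrameNaFunctions can replace those NULLs with a placeholder (the fill value here is illustrative):
val custFilled = custDF.na.fill("unknown", Seq("c_first_name"))
custFilled.filter($"c_first_name".isNull).count() // now 0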
// countDistinct
import org.apache.spark.sql.functions.countDistinct
custDF.select(countDistinct("c_first_name")).show()
4131
custDF.select("c_first_name").distinct().count()
res98: Long = 4132 -- one more than countDistinct: distinct() keeps NULL as a value of its own, while countDistinct ignores NULLs
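// For very large tables, approx_count_distinct gives a fast estimate instead of an exact count (rsd is the maximum allowed relative error):
import org.apache.spark.sql.functions.approx_count_distinct
custDF.select(approx_count_distinct("c_first_name", rsd = 0.05)).show()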