Tuesday, 11 August 2020

Read and Write CSV Files Using Spark

scala> val df = spark.sql("select * from ohm.orders")
df: org.apache.spark.sql.DataFrame = [order_id: int, order_date: bigint ... 2 more fields]

scala> df.printSchema()
root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
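// Note: order_date is stored as epoch milliseconds (e.g. 1374735600000).
// Sketch, not part of the original session: it can be converted to a readable
// timestamp by scaling to seconds and casting.
scala> import org.apache.spark.sql.functions.col
scala> val dfTs = df.withColumn("order_date", (col("order_date") / 1000).cast("timestamp"))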
 

scala> val dfOrders = df.withColumnRenamed("order_id","id").withColumnRenamed("order_date","dt").withColumnRenamed("order_customer_id","cust_id").withColumnRenamed("order_status","status")
dfOrders: org.apache.spark.sql.DataFrame = [id: int, dt: bigint ... 2 more fields]

scala> dfOrders.printSchema()
root
 |-- id: integer (nullable = true)
 |-- dt: long (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- status: string (nullable = true)
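// Alternative sketch, not part of the original session: toDF renames every
// column positionally in a single call instead of chaining withColumnRenamed.
scala> val dfOrders2 = df.toDF("id", "dt", "cust_id", "status")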

scala> df.show(5)
+--------+-------------+-----------------+---------------+
|order_id|   order_date|order_customer_id|   order_status|
+--------+-------------+-----------------+---------------+
|       1|1374735600000|            11599|         CLOSED|
|       2|1374735600000|              256|PENDING_PAYMENT|
|       3|1374735600000|            12111|       COMPLETE|
|       4|1374735600000|             8827|         CLOSED|
|       5|1374735600000|            11318|       COMPLETE|
+--------+-------------+-----------------+---------------+

// Write the DataFrame out as CSV
scala> df.write.format("csv").save("/home/cloudera/sparkoutput/orders_csv")

// Display the first 10 lines of a CSV part file in the terminal
$ cat /home/cloudera/sparkoutput/orders_csv/part-00000-1f2f2adb-849e-49c0-bdef-8706cf1bcfb9-c000.csv | head
1,1374735600000,11599,CLOSED
2,1374735600000,256,PENDING_PAYMENT
3,1374735600000,12111,COMPLETE
4,1374735600000,8827,CLOSED
5,1374735600000,11318,COMPLETE
6,1374735600000,7130,COMPLETE
7,1374735600000,4530,COMPLETE
8,1374735600000,2911,PROCESSING
9,1374735600000,5657,PENDING_PAYMENT
10,1374735600000,5648,PENDING_PAYMENT
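
// Sketch, not part of the original session: the CSV writer also accepts
// options, e.g. a header row and a save mode. The output path below is
// hypothetical.
scala> dfOrders.write.format("csv").option("header", "true").mode("overwrite").save("/home/cloudera/sparkoutput/orders_csv_hdr")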


// Read the CSV files back with Spark. No schema is inferred here, so every field comes back as a string.
scala> val dfOrdersCSV = spark.read.format("csv").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]

scala> dfOrdersCSV.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)


// With inferSchema enabled, Spark samples the data and applies matching types, but the columns still have no names.
scala> val dfOrdersCSV = spark.read.format("csv").option("inferSchema","true").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: int, _c1: bigint ... 2 more fields]

scala> dfOrdersCSV.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)


// The option name here is misspelled ("headers" instead of "header"), so Spark silently ignores it.
// Even with the correct name we would not get meaningful column names, because this CSV has no
// header row: Spark would simply treat the first data row as the header.
scala> val dfOrdersCSV = spark.read.format("csv").option("inferSchema","true").option("headers","true").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: int, _c1: bigint ... 2 more fields]

scala> dfOrdersCSV.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
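
// Sketch, not part of the original session: for a file that actually contains
// a header row (e.g. the hypothetical header=true output written above), the
// correctly spelled option makes Spark take the first line as column names.
scala> val dfWithHeader = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/home/cloudera/sparkoutput/orders_csv_hdr")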
 
 
// Column Renaming
scala> val df = dfOrdersCSV.withColumnRenamed("_c0","order_id").withColumnRenamed("_c1","dt").withColumnRenamed("_c2","cust_id").withColumnRenamed("_c3","status")
df: org.apache.spark.sql.DataFrame = [order_id: int, dt: bigint ... 2 more fields]

scala> df.printSchema()
root
 |-- order_id: integer (nullable = true)
 |-- dt: long (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- status: string (nullable = true)


scala> df.show(5)
+--------+-------------+-------+---------------+
|order_id|           dt|cust_id|         status|
+--------+-------------+-------+---------------+
|       1|1374735600000|  11599|         CLOSED|
|       2|1374735600000|    256|PENDING_PAYMENT|
|       3|1374735600000|  12111|       COMPLETE|
|       4|1374735600000|   8827|         CLOSED|
|       5|1374735600000|  11318|       COMPLETE|
+--------+-------------+-------+---------------+
only showing top 5 rows

 
 


