scala> val df = spark.sql("select * from ohm.orders")
df: org.apache.spark.sql.DataFrame = [order_id: int, order_date: bigint ... 2 more fields]
scala> df.printSchema()
root
|-- order_id: integer (nullable = true)
|-- order_date: long (nullable = true)
|-- order_customer_id: integer (nullable = true)
|-- order_status: string (nullable = true)
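// Shortcut (sketch): spark.table reads a metastore table directly, equivalent
// to the SQL query above. dfTbl is just an illustrative name.
scala> val dfTbl = spark.table("ohm.orders")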
scala> val dfOrders = df.withColumnRenamed("order_id","id").withColumnRenamed("order_date","dt").withColumnRenamed("order_customer_id","cust_id").withColumnRenamed("order_status","status")
dfOrders: org.apache.spark.sql.DataFrame = [id: int, dt: bigint ... 2 more fields]
scala> dfOrders.printSchema()
root
|-- id: integer (nullable = true)
|-- dt: long (nullable = true)
|-- cust_id: integer (nullable = true)
|-- status: string (nullable = true)
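// Tip (sketch): when every column is being renamed, a single toDF call is
// equivalent to the withColumnRenamed chain above and much more concise.
scala> val dfOrders = df.toDF("id", "dt", "cust_id", "status")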
scala> df.show(5)
+--------+-------------+-----------------+---------------+
|order_id| order_date|order_customer_id| order_status|
+--------+-------------+-----------------+---------------+
| 1|1374735600000| 11599| CLOSED|
| 2|1374735600000| 256|PENDING_PAYMENT|
| 3|1374735600000| 12111| COMPLETE|
| 4|1374735600000| 8827| CLOSED|
| 5|1374735600000| 11318| COMPLETE|
+--------+-------------+-----------------+---------------+
only showing top 5 rows
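// Side note (sketch): order_date holds epoch milliseconds. To make it human
// readable, divide by 1000 and cast the seconds to a timestamp; order_ts is
// an illustrative column name.
scala> import org.apache.spark.sql.functions.col
scala> df.withColumn("order_ts", (col("order_date") / 1000).cast("timestamp")).show(5)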
// Write the DataFrame out as CSV
scala> df.write.format("csv").save("/home/cloudera/sparkoutput/orders_csv")
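// Sketch of the same write with common options: a header row and overwrite
// mode for safe re-runs. The orders_csv_hdr path is hypothetical, used so the
// header-less output above stays untouched.
scala> df.write.format("csv").option("header", "true").mode("overwrite").save("/home/cloudera/sparkoutput/orders_csv_hdr")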
// Display the first 10 lines of one output part file in the terminal
$ cat /home/cloudera/sparkoutput/orders_csv/part-00000-1f2f2adb-849e-49c0-bdef-8706cf1bcfb9-c000.csv | head
1,1374735600000,11599,CLOSED
2,1374735600000,256,PENDING_PAYMENT
3,1374735600000,12111,COMPLETE
4,1374735600000,8827,CLOSED
5,1374735600000,11318,COMPLETE
6,1374735600000,7130,COMPLETE
7,1374735600000,4530,COMPLETE
8,1374735600000,2911,PROCESSING
9,1374735600000,5657,PENDING_PAYMENT
10,1374735600000,5648,PENDING_PAYMENT
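// The part-00000... file above is one output file per partition. If a single
// CSV file is needed (fine for small data), a sketch with a hypothetical path:
scala> df.coalesce(1).write.format("csv").save("/home/cloudera/sparkoutput/orders_csv_single")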
// Read the CSV file back with Spark. No schema is inferred by default, so
// every field comes back as a string.
scala> val dfOrdersCSV = spark.read.format("csv").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]
scala> dfOrdersCSV.printSchema()
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
// With inferSchema enabled, Spark samples the data (an extra pass over the
// file) and applies the corresponding types - but we still have no headers.
scala> val dfOrdersCSV = spark.read.format("csv").option("inferSchema","true").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: int, _c1: bigint ... 2 more fields]
scala> dfOrdersCSV.printSchema()
root
|-- _c0: integer (nullable = true)
|-- _c1: long (nullable = true)
|-- _c2: integer (nullable = true)
|-- _c3: string (nullable = true)
// A header option was applied here, yet we still get _c0.._c3 - for two
// reasons: the real option name is header (singular), so "headers" is
// silently ignored; and this CSV was written without a header row anyway,
// so even option("header","true") would misread the first data row as names.
scala> val dfOrdersCSV = spark.read.format("csv").option("inferSchema","true").option("headers","true").load("/home/cloudera/sparkoutput/orders_csv/")
dfOrdersCSV: org.apache.spark.sql.DataFrame = [_c0: int, _c1: bigint ... 2 more fields]
scala> dfOrdersCSV.printSchema()
root
|-- _c0: integer (nullable = true)
|-- _c1: long (nullable = true)
|-- _c2: integer (nullable = true)
|-- _c3: string (nullable = true)
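// Instead of inferring types and renaming afterwards, the schema can be
// supplied up front as a DDL string (Spark 2.3+), giving correct names and
// types in a single read. A sketch using the column names from the next step:
scala> val dfTyped = spark.read.format("csv").schema("order_id INT, dt BIGINT, cust_id INT, status STRING").load("/home/cloudera/sparkoutput/orders_csv/")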
// Column Renaming - map the default _c0.._c3 names to meaningful column names
scala> val df = dfOrdersCSV.withColumnRenamed("_c0","order_id").withColumnRenamed("_c1","dt").withColumnRenamed("_c2","cust_id").withColumnRenamed("_c3","status")
df: org.apache.spark.sql.DataFrame = [order_id: int, dt: bigint ... 2 more fields]
scala> df.printSchema()
root
|-- order_id: integer (nullable = true)
|-- dt: long (nullable = true)
|-- cust_id: integer (nullable = true)
|-- status: string (nullable = true)
scala> df.show(5)
+--------+-------------+-------+---------------+
|order_id| dt|cust_id| status|
+--------+-------------+-------+---------------+
| 1|1374735600000| 11599| CLOSED|
| 2|1374735600000| 256|PENDING_PAYMENT|
| 3|1374735600000| 12111| COMPLETE|
| 4|1374735600000| 8827| CLOSED|
| 5|1374735600000| 11318| COMPLETE|
+--------+-------------+-------+---------------+
only showing top 5 rows
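// Closing the loop (sketch, hypothetical path): write the renamed DataFrame
// with a header row, and a later read with option("header","true") picks up
// the column names automatically - no renaming needed.
scala> df.write.format("csv").option("header", "true").save("/home/cloudera/sparkoutput/orders_csv_named")
scala> spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/home/cloudera/sparkoutput/orders_csv_named").printSchema()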