Tuesday, 11 August 2020

Read Hive table in Spark using Spark SQL | Read, Write JSON


// create a DataFrame from the Hive table ohm.customers
scala> val dfCust = spark.sql("select * from ohm.customers")
dfCust: org.apache.spark.sql.DataFrame = [customer_id: int, customer_fname: string ... 7 more fields]
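
If you build the SparkSession yourself instead of using the one spark-shell provides, Hive support has to be enabled explicitly, otherwise spark.sql cannot see the Hive metastore. A minimal sketch (the application name is just a placeholder):

// Only needed in a standalone application; spark-shell already enables Hive support
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ReadHiveTable")   // placeholder name
  .enableHiveSupport()
  .getOrCreate()

// spark.table is equivalent to the select * query above
val dfCust = spark.table("ohm.customers")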

scala> dfCust.printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)

// Rename the columns
scala> val df = dfCust.withColumnRenamed("customer_id","id").withColumnRenamed("customer_fname","fname").withColumnRenamed("customer_lname","lname").withColumnRenamed("customer_email","email").withColumnRenamed("customer_password","password").withColumnRenamed("customer_street","street").withColumnRenamed("customer_city","city").withColumnRenamed("customer_state","state").withColumnRenamed("customer_zipcode","zipcode")
df: org.apache.spark.sql.DataFrame = [id: int, fname: string ... 7 more fields]
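
The same renaming can also be done in a single call with toDF, which replaces every column name positionally; this is shown only as an alternative to the chain of withColumnRenamed calls above:

// Alternative: rename all nine columns at once (order must match the original schema)
val df = dfCust.toDF("id", "fname", "lname", "email", "password", "street", "city", "state", "zipcode")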

scala> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- email: string (nullable = true)
 |-- password: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)

scala> df.show(10)
+---+-------+---------+---------+---------+--------------------+-----------+-----+-------+
| id|  fname|    lname|    email| password|              street|       city|state|zipcode|
+---+-------+---------+---------+---------+--------------------+-----------+-----+-------+
|  1|Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|
|  2|   Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|
|  3|    Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|  00725|
|  4|   Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common| San Marcos|   CA|  92069|
|  5| Robert|   Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|     Caguas|   PR|  00725|
|  6|   Mary|    Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|    Passaic|   NJ|  07055|
|  7|Melissa|   Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High Concession|     Caguas|   PR|  00725|
|  8|  Megan|    Smith|XXXXXXXXX|XXXXXXXXX|3047 Foggy Forest...|   Lawrence|   MA|  01841|
|  9|   Mary|    Perez|XXXXXXXXX|XXXXXXXXX| 3616 Quaking Street|     Caguas|   PR|  00725|
| 10|Melissa|    Smith|XXXXXXXXX|XXXXXXXXX|8598 Harvest Beac...|   Stafford|   VA|  22554|
+---+-------+---------+---------+---------+--------------------+-----------+-----+-------+
only showing top 10 rows

// write the DataFrame out as JSON files
scala> df.write.format("json").save("/home/cloudera/sparkoutput/cust_json")
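
By default the save fails if the target directory already exists. When re-running the example, a save mode can be passed; the sketch below assumes overwriting the directory is acceptable:

// Overwrite the existing output directory instead of failing on re-run
df.write.mode("overwrite").json("/home/cloudera/sparkoutput/cust_json")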

// display the first 10 lines of one output part file
$ head /home/cloudera/sparkoutput/cust_json/part-00000-d87c7ca5-261d-4b90-9044-3ed4513f29c8-c000.json 

{"id":1,"fname":"Richard","lname":"Hernandez","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"6303 Heather Plaza","city":"Brownsville","state":"TX","zipcode":"78521"}
{"id":2,"fname":"Mary","lname":"Barrett","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"9526 Noble Embers Ridge","city":"Littleton","state":"CO","zipcode":"80126"}
{"id":3,"fname":"Ann","lname":"Smith","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"3422 Blue Pioneer Bend","city":"Caguas","state":"PR","zipcode":"00725"}
{"id":4,"fname":"Mary","lname":"Jones","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"8324 Little Common","city":"San Marcos","state":"CA","zipcode":"92069"}
{"id":5,"fname":"Robert","lname":"Hudson","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"10 Crystal River Mall ","city":"Caguas","state":"PR","zipcode":"00725"}
{"id":6,"fname":"Mary","lname":"Smith","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"3151 Sleepy Quail Promenade","city":"Passaic","state":"NJ","zipcode":"07055"}
{"id":7,"fname":"Melissa","lname":"Wilcox","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"9453 High Concession","city":"Caguas","state":"PR","zipcode":"00725"}
{"id":8,"fname":"Megan","lname":"Smith","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"3047 Foggy Forest Plaza","city":"Lawrence","state":"MA","zipcode":"01841"}
{"id":9,"fname":"Mary","lname":"Perez","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"3616 Quaking Street","city":"Caguas","state":"PR","zipcode":"00725"}
{"id":10,"fname":"Melissa","lname":"Smith","email":"XXXXXXXXX","password":"XXXXXXXXX","street":"8598 Harvest Beacon Plaza","city":"Stafford","state":"VA","zipcode":"22554"}


// read the JSON files back into a DataFrame
scala> val df = spark.read.format("json").load("/home/cloudera/sparkoutput/cust_json/")
df: org.apache.spark.sql.DataFrame = [city: string, email: string ... 7 more fields]

scala> df.printSchema()
root
 |-- city: string (nullable = true)
 |-- email: string (nullable = true)
 |-- fname: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lname: string (nullable = true)
 |-- password: string (nullable = true)
 |-- state: string (nullable = true)
 |-- street: string (nullable = true)
 |-- zipcode: string (nullable = true)
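
Note that schema inference reads the files once, orders the columns alphabetically, and infers id as long. If the original column order and types matter, an explicit schema can be supplied; a sketch (custSchema is a name introduced here for illustration):

// Supplying a schema skips the inference pass and keeps id as an integer
import org.apache.spark.sql.types._

val custSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("fname", StringType),
  StructField("lname", StringType),
  StructField("email", StringType),
  StructField("password", StringType),
  StructField("street", StringType),
  StructField("city", StringType),
  StructField("state", StringType),
  StructField("zipcode", StringType)))

val dfTyped = spark.read.schema(custSchema).json("/home/cloudera/sparkoutput/cust_json/")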


scala> df.show(5)
+-----------+---------+-------+---+---------+---------+-----+--------------------+-------+
|       city|    email|  fname| id|    lname| password|state|              street|zipcode|
+-----------+---------+-------+---+---------+---------+-----+--------------------+-------+
|Brownsville|XXXXXXXXX|Richard|  1|Hernandez|XXXXXXXXX|   TX|  6303 Heather Plaza|  78521|
|  Littleton|XXXXXXXXX|   Mary|  2|  Barrett|XXXXXXXXX|   CO|9526 Noble Embers...|  80126|
|     Caguas|XXXXXXXXX|    Ann|  3|    Smith|XXXXXXXXX|   PR|3422 Blue Pioneer...|  00725|
| San Marcos|XXXXXXXXX|   Mary|  4|    Jones|XXXXXXXXX|   CA|  8324 Little Common|  92069|
|     Caguas|XXXXXXXXX| Robert|  5|   Hudson|XXXXXXXXX|   PR|10 Crystal River ...|  00725|
+-----------+---------+-------+---+---------+---------+-----+--------------------+-------+
only showing top 5 rows



// select specific columns only
scala> df.select("id","fname","lname","city").show(5)
+---+-------+---------+-----------+
| id|  fname|    lname|       city|
+---+-------+---------+-----------+
|  1|Richard|Hernandez|Brownsville|
|  2|   Mary|  Barrett|  Littleton|
|  3|    Ann|    Smith|     Caguas|
|  4|   Mary|    Jones| San Marcos|
|  5| Robert|   Hudson|     Caguas|
+---+-------+---------+-----------+
only showing top 5 rows

// write the selected columns out as JSON files
scala> df.select("id","fname","lname","city").write.format("json").save("/home/cloudera/sparkoutput/custjson_4cols")
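
Each partition is written as its own part file. If a single output file is preferred, the DataFrame can be coalesced to one partition first; note that this routes the whole write through a single task, and the output path below is only a placeholder:

// Coalesce to one partition so the directory contains a single part file (placeholder path)
df.select("id", "fname", "lname", "city").coalesce(1).write.format("json").save("/home/cloudera/sparkoutput/custjson_single")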


// display the first 10 lines of one output part file
$ head /home/cloudera/sparkoutput/custjson_4cols/part-00000-9191ead8-927e-4c45-8807-6e6ad40abb33-c000.json 
{"id":1,"fname":"Richard","lname":"Hernandez","city":"Brownsville"}
{"id":2,"fname":"Mary","lname":"Barrett","city":"Littleton"}
{"id":3,"fname":"Ann","lname":"Smith","city":"Caguas"}
{"id":4,"fname":"Mary","lname":"Jones","city":"San Marcos"}
{"id":5,"fname":"Robert","lname":"Hudson","city":"Caguas"}
{"id":6,"fname":"Mary","lname":"Smith","city":"Passaic"}
{"id":7,"fname":"Melissa","lname":"Wilcox","city":"Caguas"}
{"id":8,"fname":"Megan","lname":"Smith","city":"Lawrence"}
{"id":9,"fname":"Mary","lname":"Perez","city":"Caguas"}
{"id":10,"fname":"Melissa","lname":"Smith","city":"Stafford"}

