Saturday, 26 January 2019

Handling JSON in Dataframe using Spark with Scala

Input file:
olympic.json
-------------
{"athelete":"Inge de Bruijn","age":30,"country":"Netherlands","year":"2004","closing":"08-29-04","sport":"Swimming","gold":1,"silver":1,"bronze":2,"total":4}
{"athelete":"Ryan Lochte","age":24,"country":"United States","year":"2008","closing":"08-24-08","sport":"Swimming","gold":2,"silver":0,"bronze":2,"total":4}
{"athelete":"Libby Lenton-Trickett","age":23,"country":"Australia","year":"2008","closing":"08-24-08","sport":"Swimming","gold":2,"silver":1,"bronze":1,"total":4}
{"athelete":"Kirsty Coventry","age":24,"country":"Zimbabwe","year":"2008","closing":"08-24-08","sport":"Swimming","gold":1,"silver":3,"bronze":0,"total":4}
{"athelete":"Sun Yang","age":20,"country":"China","year":"2012","closing":"08-12-12","sport":"Swimming","gold":2,"silver":1,"bronze":1,"total":4}
{"athelete":"Marit Bjørgen","age":29,"country":"Norway","year":"2010","closing":"02-28-10","sport":"Cross Country Skiing","gold":3,"silver":1,"bronze":1,"total":5}
{"athelete":"Nastia Liukin","age":18,"country":"United States","year":"2008","closing":"08-24-08","sport":"Gymnastics","gold":1,"silver":3,"bronze":1,"total":5}
{"athelete":"Cindy Klassen","age":26,"country":"Canada","year":"2006","closing":"02-26-06","sport":"Speed Skating","gold":1,"silver":2,"bronze":2,"total":5}
{"athelete":"Dara Torres","age":33,"country":"United States","year":"2000","closing":"10-01-00","sport":"Swimming","gold":2,"silver":0,"bronze":3,"total":5}
{"athelete":"Ian Thorpe","age":17,"country":"Australia","year":"2000","closing":"10-01-00","sport":"Swimming","gold":3,"silver":2,"bronze":0,"total":5}
{"athelete":"Natalie Coughlin","age":21,"country":"United States","year":"2004","closing":"08-29-04","sport":"Swimming","gold":2,"silver":2,"bronze":1,"total":5}
{"athelete":"Allison Schmitt","age":22,"country":"United States","year":"2012","closing":"08-12-12","sport":"Swimming","gold":3,"silver":1,"bronze":1,"total":5}
{"athelete":"Ryan Lochte","age":27,"country":"United States","year":"2012","closing":"08-12-12","sport":"Swimming","gold":2,"silver":2,"bronze":1,"total":5}
{"athelete":"Missy Franklin","age":17,"country":"United States","year":"2012","closing":"08-12-12","sport":"Swimming","gold":4,"silver":0,"bronze":1,"total":5}
{"athelete":"Alicia Coutts","age":24,"country":"Australia","year":"2012","closing":"08-12-12","sport":"Swimming","gold":1,"silver":3,"bronze":1,"total":5}
{"athelete":"Aleksey Nemov","age":24,"country":"Russia","year":"2000","closing":"10-01-00","sport":"Gymnastics","gold":2,"silver":1,"bronze":3,"total":6}
{"athelete":"Natalie Coughlin","age":25,"country":"United States","year":"2008","closing":"08-24-08","sport":"Swimming","gold":1,"silver":2,"bronze":3,"total":6}
{"athelete":"Michael Phelps","age":27,"country":"United States","year":"2012","closing":"08-12-12","sport":"Swimming","gold":4,"silver":2,"bronze":0,"total":6}
{"athelete":"Michael Phelps","age":19,"country":"United States","year":"2004","closing":"08-29-04","sport":"Swimming","gold":6,"silver":0,"bronze":2,"total":8}
{"athelete":"Michael Phelps","age":23,"country":"United States","year":"2008","closing":"08-24-08","sport":"Swimming","gold":8,"silver":0,"bronze":0,"total":8}

// Load the JSON file (one JSON object per line) into a DataFrame
scala> val dfJson = sqlContext.read.format("json").load("file:///home/hadoop/Desktop/olympic.json")
dfJson: org.apache.spark.sql.DataFrame = [age: bigint, athelete: string ... 8 more fields]

scala> dfJson.show()
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
|age|            athelete|bronze| closing|      country|gold|silver|               sport|total|year|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
| 30|      Inge de Bruijn|     2|08-29-04|  Netherlands|   1|     1|            Swimming|    4|2004|
| 24|         Ryan Lochte|     2|08-24-08|United States|   2|     0|            Swimming|    4|2008|
| 23|Libby Lenton-Tric...|     1|08-24-08|    Australia|   2|     1|            Swimming|    4|2008|
| 24|     Kirsty Coventry|     0|08-24-08|     Zimbabwe|   1|     3|            Swimming|    4|2008|
| 20|            Sun Yang|     1|08-12-12|        China|   2|     1|            Swimming|    4|2012|
| 29|       Marit Bjørgen|     1|02-28-10|       Norway|   3|     1|Cross Country Skiing|    5|2010|
| 18|       Nastia Liukin|     1|08-24-08|United States|   1|     3|          Gymnastics|    5|2008|
| 26|       Cindy Klassen|     2|02-26-06|       Canada|   1|     2|       Speed Skating|    5|2006|
| 33|         Dara Torres|     3|10-01-00|United States|   2|     0|            Swimming|    5|2000|
| 17|          Ian Thorpe|     0|10-01-00|    Australia|   3|     2|            Swimming|    5|2000|
| 21|    Natalie Coughlin|     1|08-29-04|United States|   2|     2|            Swimming|    5|2004|
| 22|     Allison Schmitt|     1|08-12-12|United States|   3|     1|            Swimming|    5|2012|
| 27|         Ryan Lochte|     1|08-12-12|United States|   2|     2|            Swimming|    5|2012|
| 17|      Missy Franklin|     1|08-12-12|United States|   4|     0|            Swimming|    5|2012|
| 24|       Alicia Coutts|     1|08-12-12|    Australia|   1|     3|            Swimming|    5|2012|
| 24|       Aleksey Nemov|     3|10-01-00|       Russia|   2|     1|          Gymnastics|    6|2000|
| 25|    Natalie Coughlin|     3|08-24-08|United States|   1|     2|            Swimming|    6|2008|
| 27|      Michael Phelps|     0|08-12-12|United States|   4|     2|            Swimming|    6|2012|
| 19|      Michael Phelps|     2|08-29-04|United States|   6|     0|            Swimming|    8|2004|
| 23|      Michael Phelps|     0|08-24-08|United States|   8|     0|            Swimming|    8|2008|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+

// Transformation: keep only the rows where year is 2012
scala> val dfJsonFiltered = dfJson.where("year=2012")
dfJsonFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [age: bigint, athelete: string ... 8 more fields]

scala> dfJsonFiltered.show()
+---+---------------+------+--------+-------------+----+------+--------+-----+----+
|age|       athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+---------------+------+--------+-------------+----+------+--------+-----+----+
| 20|       Sun Yang|     1|08-12-12|        China|   2|     1|Swimming|    4|2012|
| 22|Allison Schmitt|     1|08-12-12|United States|   3|     1|Swimming|    5|2012|
| 27|    Ryan Lochte|     1|08-12-12|United States|   2|     2|Swimming|    5|2012|
| 17| Missy Franklin|     1|08-12-12|United States|   4|     0|Swimming|    5|2012|
| 24|  Alicia Coutts|     1|08-12-12|    Australia|   1|     3|Swimming|    5|2012|
| 27| Michael Phelps|     0|08-12-12|United States|   4|     2|Swimming|    6|2012|
+---+---------------+------+--------+-------------+----+------+--------+-----+----+

// Transformation with multiple conditions: country is 'United States' and year is 2012
scala> val dfJsonFiltered = dfJson.where("country='United States' and year=2012")
dfJsonFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [age: bigint, athelete: string ... 8 more fields]

scala> dfJsonFiltered.show()
+---+---------------+------+--------+-------------+----+------+--------+-----+----+
|age|       athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+---------------+------+--------+-------------+----+------+--------+-----+----+
| 22|Allison Schmitt|     1|08-12-12|United States|   3|     1|Swimming|    5|2012|
| 27|    Ryan Lochte|     1|08-12-12|United States|   2|     2|Swimming|    5|2012|
| 17| Missy Franklin|     1|08-12-12|United States|   4|     0|Swimming|    5|2012|
| 27| Michael Phelps|     0|08-12-12|United States|   4|     2|Swimming|    6|2012|
+---+---------------+------+--------+-------------+----+------+--------+-----+----+

// Write the filtered DataFrame to the local file system in JSON format
scala> dfJsonFiltered.write.format("json").save("file:///home/hadoop/Desktop/jsonOut")


// View the output (run from inside the /home/hadoop/Desktop/jsonOut directory)
cat part-00000-a3e96343-772a-46d8-8cd7-97015817c212-c000.json
{"age":22,"athelete":"Allison Schmitt","bronze":1,"closing":"08-12-12","country":"United States","gold":3,"silver":1,"sport":"Swimming","total":5,"year":"2012"}
{"age":27,"athelete":"Ryan Lochte","bronze":1,"closing":"08-12-12","country":"United States","gold":2,"silver":2,"sport":"Swimming","total":5,"year":"2012"}
{"age":17,"athelete":"Missy Franklin","bronze":1,"closing":"08-12-12","country":"United States","gold":4,"silver":0,"sport":"Swimming","total":5,"year":"2012"}
{"age":27,"athelete":"Michael Phelps","bronze":0,"closing":"08-12-12","country":"United States","gold":4,"silver":2,"sport":"Swimming","total":6,"year":"2012"}

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...