Thursday, 31 January 2019

Handling XML, JSON, CSV files in Spark with Scala

// Here we have three files (.csv, .json, .xml) holding the same content in different formats
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/countries/
Found 3 items
-rw-r--r--   1 hadoop supergroup      13643 2019-01-31 12:48 /user/countries/country-capitals.csv
-rw-r--r--   1 hadoop supergroup      40247 2019-01-31 12:48 /user/countries/country-capitals.json
-rw-r--r--   1 hadoop supergroup      76081 2019-01-31 12:48 /user/countries/country-capitals.xml

country-capitals.xml:
---------------------
<?xml version="1.0" encoding="UTF-8"?>
<Countries>
   <Country>
      <CapitalLatitude>64.15</CapitalLatitude>
      <CapitalLongitude>-21.950000</CapitalLongitude>
      <CapitalName>Reykjavik</CapitalName>
      <ContinentName>Europe</ContinentName>
      <CountryCode>IS</CountryCode>
      <CountryName>Iceland</CountryName>
   </Country>
   <Country>
      <CapitalLatitude>28.6</CapitalLatitude>
      <CapitalLongitude>77.200000</CapitalLongitude>
      <CapitalName>New Delhi</CapitalName>
      <ContinentName>Asia</ContinentName>
      <CountryCode>IN</CountryCode>
      <CountryName>India</CountryName>
   </Country>
</Countries>

country-capitals.json:
----------------------
[
    {
      "CountryName":"Iceland",
      "CapitalName":"Reykjavik",
      "CapitalLatitude":"64.15",
      "CapitalLongitude":"-21.950000",
      "CountryCode":"IS",
      "ContinentName":"Europe"
   },
   {
      "CountryName":"India",
      "CapitalName":"New Delhi",
      "CapitalLatitude":"28.6",
      "CapitalLongitude":"77.200000",
      "CountryCode":"IN",
      "ContinentName":"Asia"
   }
 ]

country-capitals.csv:
----------------------
CountryName,CapitalName,CapitalLatitude,CapitalLongitude,CountryCode,ContinentName
Iceland,Reykjavik,64.15,-21.950000,IS,Europe
India,New Delhi,28.6,77.200000,IN,Asia




 Browse: https://mvnrepository.com
 Search there for spark-xml, spark-csv and spark-avro, and in each case copy the dependency built for Scala 2.11 (the _2.11 artifacts; Scala 2.11.12 is used here).

 Here are the dependency snippets for csv, xml and avro:
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.5.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/com.databricks/spark-avro -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version>
</dependency>


Join the coordinates into one comma-separated string ==> groupId:artifactId:version,groupId:artifactId:version,groupId:artifactId:version

Run spark-shell with the --packages option:

spark-shell --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.11:0.5.0,com.databricks:spark-avro_2.11:4.0.0
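The coordinate string can also be assembled mechanically from the three Maven dependencies rather than typed by hand. A minimal sketch in plain Scala; `MavenCoordinate`, `packages` and `packagesArg` are names made up for this example and are not part of Spark:

```scala
// Hypothetical helper type: one Maven dependency as groupId / artifactId / version.
final case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
  // Render in the groupId:artifactId:version form that --packages expects.
  def render: String = s"$groupId:$artifactId:$version"
}

// The three dependencies listed above.
val packages = Seq(
  MavenCoordinate("com.databricks", "spark-csv_2.11", "1.5.0"),
  MavenCoordinate("com.databricks", "spark-xml_2.11", "0.5.0"),
  MavenCoordinate("com.databricks", "spark-avro_2.11", "4.0.0")
)

// Comma-separated, no spaces and no trailing comma.
val packagesArg: String = packages.map(_.render).mkString(",")

println(s"spark-shell --packages $packagesArg")
```

The printed line is the same spark-shell command shown above.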

// Read XML and make DataFrame
scala> val dfCountriesXML = spark.read.format("xml").option("rowTag","Country").load("hdfs://localhost:9000/user/countries/country-capitals.xml")
dfCountriesXML: org.apache.spark.sql.DataFrame = [CapitalLatitude: string, CapitalLongitude: double ... 4 more fields]

scala> dfCountriesXML.show(5)
+------------------+----------------+------------------+-------------+-----------+--------------------+
|   CapitalLatitude|CapitalLongitude|       CapitalName|ContinentName|CountryCode|         CountryName|
+------------------+----------------+------------------+-------------+-----------+--------------------+
|              9.55|           44.05|          Hargeisa|       Africa|       NULL|          Somaliland|
|        -54.283333|           -36.5| King Edward Point|   Antarctica|         GS|South Georgia and...|
|            -49.35|       70.216667|Port-aux-Français|   Antarctica|         TF|French Southern a...|
|31.766666666666666|       35.233333|         Jerusalem|         Asia|         PS|           Palestine|
|         60.116667|            19.9|         Mariehamn|       Europe|         AX|       Aland Islands|
+------------------+----------------+------------------+-------------+-----------+--------------------+
only showing top 5 rows


// Make DataFrame - Reading CSV with a header row and an inferred schema
scala> val dfCountriesCSV = spark.read.format("csv").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/user/countries/country-capitals.csv")


scala> dfCountriesCSV.show(3)
+--------------------+-----------------+---------------+----------------+-----------+-------------+
|         CountryName|      CapitalName|CapitalLatitude|CapitalLongitude|CountryCode|ContinentName|
+--------------------+-----------------+---------------+----------------+-----------+-------------+
|          Somaliland|         Hargeisa|           9.55|           44.05|       NULL|       Africa|
|South Georgia and...|King Edward Point|     -54.283333|           -36.5|         GS|   Antarctica|
|French Southern a...|Port-aux-Français|         -49.35|       70.216667|         TF|   Antarctica|
+--------------------+-----------------+---------------+----------------+-----------+-------------+
only showing top 3 rows


scala> dfCountriesCSV.printSchema
root
 |-- CountryName: string (nullable = true)
 |-- CapitalName: string (nullable = true)
 |-- CapitalLatitude: string (nullable = true)
 |-- CapitalLongitude: double (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- ContinentName: string (nullable = true)
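
The schema above reports CapitalLatitude as string even though inferSchema was on, which presumably means at least one value in that column did not look numeric to the inference pass. One option is to skip inference and declare the schema up front. A minimal sketch, assuming the six columns from the header line; `countrySchema` and `readCountriesCsv` are names made up for this example, and how unparseable values are handled depends on the Spark version and the reader's `mode` option:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Declared schema; column order must match the CSV header line.
val countrySchema: StructType = StructType(Seq(
  StructField("CountryName", StringType, nullable = true),
  StructField("CapitalName", StringType, nullable = true),
  StructField("CapitalLatitude", DoubleType, nullable = true),
  StructField("CapitalLongitude", DoubleType, nullable = true),
  StructField("CountryCode", StringType, nullable = true),
  StructField("ContinentName", StringType, nullable = true)
))

// Hypothetical helper: read the CSV with the declared schema instead of inferSchema.
def readCountriesCsv(spark: SparkSession, path: String): DataFrame = {
  spark.read
    .format("csv")
    .option("header", "true") // first line holds column names, not data
    .schema(countrySchema)
    .load(path)
}
```

In the session above it could be called as readCountriesCsv(spark, "hdfs://localhost:9000/user/countries/country-capitals.csv").printSchema to check that both coordinate columns are now double.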

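// Make DataFrame - Reading json file
// ("header" and "inferSchema" are CSV options and are ignored by the JSON reader, so they are left out here.
//  If the file is a pretty-printed array like the sample above, Spark 2.2+ also needs .option("multiLine","true").)
scala> val dfCountriesJSON = spark.read.format("json").load("hdfs://localhost:9000/user/countries/country-capitals.json")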
dfCountriesJSON: org.apache.spark.sql.DataFrame = [CapitalLatitude: string, CapitalLongitude: string ... 4 more fields]

scala> dfCountriesJSON.show(3)
+---------------+----------------+------------------+-------------+-----------+--------------------+
|CapitalLatitude|CapitalLongitude|       CapitalName|ContinentName|CountryCode|         CountryName|
+---------------+----------------+------------------+-------------+-----------+--------------------+
|           9.55|       44.050000|          Hargeisa|       Africa|       NULL|          Somaliland|
|     -54.283333|      -36.500000| King Edward Point|   Antarctica|         GS|South Georgia and...|
|         -49.35|       70.216667|Port-aux-Français|   Antarctica|         TF|French Southern a...|
+---------------+----------------+------------------+-------------+-----------+--------------------+
only showing top 3 rows


scala> dfCountriesJSON.printSchema
root
 |-- CapitalLatitude: string (nullable = true)
 |-- CapitalLongitude: string (nullable = true)
 |-- CapitalName: string (nullable = true)
 |-- ContinentName: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- CountryName: string (nullable = true)
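
Every JSON value in this file is a quoted string, so the latitude and longitude columns come back as string. If numeric columns are wanted, they can be cast after loading. A minimal sketch; `withNumericCoordinates` is a name made up for this example:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: cast the two coordinate columns from string to double.
// With default (non-ANSI) settings, values that cannot be parsed become null
// instead of raising an error.
def withNumericCoordinates(df: DataFrame): DataFrame = {
  df.withColumn("CapitalLatitude", col("CapitalLatitude").cast("double"))
    .withColumn("CapitalLongitude", col("CapitalLongitude").cast("double"))
}
```

For example, withNumericCoordinates(dfCountriesJSON).printSchema would show whether the two columns are now double; the other four columns are left untouched.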
