Tuesday, 11 August 2020

Read and Write Avro Files Using Spark

The Avro data source is not bundled with Spark by default, so the spark-avro package has to be added as a dependency.
 
SBT: 
// https://mvnrepository.com/artifact/org.apache.spark/spark-avro
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.6"

Maven:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.6</version>
</dependency>

To use it interactively, exit the Spark shell and relaunch it with the Avro package loaded:

https://mvnrepository.com/artifact/org.apache.spark/spark-avro_2.11/2.4.6

$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6

Ivy Default Cache set to: /home/cloudera/.ivy2/cache
The jars for the packages stored in: /home/cloudera/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0e872e98-4cb1-44e8-b7fe-b4a155a2848e;1.0
confs: [default]
found org.apache.spark#spark-avro_2.11;2.4.6 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.6/spark-avro_2.11-2.4.6.jar ...
[SUCCESSFUL ] org.apache.spark#spark-avro_2.11;2.4.6!spark-avro_2.11.jar (336ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (153ms)
:: resolution report :: resolve 8010ms :: artifacts dl 512ms
:: modules in use:
org.apache.spark#spark-avro_2.11;2.4.6 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   2   |   2   |   2   |   0   ||   2   |   2   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0e872e98-4cb1-44e8-b7fe-b4a155a2848e
confs: [default]
2 artifacts copied, 0 already retrieved (193kB/28ms)


// Note: this import comes from the older Databricks spark-avro package; with
// org.apache.spark's spark-avro, .format("avro") alone is enough and the import is not required.
scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._


scala> val dfDept = spark.sql("select * from ohm.departments")
dfDept: org.apache.spark.sql.DataFrame = [department_id: int, department_name: string]
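
The same DataFrame can also be obtained without writing SQL; as a small sketch (not part of the original session), spark.table reads the Hive table directly:

// Hedged sketch: equivalent to the spark.sql(...) call above
val dfDept2 = spark.table("ohm.departments")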

scala> dfDept.printSchema()
root
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)


scala> dfDept.show(5)
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
|            5|           Golf|
|            6|       Outdoors|
+-------------+---------------+
only showing top 5 rows

// Write the DataFrame out as an Avro file
scala> dfDept.write.format("avro").save("/home/cloudera/sparkoutput/dept_avro")
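
As a variant (a sketch, not taken from the original session): spark-avro's compression codec can be switched from the default snappy via spark.sql.avro.compression.codec, and an explicit save mode avoids errors if the target directory already exists. The output path below is illustrative.

// Hedged sketch: write the same DataFrame with deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
dfDept.write
  .mode("overwrite")
  .format("avro")
  .save("/home/cloudera/sparkoutput/dept_avro_deflate")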


// Display the Avro file content as JSON using avro-tools
$ avro-tools tojson /home/cloudera/sparkoutput/dept_avro/part-00000-2cc1eba9-9b48-4569-9a8c-3354a38f6e0a-c000.avro | tail
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"department_id":{"int":2},"department_name":{"string":"Fitness"}}
{"department_id":{"int":3},"department_name":{"string":"Footwear"}}
{"department_id":{"int":4},"department_name":{"string":"Apparel"}}
{"department_id":{"int":5},"department_name":{"string":"Golf"}}
{"department_id":{"int":6},"department_name":{"string":"Outdoors"}}
{"department_id":{"int":7},"department_name":{"string":"Fan Shop"}}


// Read the Avro file back using Spark
scala> val dfDeptAvro = spark.read.format("avro").load("/home/cloudera/sparkoutput/dept_avro")
dfDeptAvro: org.apache.spark.sql.DataFrame = [department_id: int, department_name: string]

scala> dfDeptAvro.printSchema()
root
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)


scala> dfDeptAvro.show(5)
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
|            5|           Golf|
|            6|       Outdoors|
+-------------+---------------+
only showing top 5 rows
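
Once loaded, the Avro-backed DataFrame behaves like any other. As a small sketch (the view name and query are illustrative, not from the original session), it can be registered as a temporary view and queried with SQL:

// Hedged sketch: query the Avro data through Spark SQL
dfDeptAvro.createOrReplaceTempView("dept_avro")
spark.sql("select department_name from dept_avro where department_id > 4").show()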


// Rename the columns
scala> val df = dfDeptAvro.withColumnRenamed("department_id","id").withColumnRenamed("department_name","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
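
An equivalent way to rename every column in one call (a sketch, not from the original session) is toDF with the new names in positional order:

// Hedged sketch: positional rename of all columns at once
val dfRenamed = dfDeptAvro.toDF("id", "name")
dfRenamed.printSchema()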

// display unique records
scala> df.select("id","name").distinct()
res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string]

scala> df.select("id","name").distinct().show()
+---+--------+                                                                  
| id|    name|
+---+--------+
|  3|Footwear|
|  7|Fan Shop|
|  6|Outdoors|
|  2| Fitness|
|  4| Apparel|
|  5|    Golf|
+---+--------+
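
dropDuplicates gives the same result and also accepts a subset of columns. The sketch below (output path is illustrative, not from the original session) deduplicates on id alone and persists the result back as Avro:

// Hedged sketch: deduplicate on a subset of columns and save the result as Avro
df.dropDuplicates("id")
  .write
  .mode("overwrite")
  .format("avro")
  .save("/home/cloudera/sparkoutput/dept_distinct_avro")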
