The Avro package is not bundled with Spark by default, so we need to add it as a dependency.
SBT:
// https://mvnrepository.com/artifact/org.apache.spark/spark-avro
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.6"
Maven:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.6</version>
</dependency>
Exit the Spark shell and relaunch it with the Avro package loaded:
https://mvnrepository.com/artifact/org.apache.spark/spark-avro_2.11/2.4.6
$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6
Ivy Default Cache set to: /home/cloudera/.ivy2/cache
The jars for the packages stored in: /home/cloudera/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0e872e98-4cb1-44e8-b7fe-b4a155a2848e;1.0
confs: [default]
found org.apache.spark#spark-avro_2.11;2.4.6 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.6/spark-avro_2.11-2.4.6.jar ...
[SUCCESSFUL ] org.apache.spark#spark-avro_2.11;2.4.6!spark-avro_2.11.jar (336ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (153ms)
:: resolution report :: resolve 8010ms :: artifacts dl 512ms
:: modules in use:
org.apache.spark#spark-avro_2.11;2.4.6 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 2 | 2 | 0 || 2 | 2 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0e872e98-4cb1-44e8-b7fe-b4a155a2848e
confs: [default]
2 artifacts copied, 0 already retrieved (193kB/28ms)
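The same package coordinates work with spark-submit, so a packaged application can pull in spark-avro at launch time without bundling it. A sketch (the class and jar names below are placeholders for your own application):
$ spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.6 \
    --class com.example.DeptAvroJob dept-avro-job.jar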
scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._
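Note that this is the old Databricks package name; spark-avro 2.4.x keeps it only as a deprecated compatibility layer, so the import is optional as long as you stick to format("avro"). If you do import it, it adds a shorthand .avro(...) method on the reader and writer, roughly like this (the output path is just an example):
scala> dfDept.write.avro("/home/cloudera/sparkoutput/dept_avro_legacy")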
scala> val dfDept = spark.sql("select * from ohm.departments")
dfDept: org.apache.spark.sql.DataFrame = [department_id: int, department_name: string]
scala> dfDept.printSchema()
root
|-- department_id: integer (nullable = true)
|-- department_name: string (nullable = true)
scala> dfDept.show(5)
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| Fitness|
| 3| Footwear|
| 4| Apparel|
| 5| Golf|
| 6| Outdoors|
+-------------+---------------+
only showing top 5 rows
// Write the DataFrame out as an Avro file
scala> dfDept.write.format("avro").save("/home/cloudera/sparkoutput/dept_avro")
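The writer accepts the usual DataFrameWriter settings as well; a sketch with an explicit save mode and compression codec (snappy should already be the default for Avro, and the "compression" option name is an assumption based on the spark-avro data source options):
scala> dfDept.write.format("avro").option("compression", "snappy").mode("overwrite").save("/home/cloudera/sparkoutput/dept_avro")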
// Display the contents of the Avro file as JSON
$ avro-tools tojson /home/cloudera/sparkoutput/dept_avro/part-00000-2cc1eba9-9b48-4569-9a8c-3354a38f6e0a-c000.avro | tail
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"department_id":{"int":2},"department_name":{"string":"Fitness"}}
{"department_id":{"int":3},"department_name":{"string":"Footwear"}}
{"department_id":{"int":4},"department_name":{"string":"Apparel"}}
{"department_id":{"int":5},"department_name":{"string":"Golf"}}
{"department_id":{"int":6},"department_name":{"string":"Outdoors"}}
{"department_id":{"int":7},"department_name":{"string":"Fan Shop"}}
// Read the Avro file back with Spark
scala> val dfDeptAvro = spark.read.format("avro").load("/home/cloudera/sparkoutput/dept_avro")
dfDeptAvro: org.apache.spark.sql.DataFrame = [department_id: int, department_name: string]
scala> dfDeptAvro.printSchema()
root
|-- department_id: integer (nullable = true)
|-- department_name: string (nullable = true)
scala> dfDeptAvro.show(5)
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| Fitness|
| 3| Footwear|
| 4| Apparel|
| 5| Golf|
| 6| Outdoors|
+-------------+---------------+
only showing top 5 rows
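The reader can also be pinned to an explicit Avro schema through the avroSchema option, which helps when the on-disk schema may drift. A minimal sketch, assuming the hand-written schema below matches what Spark wrote (topLevelRecord is spark-avro's default record name):
scala> val deptSchema = """{"type":"record","name":"topLevelRecord","fields":[{"name":"department_id","type":["int","null"]},{"name":"department_name","type":["string","null"]}]}"""
scala> val dfPinned = spark.read.format("avro").option("avroSchema", deptSchema).load("/home/cloudera/sparkoutput/dept_avro")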
// Rename the columns
scala> val df = dfDeptAvro.withColumnRenamed("department_id","id").withColumnRenamed("department_name","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
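Since every column is being renamed here, toDF is a terser equivalent that assigns new names positionally (df2 is just an illustrative name):
scala> val df2 = dfDeptAvro.toDF("id", "name")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string]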
// Display unique records
scala> df.select("id","name").distinct()
res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string]
scala> df.select("id","name").distinct().show()
+---+--------+
| id| name|
+---+--------+
| 3|Footwear|
| 7|Fan Shop|
| 6|Outdoors|
| 2| Fitness|
| 4| Apparel|
| 5| Golf|
+---+--------+
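distinct() compares entire rows. To deduplicate on a subset of columns instead, dropDuplicates takes the key columns explicitly; here it would keep one row per name:
scala> df.dropDuplicates("name").show()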