hadoop@hadoop:~/Downloads$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/inputwithCommas.csv
Name|Age|Education
Sankar|44|DEEE,BCA,Msc(IT)
Anbu|42|BLitt,MA,MPhil,B.Ed
Veeraiah|43|Bsc
Naga|33|DME,BE
scala> val df = spark.read.option("delimiter","|").option("header","True").format("csv").load("hdfs://localhost:9000/SparkFiles/inputwithCommas.csv")
df: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 1 more field]
scala> df.show()
+--------+---+-------------------+
| Name|Age| Education|
+--------+---+-------------------+
| Sankar| 44| DEEE,BCA,Msc(IT)|
| Anbu| 42|BLitt,MA,MPhil,B.Ed|
|Veeraiah| 43| Bsc|
| Naga| 33| DME,BE|
+--------+---+-------------------+
// Explode :: split the comma-separated Education values into multiple rows (one row per qualification)
scala> val df1 = df.withColumn("Qualification",explode(split(col("Education"),",")))
df1: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 2 more fields]
scala> df1.show()
+--------+---+-------------------+-------------+
| Name|Age| Education|Qualification|
+--------+---+-------------------+-------------+
| Sankar| 44| DEEE,BCA,Msc(IT)| DEEE|
| Sankar| 44| DEEE,BCA,Msc(IT)| BCA|
| Sankar| 44| DEEE,BCA,Msc(IT)| Msc(IT)|
| Anbu| 42|BLitt,MA,MPhil,B.Ed| BLitt|
| Anbu| 42|BLitt,MA,MPhil,B.Ed| MA|
| Anbu| 42|BLitt,MA,MPhil,B.Ed| MPhil|
| Anbu| 42|BLitt,MA,MPhil,B.Ed| B.Ed|
|Veeraiah| 43| Bsc| Bsc|
| Naga| 33| DME,BE| DME|
| Naga| 33| DME,BE| BE|
+--------+---+-------------------+-------------+
// drop an existing column
scala> val df2 = df1.drop(col("Education"))
df2: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 1 more field]
scala> df2.show()
+--------+---+-------------+
| Name|Age|Qualification|
+--------+---+-------------+
| Sankar| 44| DEEE|
| Sankar| 44| BCA|
| Sankar| 44| Msc(IT)|
| Anbu| 42| BLitt|
| Anbu| 42| MA|
| Anbu| 42| MPhil|
| Anbu| 42| B.Ed|
|Veeraiah| 43| Bsc|
| Naga| 33| DME|
| Naga| 33| BE|
+--------+---+-------------+
No comments:
Post a Comment