Wednesday, 22 July 2020

Pivot Example program in Spark using Scala

Pivoting rotates the distinct values of one column into columns of their own. Here each student's Subject rows become columns, so every Id ends up with a single row of marks.

$ hdfs dfs -cat /SparkFiles/pivotexa.csv
Id,Subject,Marks
100,English,80
100,Physics,66
100,Maths,68
101,English,81
101,Physics,63
101,Maths,64
102,English,80
102,Physics,65
102,Maths,66
103,English,90
103,Physics,91
103,Maths,92


scala> val df = spark.read.format("csv").option("delimiter",",").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/SparkFiles/pivotexa.csv")

scala> df.printSchema()
root
 |-- Id: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Marks: integer (nullable = true)
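
Spark inferred the column types correctly here. If the schema is known up front, a minimal alternative (the names marksSchema and dfTyped are just labels for this sketch) is to declare it explicitly, which spares Spark the extra pass over the file that inferSchema costs:

scala> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

scala> val marksSchema = StructType(Seq(
     |   StructField("Id", IntegerType, nullable = true),
     |   StructField("Subject", StringType, nullable = true),
     |   StructField("Marks", IntegerType, nullable = true)))

scala> val dfTyped = spark.read.option("header","true").schema(marksSchema).csv("hdfs://localhost:9000/SparkFiles/pivotexa.csv")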


scala> df.show()
+---+-------+-----+
| Id|Subject|Marks|
+---+-------+-----+
|100|English|   80|
|100|Physics|   66|
|100|  Maths|   68|
|101|English|   81|
|101|Physics|   63|
|101|  Maths|   64|
|102|English|   80|
|102|Physics|   65|
|102|  Maths|   66|
|103|English|   90|
|103|Physics|   91|
|103|  Maths|   92|
+---+-------+-----+


// Pivot: each distinct value of Subject becomes its own column. Since every
// (Id, Subject) pair appears exactly once, max(Marks) simply picks that mark.
scala> val df1 = df.groupBy("Id").pivot("Subject").max("Marks")
df1: org.apache.spark.sql.DataFrame = [Id: int, English: int ... 2 more fields]

scala> df1.show()
+---+-------+-----+-------+                                                     
| Id|English|Maths|Physics|
+---+-------+-----+-------+
|100|     80|   68|     66|
|101|     81|   64|     63|
|102|     80|   66|     65|
|103|     90|   92|     91|
+---+-------+-----+-------+
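
Because no value list was passed to pivot, Spark ran a separate job first to collect the distinct Subject values. A sketch of the two-argument form, which skips that job and pins down the output columns (the subject list and the name df1Explicit are assumptions based on the sample data):

scala> import org.apache.spark.sql.functions.max

// Passing the values explicitly avoids the distinct-collection job and
// guarantees the column set even if a subject is missing from the data.
scala> val df1Explicit = df.groupBy("Id").pivot("Subject", Seq("English", "Maths", "Physics")).agg(max("Marks"))

From Spark 2.4 onward the same pivot can also be written in SQL against a temp view (the view name marks is likewise just a label for this example):

scala> df.createOrReplaceTempView("marks")

scala> spark.sql("SELECT * FROM marks PIVOT (MAX(Marks) FOR Subject IN ('English', 'Maths', 'Physics'))").show()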


scala> import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.col

scala> val df2 = df1.withColumn("Total", col("English") + col("Maths") + col("Physics"))
df2: org.apache.spark.sql.DataFrame = [Id: int, English: int ... 3 more fields]

scala> df2.show()
+---+-------+-----+-------+-----+
| Id|English|Maths|Physics|Total|
+---+-------+-----+-------+-----+
|100|     80|   68|     66|  214|
|101|     81|   64|     63|  208|
|102|     80|   66|     65|  211|
|103|     90|   92|     91|  273|
+---+-------+-----+-------+-----+
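
One caveat with the plain addition above: if a subject were ever missing for an Id, its pivoted cell would be null and Total would turn null as well. A null-safe sketch using coalesce, followed by saving the result back to HDFS (the output path /SparkFiles/pivotout is an assumption for this example):

scala> import org.apache.spark.sql.functions.{coalesce, lit}

// Treat a missing subject as 0 marks so Total stays non-null.
scala> val df2Safe = df1.withColumn("Total", coalesce(col("English"), lit(0)) + coalesce(col("Maths"), lit(0)) + coalesce(col("Physics"), lit(0)))

// Persist the pivoted result as a CSV directory with a header row.
scala> df2Safe.write.option("header","true").mode("overwrite").csv("hdfs://localhost:9000/SparkFiles/pivotout")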

Flume - Simple Demo

// Create a folder in HDFS:
$ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates: Hadoop in real world <n>...