Tuesday, 29 January 2019

How to do groupBy Aggregation in Spark with Scala

Input
file2.csv:
--------------
hdfs dfs -cat /user/file2.csv
s1,d1
s1,d2
s1,d2
s1,d3
s2,d1
s2,d3
s2,d1
s3,d2
s3,d1
s1,d1
s2,d1
s3,d1
s1,d1
s2,d2
s3,d3


// autogenerated column headers
scala> val df = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]

scala> df.show
+---+---+
|_c0|_c1|
+---+---+
| s1| d1|
| s1| d2|
| s1| d2|
| s1| d3|
| s2| d1|
| s2| d3|
| s2| d1|
| s3| d2|
| s3| d1|
| s1| d1|
| s2| d1|
| s3| d1|
| s1| d1|
| s2| d2|
| s3| d3|
+---+---+

// groupBy aggregation goes here
scala> df.groupBy("_c0","_c1").agg(count("*")).show
+---+---+--------+                                                             
|_c0|_c1|count(1)|
+---+---+--------+
| s3| d2|       1|
| s2| d2|       1|
| s1| d2|       2|
| s1| d1|       3|
| s3| d1|       2|
| s2| d1|       3|
| s3| d3|       1|
| s1| d3|       1|
| s2| d3|       1|
+---+---+--------+

// In order to make our own schema we need to import the following
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

// Making Schema
scala> val sch = StructType(StructField("School",StringType)::StructField("Department",StringType)::Nil)
sch: org.apache.spark.sql.types.StructType = StructType(StructField(School,StringType,true), StructField(Department,StringType,true))

// applying schema
scala> val df = spark.read.format("csv").option("inferSchema","true").schema(sch).load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [School: string, Department: string]

scala> df.printSchema
root
 |-- School: string (nullable = true)
 |-- Department: string (nullable = true)

scala> df.show
+------+----------+
|School|Department|
+------+----------+
|    s1|        d1|
|    s1|        d2|
|    s1|        d2|
|    s1|        d3|
|    s2|        d1|
|    s2|        d3|
|    s2|        d1|
|    s3|        d2|
|    s3|        d1|
|    s1|        d1|
|    s2|        d1|
|    s3|        d1|
|    s1|        d1|
|    s2|        d2|
|    s3|        d3|
+------+----------+

// groupBy Aggregation operation
//added alias name for 3rd column
scala> df.groupBy("School","Department").agg(count("*") as "Count").show
+------+----------+-----+
|School|Department|Count|
+------+----------+-----+
|    s3|        d2|    1|
|    s2|        d2|    1|
|    s1|        d2|    2|
|    s1|        d1|    3|
|    s3|        d1|    2|
|    s2|        d1|    3|
|    s3|        d3|    1|
|    s1|        d3|    1|
|    s2|        d3|    1|
+------+----------+-----+

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...