Saturday, 15 August 2020

Row Object into RDD into DataFrame with Schema in Spark with Scala

//Row Object into RDD into DataFrame with Schema in Spark with Scala

import org.apache.spark.sql.types._ 
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer
import org.apache.spark.util.IntParam


// In-memory sample data: one Row per employee, in the order
// first name, last name, gender, email, salary.
val Employee: Seq[Row] = Seq(
  ("Anbu", "Sudha", "F", "sudhadhanukkodi@gmail.com", 10000),
  ("Sankara", "Narayanan", "M", "sankarmcts@gmail.com", 20000),
  ("Vijay", "Balaji", "M", "vijaybalaji2001@gmail.com", 25000),
  ("Aishvarya", "Sudha", "F", "aishvaryasudha@gmail.com", 35000)).map(record => Row.fromTuple(record))
// Hand the local sequence to the SparkContext to obtain an RDD of Rows.
val EmployeeRDD: org.apache.spark.rdd.RDD[Row] = spark.sparkContext.parallelize(Employee)


// Schema describing each Row: four nullable string columns followed by
// a nullable integer salary, in the same order as the Row values.
val EmployeeSchema: StructType = StructType(Seq(
  StructField("FirstName", StringType, nullable = true),
  StructField("LastName", StringType, nullable = true),
  StructField("Gender", StringType, nullable = true),
  StructField("Email", StringType, nullable = true),
  StructField("Salary", IntegerType, nullable = true)))
// Attach the schema to the existing RDD of Rows to get a DataFrame.
val empDF: org.apache.spark.sql.DataFrame = spark.createDataFrame(EmployeeRDD, EmployeeSchema)
 

scala> empDF.show(false)
+---------+---------+------+-------------------------+------+
|FirstName|LastName |Gender|Email                    |Salary|
+---------+---------+------+-------------------------+------+
|Anbu     |Sudha    |F     |sudhadhanukkodi@gmail.com|10000 |
|Sankara  |Narayanan|M     |sankarmcts@gmail.com     |20000 |
|Vijay    |Balaji   |M     |vijaybalaji2001@gmail.com|25000 |
|Aishvarya|Sudha    |F     |aishvaryasudha@gmail.com |35000 |
+---------+---------+------+-------------------------+------+


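// Combine FirstName and LastName into a single Name column (space-separated),
// then drop the original FirstName and LastName columns.
// Note: concat_ws comes from org.apache.spark.sql.functions, which spark-shell imports automatically.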
scala> val df =  empDF.withColumn("Name", concat_ws(" ",empDF("FirstName"),empDF("LastName"))).drop("FirstName").drop("LastName")
df: org.apache.spark.sql.DataFrame = [Gender: string, Email: string ... 2 more fields]

scala> df.show()
+------+--------------------+------+-----------------+
|Gender|               Email|Salary|             Name|
+------+--------------------+------+-----------------+
|     F|sudhadhanukkodi@g...| 10000|       Anbu Sudha|
|     M|sankarmcts@gmail.com| 20000|Sankara Narayanan|
|     M|vijaybalaji2001@g...| 25000|     Vijay Balaji|
|     F|aishvaryasudha@gm...| 35000|  Aishvarya Sudha|
+------+--------------------+------+-----------------+
 
 
// Total salary across all employees
scala> df.agg(sum("Salary").alias("TotalSal")).show()
+--------+
|TotalSal|
+--------+
|   90000|
+--------+


// Find the total salary per gender: total male salary and total female salary
scala> df.groupBy("Gender").agg(sum("Salary").alias("Sal")).show()
+------+-----+
|Gender|  Sal|
+------+-----+
|     F|45000|
|     M|45000|
+------+-----+

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...