//Row Object into RDD into DataFrame with Schema in Spark with Scala
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer
import org.apache.spark.util.IntParam
// In-memory employee records, one Row per employee.
// Values are positional: first name, last name, gender, email, salary.
val Employee: Seq[Row] = Seq(
  Row("Anbu", "Sudha", "F", "sudhadhanukkodi@gmail.com", 10000),
  Row("Sankara", "Narayanan", "M", "sankarmcts@gmail.com", 20000),
  Row("Vijay", "Balaji", "M", "vijaybalaji2001@gmail.com", 25000),
  Row("Aishvarya", "Sudha", "F", "aishvaryasudha@gmail.com", 35000)
)
// Turn the local Seq of rows into an RDD through the session's SparkContext.
val EmployeeRDD = spark.sparkContext.parallelize[Row](Employee)
// Schema for one employee record: four string columns followed by an
// integer salary. Every column is declared nullable.
val EmployeeSchema = StructType(Seq(
  StructField("FirstName", StringType, nullable = true),
  StructField("LastName", StringType, nullable = true),
  StructField("Gender", StringType, nullable = true),
  StructField("Email", StringType, nullable = true),
  StructField("Salary", IntegerType, nullable = true)
))
// Build the DataFrame by pairing the existing Row RDD with the explicit schema.
val empDF: org.apache.spark.sql.DataFrame =
  spark.createDataFrame(EmployeeRDD, EmployeeSchema)
scala> empDF.show(false)
+---------+---------+------+-------------------------+------+
|FirstName|LastName |Gender|Email                    |Salary|
+---------+---------+------+-------------------------+------+
|Anbu     |Sudha    |F     |sudhadhanukkodi@gmail.com|10000 |
|Sankara  |Narayanan|M     |sankarmcts@gmail.com     |20000 |
|Vijay    |Balaji   |M     |vijaybalaji2001@gmail.com|25000 |
|Aishvarya|Sudha    |F     |aishvaryasudha@gmail.com |35000 |
+---------+---------+------+-------------------------+------+
// Combine FirstName and LastName into a single Name column,
// then drop the original FirstName and LastName columns
scala> val df = empDF.withColumn("Name", concat_ws(" ",empDF("FirstName"),empDF("LastName"))).drop("FirstName").drop("LastName")
df: org.apache.spark.sql.DataFrame = [Gender: string, Email: string ... 2 more fields]
scala> df.show()
+------+--------------------+------+-----------------+
|Gender|               Email|Salary|             Name|
+------+--------------------+------+-----------------+
|     F|sudhadhanukkodi@g...| 10000|       Anbu Sudha|
|     M|sankarmcts@gmail.com| 20000|Sankara Narayanan|
|     M|vijaybalaji2001@g...| 25000|     Vijay Balaji|
|     F|aishvaryasudha@gm...| 35000|  Aishvarya Sudha|
+------+--------------------+------+-----------------+
// Total amount of Salary for all the employees
scala> df.agg(sum("Salary").alias("TotalSal")).show()
+--------+
|TotalSal|
+--------+
| 90000|
+--------+
// Find the total salary per gender: total male salary and total female salary
scala> df.groupBy("Gender").agg(sum("Salary").alias("Sal")).show()
+------+-----+
|Gender| Sal|
+------+-----+
| F|45000|
| M|45000|
+------+-----+
No comments:
Post a Comment