Thursday, 30 July 2020

Scenario-based Spark Programs


#1. Get all the customers whose year of joining is between 2017 and 2019 and whose amount in the department table is greater than 50.

Customer table

cust_id  cust_name  yoj
1        X          2019-01-01 12:02:00.0
2        y          2018-01-01 12:02:00.0
3        Z          2017-01-01 12:02:00.0
4        F          2017-01-01 12:02:00.0
5        G          2017-01-01 10:02:00.0
Department table

id  amount
1   100
1   200
2   300
3   50
4   50


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


val sparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("Interview Questions")
  .getOrCreate()
import sparkSession.implicits._

val customerdf = Seq((1, "X", "2019-01-01 12:02:00.0"),
  (2, "y", "2018-01-01 12:02:00.0"),
  (3, "Z", "2017-01-01 12:02:00.0"),
  (4, "F", "2017-01-01 12:02:00.0"),
  (5, "G", "2017-01-01 10:02:00.0")).toDF("cust_id", "cust_name", "yoj")

val deptdf = Seq((1, 100),
  (1, 200),
  (2, 300),
  (3, 50),
  (4, 50)).toDF("id", "amount")
 
// derive a timestamp column (dropping the fractional seconds) from the string yoj
val cust = customerdf.withColumn("datetime", date_format(col("yoj").cast(TimestampType), "yyyy-MM-dd HH:mm:ss").cast(TimestampType))
// keep customers who joined between 2017 and 2019 (year() implicitly casts the string)
val filteredcust = cust.filter(year($"yoj").between(2017, 2019))

// keep department rows with amount greater than 50
val filtereddept = deptdf.filter($"amount" > 50)
val result = filteredcust.join(filtereddept, col("cust_id") === col("id"), "inner")
result.show(truncate = false)

+-------+---------+---------------------+-------------------+---+------+
|cust_id|cust_name|yoj                  |datetime           |id |amount|
+-------+---------+---------------------+-------------------+---+------+
|1      |X        |2019-01-01 12:02:00.0|2019-01-01 12:02:00|1  |200   |
|1      |X        |2019-01-01 12:02:00.0|2019-01-01 12:02:00|1  |100   |
|2      |y        |2018-01-01 12:02:00.0|2018-01-01 12:02:00|2  |300   |
+-------+---------+---------------------+-------------------+---+------+
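
For readers who prefer SQL, the same filter and join can be expressed with temp views. A minimal sketch, assuming the two DataFrames above; the view names customer and dept are chosen here for illustration:

// register the DataFrames as temporary views (names chosen for illustration)
customerdf.createOrReplaceTempView("customer")
deptdf.createOrReplaceTempView("dept")

sparkSession.sql("""
  SELECT c.*, d.amount
  FROM customer c
  JOIN dept d ON c.cust_id = d.id
  WHERE year(c.yoj) BETWEEN 2017 AND 2019
    AND d.amount > 50
""").show(truncate = false)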

#2. Count the number of people residing in each city. There are two tables: employee(empid, empname, empcitycode) and city(CityCode, CityName).
Employee table :

empid  empname  empcitycode
1      a        1
2      b        2
3      c        1
4      d        1
5      e        3
City table :

CityCode  CityName
1         delhi
2         punjab
3         mumbai

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import sparkSession.implicits._


val empdf = Seq((1, "a", 1), 
(2, "b", 2), 
(3, "c", 1), 
(4, "d", 1), 
(5, "e", 3)).toDF("empid", "empname", "empcitycode")
val citydf = Seq((1, "delhi"), 
(2, "punjab"), 
(3, "mumbai")).toDF("CityCode", "CityName")
val joineddf = empdf.join(citydf, $"empcitycode" === $"CityCode", "inner")

// Dataframes way: count employees per city with a window, then dedupe
val window = Window.partitionBy($"empcitycode")
val result = joineddf.withColumn("citywiseresult", count($"empid") over window)
  .select("CityName", "citywiseresult")
  .distinct
result.show

+--------+--------------+
|CityName|citywiseresult|
+--------+--------------+
|   delhi|             3|
|  mumbai|             1|
|  punjab|             1|
+--------+--------------+
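
The window-plus-distinct trick above works, but a plain groupBy arrives at the same counts more directly. A minimal alternative sketch, reusing joineddf and the imports from above:

// group the joined rows by city and count employees per group
joineddf.groupBy($"CityName")
  .agg(count($"empid").as("citywiseresult"))
  .show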




#3. Get all the records from the left table which are not present in the right table
Table Left :

id  left
0   zero
1   one
Table Right :

id  right
0   zero
2   two
3   three

val left = Seq((0, "zero"),
  (1, "one")).toDF("id", "left")

val right = Seq((0, "zero"),
  (2, "two"),
  (3, "three")).toDF("id", "right")

left.join(right, Seq("id"), "left_anti").show


+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+
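
For comparison, a left anti join is equivalent to a left outer join that keeps only the unmatched rows. A minimal Spark SQL sketch; the view names l and r are chosen here for illustration:

// register the DataFrames as temporary views (names chosen for illustration)
left.createOrReplaceTempView("l")
right.createOrReplaceTempView("r")

// keep left rows that found no match on the right
sparkSession.sql("SELECT l.* FROM l LEFT JOIN r ON l.id = r.id WHERE r.id IS NULL").show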


RDD Approach:

#4. Find the average salary department-wise using the RDD approach
Table Dept :

DEPT  EMP_NAME  SALARY
1     RAM       10
2     Shyam     30
1     Vijay     10

val inputRdd = sc.parallelize(Seq("1,'RAM',10",
  "2,'Shyam',30",
  "1,'Vijay',10"))

inputRdd
  .map(record => record.split(","))                        // split each CSV line into fields
  .map(record => (record(0).toInt, (record(2).toInt, 1)))  // (dept, (salary, count))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))       // sum salaries and counts per dept
  .mapValues(x => x._1 / x._2)                             // integer division; use toDouble for fractional averages
  .collect
  .foreach(println)

(1,10)
(2,30)
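
As a cross-check, the same average can be computed with DataFrames. A minimal sketch built on inputRdd above; the name salarydf is chosen here for illustration, and note that avg returns a fractional value while the RDD version uses integer division:

import org.apache.spark.sql.functions.avg
import sparkSession.implicits._

// parse each CSV line into (dept, salary) and convert to a DataFrame
val salarydf = inputRdd.map(_.split(","))
  .map(r => (r(0).toInt, r(2).toInt))
  .toDF("dept", "salary")

salarydf.groupBy("dept").agg(avg("salary").as("avg_salary")).show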



#5. Find the lowest salary in each department
Table Dept :

id  amount
1   100
1   200
2   300
2   50
3   50

import sparkSession.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = Seq((1, 100),
  (1, 200),
  (2, 300),
  (2, 50),
  (3, 50)).toDF("id", "amount")

// Dataframes way: rank rows within each id by ascending amount
val window = Window.partitionBy($"id").orderBy($"amount".asc)
val df1 = df.withColumn("row_number", row_number().over(window))
df1.select("id", "amount").where("row_number = 1").show()
 
 
+---+------+
| id|amount|
+---+------+
|  1|   100|
|  3|    50|
|  2|    50|
+---+------+
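
Without a window, a groupBy with min yields the same result. A minimal alternative sketch, reusing df from above:

import org.apache.spark.sql.functions.min

// take the smallest amount within each id group
df.groupBy("id").agg(min("amount").as("amount")).show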
