scala> val empRDD = sc.textFile("E:\\POCs\\emp_data.csv")
empRDD: org.apache.spark.rdd.RDD[String] = E:\POCs\emp_data.csv MapPartitionsRDD[35] at textFile at <console>:24
scala> val header = empRDD.first()
header: String = empno,ename,designation,manager,hire_date,sal,deptno
scala> val withoutHeader = empRDD.filter(line => line != header)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[36] at filter at <console>:27
scala> withoutHeader.foreach(println)
7369,SMITH,CLERK,7902,12/17/1980,800,20
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7782,CLARK,MANAGER,7839,6/9/1981,2450,10
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7934,MILLER,CLERK,7782,1/23/1982,1300,10
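Filtering on line != header works here, but it compares every row against the header string and would also drop a data row that happened to match it exactly. A lighter alternative (a sketch, not run in this session; withoutHeaderAlt is a hypothetical name) drops only the first line of the first partition:

// Drop just the first line of the first partition instead of comparing
// every line against the header string.
val withoutHeaderAlt = empRDD.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}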
scala> val salList = withoutHeader.map(line => {
| val fields = line.split(",")
| val sal = fields(5).toDouble
| sal
| })
salList: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[37] at map at <console>:25
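Note that fields(5).toDouble throws a NumberFormatException on any malformed row. For messier input, one could tolerate bad rows by guarding the parse (a sketch; salListSafe is a hypothetical name, and the assumption is that unparseable rows should simply be skipped):

import scala.util.Try
// Keep only rows whose salary column parses as a number.
val salListSafe = withoutHeader.flatMap { line =>
  Try(line.split(",")(5).toDouble).toOption
}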
// find maximum salary
scala> salList.max()
res9: Double = 5000.0
scala> val maxSal = salList.distinct.sortBy(x => x, false, 1)
maxSal: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[43] at sortBy at <console>:25
scala> maxSal.foreach(println)
5000.0
3000.0
2975.0
2850.0
2450.0
1600.0
1500.0
1300.0
1250.0
1100.0
950.0
800.0
scala> maxSal.take(1)
res13: Array[Double] = Array(5000.0)
scala> maxSal.take(1).foreach(println)
5000.0
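Sorting the distinct salaries just to read off the first element does a full shuffle. On a small dataset that is harmless, but max() (shown above) or top() gets the same answer without a global sort, a sketch:

// top(n) returns the n largest elements as a local array, without
// sorting the whole RDD.
salList.top(1)   // Array(5000.0)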
// find the second highest salary
scala> val secondHighestSal = maxSal.zipWithIndex()
secondHighestSal: org.apache.spark.rdd.RDD[(Double, Long)] = ZippedWithIndexRDD[56] at zipWithIndex at <console>:25
scala> secondHighestSal.collect.foreach(println)
(5000.0,0)
(3000.0,1)
(2975.0,2)
(2850.0,3)
(2450.0,4)
(1600.0,5)
(1500.0,6)
(1300.0,7)
(1250.0,8)
(1100.0,9)
(950.0,10)
(800.0,11)
scala> val result2ndHighestSal = secondHighestSal.filter(x => x._2 == 1)
result2ndHighestSal: org.apache.spark.rdd.RDD[(Double, Long)] = MapPartitionsRDD[57] at filter at <console>:25
scala> result2ndHighestSal.foreach(println)
(3000.0,1)
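The zipWithIndex approach works because maxSal is already distinct and sorted in descending order, so index 1 is the second highest salary. The same answer can be read off in one step (a sketch; distinct matters here, since a duplicated top salary would otherwise fill both slots):

// The two largest distinct salaries in descending order; the last
// element is the second highest.
val secondHighest = salList.distinct.top(2).last   // 3000.0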
// find minimum salary
scala> val minSal = salList.distinct.sortBy(x => x, true, 1)
minSal: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[55] at sortBy at <console>:25
scala> minSal
res15: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[55] at sortBy at <console>:25
scala> minSal.take(1)
res16: Array[Double] = Array(800.0)
scala> minSal.take(1).foreach(println)
800.0
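Symmetrically to max(), the RDD API exposes min() directly, so the ascending sort is only needed when the full ordering itself is of interest, a sketch:

salList.min()            // 800.0
// Or the n smallest values in ascending order:
salList.takeOrdered(2)   // Array(800.0, 950.0)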