Friday, 25 January 2019

Remove Header Row in Spark with Scala

Input file:
emp.csv
----------------
empno,ename,designation,manager,hire_date,sal,deptno
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7369,SMITH,CLERK,7902,12/17/1980,800,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7934,MILLER,CLERK,7782,1/23/1982,1300,10
7782,CLARK,MANAGER,7839,6/9/1981,2450,10

// make an RDD with file content including header row
scala> val empRDD = sc.textFile("E:\\POCs\\emp.csv")
empRDD: org.apache.spark.rdd.RDD[String] = E:\POCs\emp.csv MapPartitionsRDD[6] at textFile at <console>:24

// display it
scala> empRDD.foreach(println)
empno,ename,designation,manager,hire_date,sal,deptno
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7369,SMITH,CLERK,7902,12/17/1980,800,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7934,MILLER,CLERK,7782,1/23/1982,1300,10
7782,CLARK,MANAGER,7839,6/9/1981,2450,10

// grab the first row, i.e., the header row
scala> val header = empRDD.first()
header: String = empno,ename,designation,manager,hire_date,sal,deptno

// make a new RDD without the header row (note: this removes every line equal to the header, not just the first line)
scala> val withoutHeader = empRDD.filter (line => line != header)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:27

// display it
scala> withoutHeader.foreach(println)
7369,SMITH,CLERK,7902,12/17/1980,800,20
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7934,MILLER,CLERK,7782,1/23/1982,1300,10
7782,CLARK,MANAGER,7839,6/9/1981,2450,10


// DataFrame alternative: let the CSV reader consume the header row automatically

scala> val dataFrame = spark.read.format("csv").option("header","true").load("E:\\POCs\\emp.csv")
dataFrame: org.apache.spark.sql.DataFrame = [empno: string, ename: string ... 5 more fields]


// display it

dataFrame.show()


No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...