Sunday, 20 January 2019

How to remove header rows from a file using Spark with Scala?

Here we have a text file with 2 different header lines. Using the zipWithIndex function, we will
introduce an autogenerated sequence column at the end, then filter on it to drop the headers.

The input file has 2 different header lines:
auto.csv:
---------
mpg,cylinders,displacement,horsepower,weight,acceleration,year,origin,name
DOUBLE;INT;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;STRING
18,8,307,130,3504,12,70,1,chevrolet chevelle malibu
15,8,350,165,3693,11.5,70,1,buick skylark 320
18,8,318,150,3436,11,70,1,plymouth satellite
16,8,304,150,3433,12,70,1,amc rebel sst

How do we exclude (filter) the first 2 lines from the RDD?
That is, how do we remove the following lines:
mpg,cylinders,displacement,horsepower,weight,acceleration,year,origin,name
DOUBLE;INT;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;STRING

// load the file into RDD (c1)
scala> val c1 = sc.textFile("E:\\POCs\\Auto.csv")
c1: org.apache.spark.rdd.RDD[String] = E:\POCs\Auto.csv MapPartitionsRDD[6] at textFile at <console>:24

scala> c1.take(5).foreach(println)
mpg,cylinders,displacement,horsepower,weight,acceleration,year,origin,name
DOUBLE;INT;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;STRING
18,8,307,130,3504,12,70,1,chevrolet chevelle malibu
15,8,350,165,3693,11.5,70,1,buick skylark 320
18,8,318,150,3436,11,70,1,plymouth satellite


// Add an autogenerated Long column at the end with sequence values.
// zipWithIndex assigns contiguous, 0-based indices in line order,
// turning each line into a (String, Long) tuple
scala> val c2 = c1.zipWithIndex()
c2: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[7] at zipWithIndex at <console>:25

// note the 0,1,2,3,4... at the end; every line is now a tuple
scala> c2.take(5).foreach(println)
(mpg,cylinders,displacement,horsepower,weight,acceleration,year,origin,name,0)
(DOUBLE;INT;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;DOUBLE;STRING,1)
(18,8,307,130,3504,12,70,1,chevrolet chevelle malibu,2)
(15,8,350,165,3693,11.5,70,1,buick skylark 320,3)
(18,8,318,150,3436,11,70,1,plymouth satellite,4)
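
Aside: RDDs also provide zipWithUniqueId, which avoids the extra Spark job that zipWithIndex runs to compute partition sizes, but its ids are unique rather than contiguous line numbers, so it cannot reliably pick out the first two lines. A contrast sketch (notForHeaders is an illustrative name, not from the original session):

// ids depend on the partition layout, so they are NOT ordered line numbers
val notForHeaders = c1.zipWithUniqueId()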

// the first line ends with ,0
// the second line ends with ,1
// so our filter condition is x._2 > 1
scala> val c3 = c2.filter ( x => x._2 > 1)
c3: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[8] at filter at <console>:25

// the first and second lines are eliminated (header rows removed)
scala> c3.take(5).foreach(println)
(18,8,307,130,3504,12,70,1,chevrolet chevelle malibu,2)
(15,8,350,165,3693,11.5,70,1,buick skylark 320,3)
(18,8,318,150,3436,11,70,1,plymouth satellite,4)
(16,8,304,150,3433,12,70,1,amc rebel sst,5)
(17,8,302,140,3449,10.5,70,1,ford torino,6)
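
As a side note, the same two header lines can be dropped without the extra tuple column by using mapPartitionsWithIndex, since a plain text file's header lines always sit in partition 0. A minimal sketch, not part of the original session (noHeader is an illustrative name):

// drop the first two lines of partition 0; pass all other partitions through unchanged
val noHeader = c1.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(2) else iter
}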

// now drop the index column (0,1,2,3,4...) so each element is a plain String again
scala> val c4 = c3.map (x => x._1)
c4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at map at <console>:25

// the autogenerated index column is gone
scala> c4.take(5).foreach(println)
18,8,307,130,3504,12,70,1,chevrolet chevelle malibu
15,8,350,165,3693,11.5,70,1,buick skylark 320
18,8,318,150,3436,11,70,1,plymouth satellite
16,8,304,150,3433,12,70,1,amc rebel sst
17,8,302,140,3449,10.5,70,1,ford torino
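
Putting the steps together, c1 through c4 collapse into a single chain. An equivalent sketch (cleaned is an illustrative name):

// zip with index, keep everything after the two header lines, then drop the index
val cleaned = sc.textFile("E:\\POCs\\Auto.csv")
  .zipWithIndex()
  .filter { case (_, idx) => idx > 1 }
  .map { case (line, _) => line }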




// each entire line is still a single String
scala> c4
res13: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at map at <console>:25


// split each line on "," so each line becomes an Array of String
scala> val c5 = c4.map(x => x.split(","))
c5: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25


scala> c4.take(5)
res15: Array[String] = Array(18,8,307,130,3504,12,70,1,chevrolet chevelle malibu, 15,8,350,165,3693,11.5,70,1,buick skylark 320, 18,8,318,150,3436,11,70,1,plymouth satellite, 16,8,304,150,3433,12,70,1,amc rebel sst, 17,8,302,140,3449,10.5,70,1,ford torino)

scala> c5.take(5)
res16: Array[Array[String]] = Array(Array(18, 8, 307, 130, 3504, 12, 70, 1, chevrolet chevelle malibu), Array(15, 8, 350, 165, 3693, 11.5, 70, 1, buick skylark 320), Array(18, 8, 318, 150, 3436, 11, 70, 1, plymouth satellite), Array(16, 8, 304, 150, 3433, 12, 70, 1, amc rebel sst), Array(17, 8, 302, 140, 3449, 10.5, 70, 1, ford torino))


// convert each Array[String] into a typed tuple (the car name, x(8), moves to the front)
scala> val c6 = c5.map { x =>
     |        val f1 = x(8).toString
     |        val f2 = x(0).toFloat
     |        val f3 = x(1).toInt
     |        val f4 = x(2).toFloat
     |        val f5 = x(3).toFloat
     |        val f6 = x(4).toDouble
     |        val f7 = x(5).toFloat
     |        val f8 = x(6).toInt
     |        val f9 = x(7)
     |      (f1,f2,f3,f4,f5,f6,f7,f8,f9)
     |      }
c6: org.apache.spark.rdd.RDD[(String, Float, Int, Float, Float, Double, Float, Int, String)] = MapPartitionsRDD[11] at map at <console>:25

scala> c6.take(5).foreach(println)
(chevrolet chevelle malibu,18.0,8,307.0,130.0,3504.0,12.0,70,1)
(buick skylark 320,15.0,8,350.0,165.0,3693.0,11.5,70,1)
(plymouth satellite,18.0,8,318.0,150.0,3436.0,11.0,70,1)
(amc rebel sst,16.0,8,304.0,150.0,3433.0,12.0,70,1)
(ford torino,17.0,8,302.0,140.0,3449.0,10.5,70,1)
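
Instead of a tuple, a case class would carry proper field names, which pays off in the DataFrame step below. A hedged sketch (the Auto case class, c7, and autoDF are illustrative names, not from the original session):

// same field extraction as c6, but into named fields
case class Auto(name: String, mpg: Float, cylinders: Int, displacement: Float,
                horsepower: Float, weight: Double, acceleration: Float,
                year: Int, origin: String)

val c7 = c5.map(x => Auto(x(8), x(0).toFloat, x(1).toInt, x(2).toFloat,
                          x(3).toFloat, x(4).toDouble, x(5).toFloat,
                          x(6).toInt, x(7)))
val autoDF = c7.toDF   // columns are named name, mpg, cylinders, ... automatically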


// Make a DataFrame without column names (Spark assigns default names _1, _2, ...)
scala> val df = c6.toDF
df: org.apache.spark.sql.DataFrame = [_1: string, _2: float ... 7 more fields]
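
Note that toDF works out of the box here because spark-shell pre-imports the SparkSession implicits; in a compiled application you would add the import yourself, roughly:

// 'spark' is your SparkSession instance
import spark.implicits._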


// Make a DataFrame from the RDD with explicit column names
scala> val df = c6.toDF("Car","MPG","Cylinders","Displacement","Horsepower","Weight","Acceleration","Model","Origin")
df: org.apache.spark.sql.DataFrame = [Car: string, MPG: float ... 7 more fields]
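
A couple of quick sanity checks on the result (illustrative calls, output elided):

df.printSchema()    // prints column names and types: Car: string, MPG: float, ...
df.show(5, false)   // shows the first 5 rows without truncating values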



How to access Array elements in Scala?
----------------------------------------
scala> val myArray = Array(1,3,2,4,3,5,2)
myArray: Array[Int] = Array(1, 3, 2, 4, 3, 5, 2)

scala> myArray(0)
res6: Int = 1

scala> myArray(6)
res7: Int = 2
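
A few other common accessors, shown as a sketch rather than transcript output; note that indexing past the end, e.g. myArray(7), throws an ArrayIndexOutOfBoundsException:

myArray.length   // 7  (number of elements)
myArray.head     // 1  (first element, same as myArray(0))
myArray.last     // 2  (last element, same as myArray(myArray.length - 1))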

How to access Tuple elements in Scala?
----------------------------------------
scala> val t = (34,23,132,54)
t: (Int, Int, Int, Int) = (34,23,132,54)

scala> t._1
res8: Int = 34

scala> t._3
res9: Int = 132

scala> t._4
res10: Int = 54
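
Tuples can also be destructured into named values in one step; a small sketch:

// pattern-match binding: a = 34, b = 23, c = 132, d = 54
val (a, b, c, d) = t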
