Saturday, 26 January 2019

Load, Transformation, Action, and Write Operations for Flat Text Files vs CSV Files in Spark with Scala

Flat text file read, transformation, action, write
--------------------------------------------------
cat > file1.txt
spark is big data tech
hadoop is also a big data tech
hdfs is a storage system
yarn is a computing system
^C

scala> val rdd1 = sc.textFile("/home/hadoop/Desktop/file1.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/file1.txt MapPartitionsRDD[81] at textFile at <console>:26
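
Note: a path without a scheme is resolved against Spark's default filesystem (HDFS when one is configured). To be explicit about reading from the local filesystem, the file:// prefix can be used; the variable name below is only illustrative.

scala> val rdd1Local = sc.textFile("file:///home/hadoop/Desktop/file1.txt")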

scala> rdd1.collect.foreach(println)
spark is big data tech
hadoop is also a big data tech
hdfs is a storage system
yarn is a computing system

//Transformation
scala> val rdd2 = rdd1.filter( x => x.contains("system"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[82] at filter at <console>:27

//Action
scala> rdd2.collect.foreach(println)
hdfs is a storage system
yarn is a computing system
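
Transformations can also be chained. As a small sketch beyond the original example, a classic word count over the same rdd1 using flatMap, map and reduceByKey:

scala> val wordCounts = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wordCounts.collect.foreach(println)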

//write the output into file system
scala> rdd2.saveAsTextFile("file:///home/hadoop/Desktop/rddOutput")
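
Note that saveAsTextFile creates a directory (rddOutput) containing one part file per partition rather than a single file. Reading it back works with the directory path:

scala> sc.textFile("file:///home/hadoop/Desktop/rddOutput").collect.foreach(println)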



CSV file read, transformation, action, write
---------------------------------------------
cat > myfile.csv
1,2,USA
2,3,UK
3,4,Australia
4,5,Japan^C

scala> val myfileRDD1 = sc.textFile("/home/hadoop/Desktop/myfile.csv")
myfileRDD1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/myfile.csv MapPartitionsRDD[94] at textFile at <console>:26


// Filter transformation
scala> val myfileRDD2 = myfileRDD1.filter(x => x.contains("UK"))
myfileRDD2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[95] at filter at <console>:27

// Action
scala> myfileRDD2.collect.foreach(println)
2,3,UK
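
With the plain RDD API each record is just a line of text, so splitting on the comma is usually the next step. A minimal sketch (assuming index 2 is the country column):

scala> val countries = myfileRDD1.map(line => line.split(",")(2))
scala> countries.collect.foreach(println)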


scala> val myfileCSV = sqlContext.read.format("csv").load("/home/hadoop/Desktop/myfile.csv")

scala> myfileCSV.show()
+---+---+---------+
|_c0|_c1|      _c2|
+---+---+---------+
|  1|  2|      USA|
|  2|  3|       UK|
|  3|  4|Australia|
+---+---+---------+
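
By default the CSV reader treats every column as a string and names them _c0, _c1, and so on. To let Spark infer the column types, the inferSchema option can be set (a sketch; the sample file has no header row, so the header option is left at its default):

scala> val myfileCSVTyped = sqlContext.read.format("csv").option("inferSchema", "true").load("/home/hadoop/Desktop/myfile.csv")
scala> myfileCSVTyped.printSchema()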

//Transformation
scala> val myfileCSV2 = myfileCSV.where("_c2='UK'")
myfileCSV2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 1 more field]

//Action
scala> myfileCSV2.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  2|  3| UK|
+---+---+---+
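
The same filter can also be written with the Column API instead of a SQL string (a sketch; the $ syntax needs the implicits import):

scala> import sqlContext.implicits._
scala> val myfileCSV3 = myfileCSV.where($"_c2" === "UK")
scala> myfileCSV3.show()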

//write the output into file system
scala> myfileCSV2.write.format("csv").save("file:///home/hadoop/Desktop/csvOutput")
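
Like saveAsTextFile, the DataFrame writer produces a directory of part files. For a small result, coalescing to a single partition before writing yields one CSV part file (the output path below is only illustrative):

scala> myfileCSV2.coalesce(1).write.format("csv").save("file:///home/hadoop/Desktop/csvOutputSingle")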
