--------------------------------------------------
cat > file1.txt
spark is big data tech
hadoop is also a big data tech
hdfs is a storage system
yarn is a computing system
do you want to continue^C
scala> val rdd1 = sc.textFile("/home/hadoop/Desktop/file1.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/file1.txt MapPartitionsRDD[81] at textFile at <console>:26
scala> rdd1.collect.foreach(println)
spark is big data tech
hadoop is also a big data tech
hdfs is a storage system
yarn is a computing system
//Transformation
scala> val rdd2 = rdd1.filter( x => x.contains("system"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[82] at filter at <console>:27
//Action
scala> rdd2.collect.foreach(println)
hdfs is a storage system
yarn is a computing system
//Write the output to the local file system
scala> rdd2.saveAsTextFile("file:///home/hadoop/Desktop/rddOutput")
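saveAsTextFile creates a directory (here rddOutput) that holds one part file per partition rather than a single text file, and it fails if that directory already exists. As a minimal sketch, assuming the same spark-shell session (where sc is predefined) and that the write above succeeded, the result can be read back by pointing textFile at the directory:

```scala
// Assumes spark-shell, where `sc` already exists, and that rddOutput
// was written by the saveAsTextFile call above.
val saved = sc.textFile("file:///home/hadoop/Desktop/rddOutput")

// textFile on a directory reads every part file inside it.
saved.collect.foreach(println)
```

To rerun the write, delete the rddOutput directory first or choose a different output path.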
CSV file: Read, Transformation, Action, Write
---------------------------------------------
cat > myfile.csv
1,2,USA
2,3,UK
3,4,Australia
4,5,Japan^C
scala> val myfileRDD1 = sc.textFile("/home/hadoop/Desktop/myfile.csv")
myfileRDD1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/myfile.csv MapPartitionsRDD[94] at textFile at <console>:26
// Filter transformation
scala> val myfileRDD2 = myfileRDD1.filter ( x => x.contains("UK"))
myfileRDD2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[95] at filter at <console>:27
// Action
scala> myfileRDD2.collect.foreach(println)
2,3,UK
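Because textFile returns each CSV row as a plain string, contains("UK") matches "UK" anywhere in the line, not only in the country column. A stricter variant splits each line on commas and compares the third field. The sketch below uses a local Seq as a hypothetical stand-in for the file contents so that it runs without Spark; the same two functions could be passed to myfileRDD1.map and .filter.

```scala
// Hypothetical stand-in for the lines of myfile.csv.
val lines = Seq("1,2,USA", "2,3,UK", "3,4,Australia")

// Split each row into fields, then keep rows whose third field is exactly "UK".
val ukRows = lines
  .map(_.split(","))
  .filter(fields => fields.length == 3 && fields(2).trim == "UK")

ukRows.foreach(fields => println(fields.mkString(",")))
// prints: 2,3,UK
```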
scala> val myfileCSV = sqlContext.read.format("csv").load("/home/hadoop/Desktop/myfile.csv")
scala> myfileCSV.show();
+---+---+---------+
|_c0|_c1|      _c2|
+---+---+---------+
|  1|  2|      USA|
|  2|  3|       UK|
|  3|  4|Australia|
+---+---+---------+
//Transformation
scala> val myfileCSV2 = myfileCSV.where("_c2='UK'")
myfileCSV2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 1 more field]
//Action
scala> myfileCSV2.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  2|  3| UK|
+---+---+---+
//Write the output to the local file system
scala> myfileCSV2.write.format("csv").save("file:///home/hadoop/Desktop/csvOutput")
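The default column names _c0, _c1 and _c2 appear because the file has no header row. As a rough sketch, assuming the same spark-shell session (where sqlContext is predefined), the columns can be renamed with toDF before filtering. The names id, value and country and the output directory csvOutputNamed are hypothetical choices for illustration, not part of the original example.

```scala
// Assumes spark-shell, where `sqlContext` already exists.
// Column names and the output directory are illustrative only.
val named = sqlContext.read.format("csv")
  .load("/home/hadoop/Desktop/myfile.csv")
  .toDF("id", "value", "country")

// Filter on the readable column name instead of _c2.
val ukOnly = named.where("country = 'UK'")
ukOnly.show()

// Write a header row, and replace the directory if it already exists.
ukOnly.write.format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("file:///home/hadoop/Desktop/csvOutputNamed")
```

Without mode("overwrite"), the save call fails when the target directory is already present, which is the default behaviour of the write in the transcript above.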