Saturday, 26 January 2019

How to Handle TAB, Comma Separated files in Spark with Scala

// Handling Comma Separated Files
Input
student.csv:
----------------
cat > student.csv
id,name,course,year
1,David,Hadoop,2018
2,Raju,Scala,2017
3,Priya,Spark,2015
4,Kalai,Scala,2019
5,Nila,Spark,2017
6,Arivu,Scala,2019^C
// ^C ends cat; because the last record has no trailing newline it is not written, so the file contains only the first five rows (as the output below confirms)


scala> val df_Student = sqlContext.read.format("csv").load("/home/hadoop/Desktop/student.csv")

// Without the header option, the header row is treated as data and Spark autogenerates the column names _c0, _c1, _c2, _c3
scala> df_Student.show()
+---+-----+------+----+
|_c0|  _c1|   _c2| _c3|
+---+-----+------+----+
| id| name|course|year|
|  1|David|Hadoop|2018|
|  2| Raju| Scala|2017|
|  3|Priya| Spark|2015|
|  4|Kalai| Scala|2019|
|  5| Nila| Spark|2017|
+---+-----+------+----+
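
The examples in this post use sqlContext; in Spark 2.x the same read is normally written through the SparkSession entry point (the spark object in spark-shell). A minimal sketch, assuming a Spark 2.x shell:

// Spark 2.x entry point: spark.read replaces sqlContext.read
val dfStudent = spark.read.format("csv").load("/home/hadoop/Desktop/student.csv")
dfStudent.show()   // same autogenerated _c0.._c3 column names as above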

// Read again with the header option so the first row supplies the column names
scala> val df_Student = sqlContext.read.format("csv").option("header","true").load("/home/hadoop/Desktop/student.csv")

scala> df_Student.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
|  1|David|Hadoop|2018|
|  2| Raju| Scala|2017|
|  3|Priya| Spark|2015|
|  4|Kalai| Scala|2019|
|  5| Nila| Spark|2017|
+---+-----+------+----+

// By default every column is read as string
scala> df_Student.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- year: string (nullable = true)

// Add 'inferSchema' so Spark infers the column types from the data
scala> val df_Student = sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load("/home/hadoop/Desktop/student.csv")

// Now id and year are integer columns, while name and course remain strings
scala> df_Student.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- year: integer (nullable = true)
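
inferSchema costs an extra pass over the data. For a known layout the schema can also be declared explicitly; a minimal sketch, assuming the same four columns as the file above:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// Declare the schema up front instead of inferring it
val studentSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("course", StringType, nullable = true),
  StructField("year", IntegerType, nullable = true)
))

val dfStudentExplicit = sqlContext.read.format("csv").
  option("header", "true").
  schema(studentSchema).
  load("/home/hadoop/Desktop/student.csv")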

// Transformation: filter rows where the course is Scala
scala> val filteredDF = df_Student.where("course='Scala'")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> filteredDF.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
|  2| Raju| Scala|2017|
|  4|Kalai| Scala|2019|
+---+-----+------+----+
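
The same filter can also be written with the Column API instead of a SQL expression string; a minimal sketch using the col function:

import org.apache.spark.sql.functions.col

// Equivalent to where("course='Scala'"), expressed with Column objects
val filteredScala = df_Student.filter(col("course") === "Scala")
filteredScala.show()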

scala> val filteredDF = df_Student.where("course='Scala' and year=2019")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]

scala> filteredDF.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
|  4|Kalai| Scala|2019|
+---+-----+------+----+

scala> filteredDF.write.format("json").save("file:///home/hadoop/Desktop/jsonOut2019")

hadoop@hadoop:~/Desktop/jsonOut2019$ cat part-00000-150f03cd-55fa-42f5-b55b-de5f030b84bb-c000.json
{"id":4,"name":"Kalai","course":"Scala","year":2019}


// Handling Tab separated files
Input
employee.csv:
---------------------
// Tab separated file (fields are separated by TAB characters)
cat > employee.csv
id	dept	salary
101	testing	3000
102	dev	4000
103	hr	3500


scala> val dfEmployee = sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load("/home/hadoop/Desktop/employee.csv")

// The file is tab separated, but without specifying the delimiter Spark reads each line as a single column
scala> dfEmployee.show()
+----------------+
|  id dept salary|
+----------------+
|101 testing 3000|
|    102 dev 4000|
|     103 hr 3500|
+----------------+

// The schema has only one column because TAB was not specified as the separator
scala> dfEmployee.printSchema
root
 |-- id dept salary: string (nullable = true)


// Read again, this time specifying TAB as the delimiter
scala> val df_employee = sqlContext.read.format("csv").
     | option("header","true").
     | option("inferSchema","true").
     | option("delimiter","\t").
     | load("/home/hadoop/Desktop/employee.csv")

scala> df_employee.printSchema
root
 |-- id: integer (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)

scala> df_employee.show()
+---+-------+------+
| id|   dept|salary|
+---+-------+------+
|101|testing|  3000|
|102|    dev|  4000|
|103|     hr|  3500|
+---+-------+------+
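
In recent Spark versions the separator option for the CSV source is also accepted as "sep", with "delimiter" as an alias, so the same read can be written either way; a minimal sketch:

// "sep" and "delimiter" both set the field separator of the csv source
val df_employee2 = sqlContext.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  option("sep", "\t").
  load("/home/hadoop/Desktop/employee.csv")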

scala> val filteredDF = df_employee.where("salary < 4000")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, dept: string ... 1 more field]

scala> filteredDF.show()
+---+-------+------+
| id|   dept|salary|
+---+-------+------+
|101|testing|  3000|
|103|     hr|  3500|
+---+-------+------+

// Transformation: filter with multiple conditions
scala> val filteredDF = df_employee.where("salary < 4000 and dept ='hr'")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, dept: string ... 1 more field]

scala> filteredDF.show()
+---+----+------+
| id|dept|salary|
+---+----+------+
|103|  hr|  3500|
+---+----+------+

// Write the result to the file system as CSV
scala> filteredDF.write.format("csv").save("/home/hadoop/Desktop/employeeCSVOut")

// display the output
cat part-00000-5659d6e3-6395-446a-a851-b18559f24d9f-c000.csv
103,hr,3500
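
The part file above has no header row. The CSV writer also accepts the header option when the column names should be written out; a minimal sketch (the output path here is only illustrative):

// Write the column names as the first line of each part file
filteredDF.write.format("csv").
  option("header", "true").
  save("/home/hadoop/Desktop/employeeCSVOutWithHeader")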
