// Handling Comma-Separated Files
Input
student.csv:
----------------
cat > student.csv
id,name,course,year
1,David,Hadoop,2018
2,Raju,Scala,2017
3,Priya,Spark,2015
4,Kalai,Scala,2019
5,Nila,Spark,2017
6,Arivu,Scala,2019^C
scala> val df_Student = sqlContext.read.format("csv").load("/home/hadoop/Desktop/student.csv")
// Without the header option, the header line is read as ordinary data and Spark auto-generates the column names _c0, _c1, _c2, _c3
scala> df_Student.show()
+---+-----+------+----+
|_c0| _c1| _c2| _c3|
+---+-----+------+----+
| id| name|course|year|
| 1|David|Hadoop|2018|
| 2| Raju| Scala|2017|
| 3|Priya| Spark|2015|
| 4|Kalai| Scala|2019|
| 5| Nila| Spark|2017|
+---+-----+------+----+
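If the file genuinely had no header line, we could keep header off and simply replace the auto-generated _c0.._c3 names with toDF. A minimal sketch (the column names below are just illustrative; against student.csv the header line would of course remain as a data row):
// Sketch: read without a header and assign column names with toDF
val dfNoHeader = sqlContext.read.format("csv").
  load("/home/hadoop/Desktop/student.csv").
  toDF("id", "name", "course", "year")   // replaces _c0.._c3
dfNoHeader.printSchema   // named columns, still all strings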
// With the header option, the first line is used as the column names
scala> val df_Student = sqlContext.read.format("csv").option("header","true").load("/home/hadoop/Desktop/student.csv")
scala> df_Student.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
| 1|David|Hadoop|2018|
| 2| Raju| Scala|2017|
| 3|Priya| Spark|2015|
| 4|Kalai| Scala|2019|
| 5| Nila| Spark|2017|
+---+-----+------+----+
// Without inferSchema, every column is read as a string
scala> df_Student.printSchema
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- course: string (nullable = true)
|-- year: string (nullable = true)
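Instead of re-reading the file, the string columns can also be cast in place. A minimal sketch using the standard Column.cast API (column names as in student.csv):
import org.apache.spark.sql.functions.col
// Sketch: cast the numeric columns explicitly after loading
val dfTyped = df_Student.
  withColumn("id", col("id").cast("int")).
  withColumn("year", col("year").cast("int"))
dfTyped.printSchema   // id and year become integers; name and course stay strings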
// Re-read with the 'inferSchema' option so Spark detects the column types
scala> val df_Student = sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load("/home/hadoop/Desktop/student.csv")
// Now id and year are integer columns, while name and course remain strings
scala> df_Student.printSchema
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- course: string (nullable = true)
|-- year: integer (nullable = true)
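When the column types are known up front, an explicit schema avoids the extra pass over the data that inferSchema needs. A minimal sketch, assuming the Spark 2.x types API:
import org.apache.spark.sql.types._
// Sketch: supply the schema instead of inferring it
val studentSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("course", StringType, nullable = true),
  StructField("year", IntegerType, nullable = true)))
val dfWithSchema = sqlContext.read.format("csv").
  option("header", "true").
  schema(studentSchema).
  load("/home/hadoop/Desktop/student.csv")
dfWithSchema.printSchema   // same schema as with inferSchema, without scanning the file twice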
// Transformation: filter with a single condition
scala> val filteredDF = df_Student.where("course='Scala'")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> filteredDF.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
| 2| Raju| Scala|2017|
| 4|Kalai| Scala|2019|
+---+-----+------+----+
scala> val filteredDF = df_Student.where("course='Scala' and year=2019")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 2 more fields]
scala> filteredDF.show()
+---+-----+------+----+
| id| name|course|year|
+---+-----+------+----+
| 4|Kalai| Scala|2019|
+---+-----+------+----+
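The same filter can also be written with Column expressions instead of a SQL string. A minimal sketch (filteredDF2 is just an illustrative name):
import org.apache.spark.sql.functions.col
// Sketch: Column-based equivalent of where("course='Scala' and year=2019")
val filteredDF2 = df_Student.filter(col("course") === "Scala" && col("year") === 2019)
filteredDF2.show()   // should print the same single row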
scala> filteredDF.write.format("json").save("file:///home/hadoop/Desktop/jsonOut2019")
hadoop@hadoop:~/Desktop/jsonOut2019$ cat part-00000-150f03cd-55fa-42f5-b55b-de5f030b84bb-c000.json
{"id":4,"name":"Kalai","course":"Scala","year":2019}
// Handling Tab-Separated Files
Input
employee.csv:
---------------------
// Tab-separated file
cat > employee.csv
id dept salary
101 testing 3000
102 dev 4000
103 hr 3500
scala> val dfEmployee = sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load("/home/hadoop/Desktop/employee.csv")
// The file is tab-separated, but no delimiter was specified, so each line is read as a single column
scala> dfEmployee.show()
+----------------+
| id dept salary|
+----------------+
|101 testing 3000|
| 102 dev 4000|
| 103 hr 3500|
+----------------+
// The schema has only one column because TAB was not specified as the separator
scala> dfEmployee.printSchema
root
|-- id dept salary: string (nullable = true)
// Here we specify TAB as the delimiter
scala> val df_employee = sqlContext.read.format("csv").
| option("header","true").
| option("inferSchema","true").
| option("delimiter","\t").
| load("/home/hadoop/Desktop/employee.csv")
scala> df_employee.printSchema
root
|-- id: integer (nullable = true)
|-- dept: string (nullable = true)
|-- salary: integer (nullable = true)
scala> df_employee.show()
+---+-------+------+
| id| dept|salary|
+---+-------+------+
|101|testing| 3000|
|102| dev| 4000|
|103| hr| 3500|
+---+-------+------+
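In Spark 2.x the CSV source also accepts "sep" as the name of the separator option, so the read below should behave the same as the "delimiter" version. A sketch (not verified on every Spark release):
// Sketch: "sep" as an alternative spelling of the separator option
val dfEmployeeSep = sqlContext.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  option("sep", "\t").
  load("/home/hadoop/Desktop/employee.csv")
dfEmployeeSep.printSchema   // id and salary as integers, dept as string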
scala> val filteredDF = df_employee.where("salary < 4000")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, dept: string ... 1 more field]
scala> filteredDF.show()
+---+-------+------+
| id| dept|salary|
+---+-------+------+
|101|testing| 3000|
|103| hr| 3500|
+---+-------+------+
// Transformation: filter with multiple conditions
scala> val filteredDF = df_employee.where("salary < 4000 and dept ='hr'")
filteredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, dept: string ... 1 more field]
scala> filteredDF.show()
+---+----+------+
| id|dept|salary|
+---+----+------+
|103| hr| 3500|
+---+----+------+
// Write to the file system
scala> filteredDF.write.format("csv").save("/home/hadoop/Desktop/employeeCSVOut")
// Display the output part file
cat part-00000-5659d6e3-6395-446a-a851-b18559f24d9f-c000.csv
103,hr,3500
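The CSV output above has no header line; header and save mode are write options as well. A minimal sketch (the output path is just an example):
// Sketch: write the filtered rows with a header line, overwriting any previous run
filteredDF.write.
  format("csv").
  option("header", "true").
  mode("overwrite").
  save("/home/hadoop/Desktop/employeeCSVOutWithHeader")
// the part file should then start with: id,dept,salary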