Here pipe symbol as a separator
Input File content:
marks.txt:
---------
101|389
102|412
103|435
104|458
105|481
106|504
107|527
108|550
109|573
110|519
111|465
112|411
113|357
114|303
115|454
116|400
117|346
118|497
119|443
120|389
121|540
122|486
123|432
124|583
125|529
126|475
127|508
128|572
129|454
130|400
131|551
132|497
133|443
134|432
135|540
136|422
137|368
138|519
139|465
140|411
val studentMarks = sc.textFile("E:\\POCs\\marks.txt")
scala> studentMarks.take(5).foreach(println)
101|389
102|412
103|435
104|458
105|481
// pipe as a separator. so we need to escape it using "\\|"
scala> val mappedRDD = studentMarks.map (x => {
| val lines = x.toString().split("\\|")
| val RollNo = lines(0).toInt
| val Marks = lines(1).toInt
| (RollNo,Marks)
| })
mappedRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:25
--------------- or -------------
// pipe as a separator. If we use '|' character we don't need to escape it
val mappedRDD = studentMarks.map (x => {
val lines = x.toString().split('|')
val RollNo = lines(0).toInt
val Marks = lines(1).toInt
(RollNo,Marks)
})
scala> mappedRDD.take(5).foreach(println)
(101,389)
(102,412)
(103,435)
(104,458)
(105,481)
//Make a Dataframe from existing RDD
scala> val df = mappedRDD.toDF("RollNo","Total")
df: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int]
scala> df.printSchema
root
|-- RollNo: integer (nullable = false)
|-- Total: integer (nullable = false)
scala> df.show()
+------+-----+
|RollNo|Total|
+------+-----+
| 101| 389|
| 102| 412|
| 103| 435|
| 104| 458|
| 105| 481|
| 106| 504|
| 107| 527|
| 108| 550|
| 109| 573|
| 110| 519|
| 111| 465|
| 112| 411|
| 113| 357|
| 114| 303|
| 115| 454|
| 116| 400|
| 117| 346|
| 118| 497|
| 119| 443|
| 120| 389|
+------+-----+
only showing top 20 rows
// Rank calculation needs the following things to be imported
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.functions.rank
scala> var ranked=df.withColumn("rank",rank().over(Window.orderBy($"Total".desc)))
ranked: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int ... 1 more field]
Input File content:
marks.txt:
---------
101|389
102|412
103|435
104|458
105|481
106|504
107|527
108|550
109|573
110|519
111|465
112|411
113|357
114|303
115|454
116|400
117|346
118|497
119|443
120|389
121|540
122|486
123|432
124|583
125|529
126|475
127|508
128|572
129|454
130|400
131|551
132|497
133|443
134|432
135|540
136|422
137|368
138|519
139|465
140|411
val studentMarks = sc.textFile("E:\\POCs\\marks.txt")
scala> studentMarks.take(5).foreach(println)
101|389
102|412
103|435
104|458
105|481
// pipe as a separator. so we need to escape it using "\\|"
scala> val mappedRDD = studentMarks.map (x => {
| val lines = x.toString().split("\\|")
| val RollNo = lines(0).toInt
| val Marks = lines(1).toInt
| (RollNo,Marks)
| })
mappedRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:25
--------------- or -------------
// pipe as a separator. If we use '|' character we don't need to escape it
val mappedRDD = studentMarks.map (x => {
val lines = x.toString().split('|')
val RollNo = lines(0).toInt
val Marks = lines(1).toInt
(RollNo,Marks)
})
scala> mappedRDD.take(5).foreach(println)
(101,389)
(102,412)
(103,435)
(104,458)
(105,481)
//Make a Dataframe from existing RDD
scala> val df = mappedRDD.toDF("RollNo","Total")
df: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int]
scala> df.printSchema
root
|-- RollNo: integer (nullable = false)
|-- Total: integer (nullable = false)
scala> df.show()
+------+-----+
|RollNo|Total|
+------+-----+
| 101| 389|
| 102| 412|
| 103| 435|
| 104| 458|
| 105| 481|
| 106| 504|
| 107| 527|
| 108| 550|
| 109| 573|
| 110| 519|
| 111| 465|
| 112| 411|
| 113| 357|
| 114| 303|
| 115| 454|
| 116| 400|
| 117| 346|
| 118| 497|
| 119| 443|
| 120| 389|
+------+-----+
only showing top 20 rows
// Rank calculation needs the following things to be imported
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.functions.rank
scala> var ranked=df.withColumn("rank",rank().over(Window.orderBy($"Total".desc)))
ranked: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int ... 1 more field]
No comments:
Post a Comment