Friday, 25 January 2019

Rank Calculation in Dataframe using Spark with Scala

Here the pipe symbol (|) is used as the field separator.
Input File content:
marks.txt:
---------

101|389
102|412
103|435
104|458
105|481
106|504
107|527
108|550
109|573
110|519
111|465
112|411
113|357
114|303
115|454
116|400
117|346
118|497
119|443
120|389
121|540
122|486
123|432
124|583
125|529
126|475
127|508
128|572
129|454
130|400
131|551
132|497
133|443
134|432
135|540
136|422
137|368
138|519
139|465
140|411


val studentMarks = sc.textFile("E:\\POCs\\marks.txt")

scala> studentMarks.take(5).foreach(println)
101|389
102|412
103|435
104|458
105|481
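
As an aside, the DataFrame reader can also parse a delimited file directly, skipping the RDD step entirely. A minimal sketch, assuming Spark 2.x where spark is the shell's SparkSession (the "sep" option value and the column names below are chosen to match the sample file):

// Read the pipe-separated file straight into a DataFrame
val dfDirect = spark.read
  .option("sep", "|")             // pipe as the field separator
  .option("inferSchema", "true")  // detect the integer columns
  .csv("E:\\POCs\\marks.txt")
  .toDF("RollNo", "Total")        // assign column names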

// The pipe is a regex metacharacter, so when passed as a String it must be escaped as "\\|"
 scala> val mappedRDD = studentMarks.map (x => {
     |  val fields = x.split("\\|")   // each record: RollNo|Marks
     |  val rollNo = fields(0).toInt
     |  val marks = fields(1).toInt
     |  (rollNo, marks)
     |  })
mappedRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:25

--------------- or -------------
// Alternatively, pass the pipe as a Char ('|'); the Char overload of split does not use regex, so no escaping is needed
 val mappedRDD = studentMarks.map (x => {
      val fields = x.split('|')      // Char overload: no regex, no escaping
      val rollNo = fields(0).toInt
      val marks = fields(1).toInt
      (rollNo, marks)
      })
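
To see why the escaping matters: split(String) treats its argument as a regular expression, and an unescaped "|" is regex alternation that matches the empty string, cutting the line between every character. A quick comparison of the three forms:

"101|389".split("|")    // Array(1, 0, 1, |, 3, 8, 9) -- wrong, regex alternation
"101|389".split("\\|")  // Array(101, 389) -- escaped regex, correct
"101|389".split('|')    // Array(101, 389) -- Char overload, no regex involved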
 
scala> mappedRDD.take(5).foreach(println)
(101,389)
(102,412)
(103,435)
(104,458)
(105,481)

// Make a DataFrame from the existing RDD
scala> val df = mappedRDD.toDF("RollNo","Total")
df: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int]

scala> df.printSchema
root
 |-- RollNo: integer (nullable = false)
 |-- Total: integer (nullable = false)
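
toDF works out of the box above because spark-shell pre-imports spark.implicits._. In a standalone application that import must be added by hand. A minimal sketch of the same pipeline as an app (the object name and local master are placeholders, not part of the original post):

import org.apache.spark.sql.SparkSession

object RankExample {                        // hypothetical name
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RankExample")
      .master("local[*]")                   // assumption: local test run
      .getOrCreate()
    import spark.implicits._                // enables toDF on RDDs of tuples

    val df = spark.sparkContext
      .textFile("E:\\POCs\\marks.txt")
      .map(_.split('|'))
      .map(a => (a(0).toInt, a(1).toInt))
      .toDF("RollNo", "Total")

    df.printSchema()
    spark.stop()
  }
}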


scala> df.show()
+------+-----+
|RollNo|Total|
+------+-----+
|   101|  389|
|   102|  412|
|   103|  435|
|   104|  458|
|   105|  481|
|   106|  504|
|   107|  527|
|   108|  550|
|   109|  573|
|   110|  519|
|   111|  465|
|   112|  411|
|   113|  357|
|   114|  303|
|   115|  454|
|   116|  400|
|   117|  346|
|   118|  497|
|   119|  443|
|   120|  389|
+------+-----+
only showing top 20 rows

// Rank calculation needs the following imports
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.functions.rank

scala> val ranked = df.withColumn("rank", rank().over(Window.orderBy($"Total".desc)))
ranked: org.apache.spark.sql.DataFrame = [RollNo: int, Total: int ... 1 more field]
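
Call ranked.show() to inspect the result. Two things worth knowing: a window with orderBy but no partitionBy pulls all rows into a single partition (Spark logs a warning about this on large data), and the sample data contains ties (for example, roll numbers 121 and 135 both scored 540), which is exactly where rank, dense_rank and row_number behave differently. A sketch comparing the three over the same window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank, dense_rank, row_number}

val w = Window.orderBy($"Total".desc)

val allRanks = df
  .withColumn("rank", rank().over(w))             // ties share a rank, next rank skipped
  .withColumn("dense_rank", dense_rank().over(w)) // ties share a rank, no gaps
  .withColumn("row_number", row_number().over(w)) // unique numbers, ties broken arbitrarily

allRanks.show(10)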



