Monday, 10 August 2020

Customers and Orders - A Table Join Example Using Spark RDDs

This walkthrough loads tab-delimited customer and order files from HDFS, counts orders per customer with reduceByKey, joins the counts back to customer names, and sorts to find the customers with the most orders.

scala> val custRDD = sc.textFile("hdfs://localhost:8020/user/data/customers/")
custRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/customers/ MapPartitionsRDD[7] at textFile at <console>:27

scala> custRDD.take(5).foreach(println)
1 Richard Hernandez Brownsville
2 Mary Barrett Littleton
3 Ann Smith Caguas
4 Mary Jones San Marcos
5 Robert Hudson Caguas

scala> custRDD.count()
res4: Long = 12435
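
Each customer line is tab-delimited: id, first name, last name, city. If you prefer structured records over raw strings, a minimal sketch (the Customer case class is hypothetical, and it assumes the four-column layout shown in the sample above):

// Hypothetical alternative: parse each tab-delimited line into a case class.
// Assumes every row has the four columns shown in the sample above.
case class Customer(id: String, firstName: String, lastName: String, city: String)

val customers = custRDD.map { line =>
  val f = line.split("\t")
  Customer(f(0), f(1), f(2), f(3))
}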



scala> val ordersRDD = sc.textFile("hdfs://localhost:8020/user/data/orders/")
ordersRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/orders/ MapPartitionsRDD[9] at textFile at <console>:27

scala> ordersRDD.take(5).foreach(println)
1 2013-07-25 00:00:00.0 11599 CLOSED
2 2013-07-25 00:00:00.0 256 PENDING_PAYMENT
3 2013-07-25 00:00:00.0 12111 COMPLETE
4 2013-07-25 00:00:00.0 8827 CLOSED
5 2013-07-25 00:00:00.0 11318 COMPLETE


scala> ordersRDD.count()
res5: Long = 68893
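
The order layout is order id, order date, customer id, status. If only fulfilled orders should be counted, one option is a status filter before the mapping step; a sketch, assuming COMPLETE and CLOSED are the statuses that count as fulfilled:

// Sketch: keep only fulfilled orders before counting.
// Assumes status is the fourth tab-delimited column, as in the sample.
val completedOrders = ordersRDD.filter { line =>
  val status = line.split("\t")(3)
  status == "COMPLETE" || status == "CLOSED"
}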


scala> val orderTuple = ordersRDD.map(line => (line.split("\t")(2), 1))
orderTuple: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:29

scala> orderTuple.take(5)
res9: Array[(String, Int)] = Array((11599,1), (256,1), (12111,1), (8827,1), (11318,1))

scala> orderTuple.take(5).foreach(println)
(11599,1)
(256,1)
(12111,1)
(8827,1)
(11318,1)

scala> val reducedRDD = orderTuple.reduceByKey(_+_)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:31

scala> reducedRDD.take(5).foreach(println)
(2828,4)
(2350,5)
(8602,8)
(11415,4)
(5302,10)
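
reduceByKey sums the 1s per customer id, so each pair above is (customer id, number of orders). For a quick look, countByKey() gives the same counts as a driver-side Map, but it collects everything to the driver, so reduceByKey remains the safer default at scale:

// Driver-side alternative -- fine for inspection, not for large keysets.
val countsOnDriver: scala.collection.Map[String, Long] = orderTuple.countByKey()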

scala> val custTuple = custRDD.map { line =>
     |   val w = line.split("\t")
     |   val custid = w(0)
     |   val cfname = w(1)
     |   val clname = w(2)
     |   (custid, cfname + " " + clname)
     | }

scala> custTuple.take(5).foreach(println)
(1,Richard Hernandez)
(2,Mary Barrett)
(3,Ann Smith)
(4,Mary Jones)
(5,Robert Hudson)
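
Both RDDs are now keyed by customer id, which is what the join needs. If the input could ever contain short or malformed rows, a flatMap variant skips them instead of throwing; a defensive sketch (the sample above looks clean, so this is purely precautionary):

// Defensive sketch: drop rows with fewer than three columns instead of
// failing with ArrayIndexOutOfBoundsException.
val safeCustTuple = custRDD.flatMap { line =>
  val w = line.split("\t")
  if (w.length >= 3) Some((w(0), w(1) + " " + w(2))) else None
}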

scala> val joinedRDD = custTuple.join(reducedRDD)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[15] at join at <console>:37

scala> joinedRDD.take(5).foreach(println)
(2828,(Mary Smith,4))
(2350,(Brian Perez,5))
(8602,(Timothy Gilbert,8))
(11415,(George Smith,4))
(5302,(Betty Smith,10))
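
join is an inner join, so customers with no orders drop out of the result. If they should appear with a count of 0, leftOuterJoin keeps them; a sketch:

// Keep every customer; missing counts become 0.
val allCustomers = custTuple.leftOuterJoin(reducedRDD)
  .mapValues { case (name, cnt) => (name, cnt.getOrElse(0)) }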


scala> val sortedRDD = joinedRDD.sortBy(_._2._2,false)
sortedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[20] at sortBy at <console>:39

scala> sortedRDD.take(5).foreach(println)
(569,(Mary Frye,16))
(5897,(Mary Griffin,16))
(12431,(Mary Rios,16))
(6316,(Kyle Smith,16))
(5654,(Jerry Smith,15))
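
sortBy(_._2._2, false) orders by the count, descending. When only the top few rows are needed, top avoids sorting the whole dataset; a sketch (note that ties at a given count, like the 16s above, come back in no guaranteed order either way):

// Top 5 by order count without a full sort: top() selects per partition
// and merges, which is cheaper when N is small.
val top5 = joinedRDD.top(5)(Ordering.by(_._2._2))
top5.foreach(println)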

