scala> val custRDD = sc.textFile("hdfs://localhost:8020/user/data/customers/")
custRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/customers/ MapPartitionsRDD[7] at textFile at <console>:27
scala> custRDD.take(5).foreach(println)
1 Richard Hernandez Brownsville
2 Mary Barrett Littleton
3 Ann Smith Caguas
4 Mary Jones San Marcos
5 Robert Hudson Caguas
scala> custRDD.count()
res4: Long = 12435
scala> val ordersRDD = sc.textFile("hdfs://localhost:8020/user/data/orders/")
ordersRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/orders/ MapPartitionsRDD[9] at textFile at <console>:27
scala> ordersRDD.take(5).foreach(println)
1 2013-07-25 00:00:00.0 11599 CLOSED
2 2013-07-25 00:00:00.0 256 PENDING_PAYMENT
3 2013-07-25 00:00:00.0 12111 COMPLETE
4 2013-07-25 00:00:00.0 8827 CLOSED
5 2013-07-25 00:00:00.0 11318 COMPLETE
scala> ordersRDD.count()
res5: Long = 68893
scala> val orderTuple = ordersRDD.map (line => (line.split("\t")(2),1))
orderTuple: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:29
scala> orderTuple.take(5)
res9: Array[(String, Int)] = Array((11599,1), (256,1), (12111,1), (8827,1), (11318,1))
scala> orderTuple.take(5).foreach(println)
(11599,1)
(256,1)
(12111,1)
(8827,1)
(11318,1)
scala> val reducedRDD = orderTuple.reduceByKey(_+_)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:31
scala> reducedRDD.take(5).foreach(println)
(2828,4)
(2350,5)
(8602,8)
(11415,4)
(5302,10)
// Build (customerId, "firstName lastName") pairs from the raw customer lines,
// keyed by customer id so they can be joined with the per-customer order counts.
// Fields are tab-separated: id, first name, last name, city (city is unused here).
// Fix: removed the stray shell-style `\` line continuation after `{ line =>` —
// it is not valid Scala and makes this block fail to paste into spark-shell;
// the open brace already continues the expression across lines.
val custTuple = custRDD.map { line =>
  val fields    = line.split("\t")
  val custId    = fields(0)
  val firstName = fields(1)
  val lastName  = fields(2)
  (custId, firstName + " " + lastName)
}
custTuple.take(5).foreach(println)
scala> custTuple.take(5).foreach(println)
(1,Richard Hernandez)
(2,Mary Barrett)
(3,Ann Smith)
(4,Mary Jones)
(5,Robert Hudson)
scala> val joinedRDD = custTuple.join(reducedRDD)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[15] at join at <console>:37
scala> joinedRDD.take(5).foreach(println)
(2828,(Mary Smith,4))
(2350,(Brian Perez,5))
(8602,(Timothy Gilbert,8))
(11415,(George Smith,4))
(5302,(Betty Smith,10))
scala> val sortedRDD = joinedRDD.sortBy(_._2._2,false)
sortedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[20] at sortBy at <console>:39
scala> sortedRDD.take(5).foreach(println)
(569,(Mary Frye,16))
(5897,(Mary Griffin,16))
(12431,(Mary Rios,16))
(6316,(Kyle Smith,16))
(5654,(Jerry Smith,15))
No comments:
Post a Comment