Populattion of each city
customer.txt:
-------------
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725
4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall ,Caguas,PR,00725
6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail Promenade,Passaic,NJ,07055
7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725
8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest Plaza,Lawrence,MA,01841
9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725
10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beacon Plaza,Stafford,VA,22554
val custRDD = sc.textFile("D:\\Ex\\customer.txt")
val cityRDD = custRDD.map ( x => (x.split(",")(6),1))
val cityCountRDD = cityRDD.reduceByKey(_+_)
val sortedRDD = cityCountRDD.sortBy(x => x._2, false)
sortedRDD.take(10).foreach(println)
scala> val custRDD = sc.textFile("D:\\Ex\\customer.txt")
custRDD: org.apache.spark.rdd.RDD[String] = D:\Ex\customer.txt MapPartitionsRDD[26] at textFile at <console>:24
scala> val cityRDD = custRDD.map ( x => (x.split(",")(6),1))
cityRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:25
scala> val cityCountRDD = cityRDD.reduceByKey(_+_)
cityCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:25
scala> val sortedRDD = cityCountRDD.sortBy(x => x._2, false)
sortedRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at sortBy at <console>:25
scala> sortedRDD.take(10).foreach(println)
(Caguas,1152)
(Chicago,62)
(Los Angeles,59)
(Brooklyn,47)
(Bronx,28)
(Las Vegas,27)
(Houston,26)
(Phoenix,24)
(Philadelphia,24)
(San Diego,23)
No comments:
Post a Comment