Key / Value RDDs
----------------
Friends By Age
--------------
scala> val myList = List( (33,385),(33,2),(55,221),(55,250),(30,534),(30,500))
myList: List[(Int, Int)] = List((33,385), (33,2), (55,221), (55,250), (30,534), (30,500))
scala> val rdd = sc.makeRDD(myList)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[39] at makeRDD at <console>:26
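Note: for a plain Seq, makeRDD just delegates to parallelize, so the following builds the same RDD:
// Equivalent construction via parallelize
val rdd2 = sc.parallelize(myList)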
scala> val result = rdd.mapValues(x => (x,1))
result: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[40] at mapValues at <console>:25
scala> result.collect.foreach(println)
(33,(385,1))
(33,(2,1))
(55,(221,1))
(55,(250,1))
(30,(534,1))
(30,(500,1))
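mapValues transforms only the value side of each pair. A sketch of the equivalent plain map is below; mapValues is preferred because it also preserves any partitioner already set on the RDD:
// Equivalent with map, pattern matching on the (age, numFriends) pair
val result2 = rdd.map { case (age, numFriends) => (age, (numFriends, 1)) }
result2.collect.foreach(println)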
scala> val res = result.reduceByKey ( (x,y) => (x._1+y._1, x._2+y._2))
res: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[41] at reduceByKey at <console>:25
scala> res.collect.foreach(println)
(33,(387,2))
(30,(1034,2))
(55,(471,2))
scala> val fi = res.mapValues(x => x._1 / x._2)
fi: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[42] at mapValues at <console>:25
scala> fi.foreach(println)
(55,235)
(30,517)
(33,193)
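Note that the averages above are truncated: both tuple fields are Int, so 387 / 2 is 193. A minimal tweak, converting the sum to Double before dividing, keeps the fraction:
// toDouble forces floating-point division: 387 / 2 becomes 193.5
val fiExact = res.mapValues(x => x._1.toDouble / x._2)
fiExact.collect.foreach(println)
// e.g. (33,193.5), (30,517.0), (55,235.5)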
Key : Age, Value : Number of friends
Input Data:
-----------
id,name,age,numberOfFriends
0,Will,33,385
1,Jean-Luc,33,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
//Parsing - Mapping
//Extract necessary fields from each line
(age,numberOfFriends)
(33,385)
(33,2)
(55,221)
(40,465)
(68,21)
//Pair each value with a count of 1 (via mapValues)
(33,(385,1))
(33,(2,1))
(55,(221,1))
(40,(465,1))
(68,(21,1))
//Sum the friend counts and the number of entries per age
reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2) )
(33,(387,2))
val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2)
(33,(387,2)) => (33,193.5)
val results = averagesByAge.collect()
results.sorted.foreach(println)
scala> val lines = sc.textFile("E:\\POCs\\fakefriends.csv")
lines: org.apache.spark.rdd.RDD[String] = E:\POCs\fakefriends.csv MapPartitionsRDD[34] at textFile at <console>:24
scala> val rdd = lines.map(line => {
| // Split by commas
| val fields = line.split(",")
| // Extract the age and numFriends fields, and convert to integers
| val age = fields(2).toInt
| val numFriends = fields(3).toInt
| // Create a tuple that is our result.
| (age, numFriends)
| }
| )
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[35] at map at <console>:25
scala> rdd.take(5)
res23: Array[(Int, Int)] = Array((33,385), (26,2), (55,221), (40,465), (68,21))
scala> val totalsByAge = rdd.mapValues(x => (x,1)).reduceByKey( (x,y) => (x._1+y._1,x._2+y._2))
totalsByAge: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[37] at reduceByKey at <console>:25
scala> totalsByAge.take(5)
res24: Array[(Int, (Int, Int))] = Array((34,(1473,6)), (52,(3747,11)), (56,(1840,6)), (66,(2488,9)), (22,(1445,7)))
scala> val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
averagesByAge: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[38] at mapValues at <console>:25
scala> averagesByAge.take(5)
res25: Array[(Int, Int)] = Array((34,245), (52,340), (56,306), (66,276), (22,206))
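To finish the job as sketched above, collect the (age, average) pairs to the driver and sort them; this assumes the result set is small enough to hold locally:
// collect returns a local Array[(Int, Int)]; tuple ordering sorts by age first
val results = averagesByAge.collect()
results.sorted.foreach(println)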