Tuesday, 15 January 2019

Spark - mapValues Example

Key / Value RDDs
----------------
Friends By Age

Goal: given (age, numberOfFriends) pairs, compute the average number of friends per age. The pattern: mapValues attaches a count of 1 to each value, reduceByKey sums friends and counts per age, and a final mapValues divides the two.


scala> val myList = List( (33,385),(33,2),(55,221),(55,250),(30,534),(30,500))
myList: List[(Int, Int)] = List((33,385), (33,2), (55,221), (55,250), (30,534), (30,500))

scala> val rdd = sc.makeRDD(myList)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[39] at makeRDD at <console>:26

scala> val result = rdd.mapValues(x => (x,1))
result: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[40] at mapValues at <console>:25

scala> result.collect.foreach(println)
(33,(385,1))
(33,(2,1))
(55,(221,1))
(55,(250,1))
(30,(534,1))
(30,(500,1))
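
Why mapValues and not map? Because mapValues leaves the keys untouched, Spark knows the partitioning by key is still valid and preserves the RDD's partitioner; a plain map would discard it. For comparison, a sketch of the equivalent map call (same result, weaker guarantees):

// Same output as rdd.mapValues(x => (x, 1)), but Spark cannot
// assume the keys are unchanged, so any partitioner is dropped.
val resultViaMap = rdd.map { case (k, v) => (k, (v, 1)) }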

scala> val res = result.reduceByKey ( (x,y) => (x._1+y._1, x._2+y._2))
res: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[41] at reduceByKey at <console>:25

scala> res.collect.foreach(println)
(33,(387,2))
(30,(1034,2))
(55,(471,2))

scala> val fi = res.mapValues(x => x._1 / x._2)
fi: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[42] at mapValues at <console>:25

scala> fi.foreach(println)
(55,235)
(30,517)
(33,193)
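
Note that x._1 / x._2 above is Int division, so 387 / 2 truncates to 193 rather than 193.5. A minimal variation that keeps the fraction by converting to Double first:

// (33,(387,2)) => (33,193.5) instead of (33,193)
val fiExact = res.mapValues(x => x._1.toDouble / x._2)
fiExact.collect.foreach(println)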




Key: Age, Value: Number of friends
Input Data:
-----------
id,name,age,numberOfFriends
0,Will,33,385
1,Jean-Luc,33,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21

(In the actual fakefriends.csv, Jean-Luc's age is 26, as the take(5) output further down shows; this walk-through uses 33 twice so that one age has two entries to reduce.)

//Parsing - Mapping
//Extract necessary fields from each line
(age,numberOfFriends)
(33,385)
(33,2)
(55,221)
(40,465)
(68,21)

//Add a count of 1 to each value with mapValues

(33,(385,1))
(33,(2,1))
(55,(221,1))
(40,(465,1))
(68,(21,1))

//Count up sum of friends and number of entries per age
reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2) )

(33,(387,2))

val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2)
(33,(387,2)) => (33,193.5)

val results = averagesByAge.collect()
results.sorted.foreach(println)
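
Putting the whole pipeline together as a standalone application (a minimal sketch; the object name FriendsByAge and the local[*] master are assumptions, not part of the original session):

import org.apache.spark.{SparkConf, SparkContext}

object FriendsByAge {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("FriendsByAge"))

    val lines = sc.textFile("E:\\POCs\\fakefriends.csv")

    // Extract (age, numFriends) from each csv line
    val rdd = lines.map { line =>
      val fields = line.split(",")
      (fields(2).toInt, fields(3).toInt)
    }

    // (age, (totalFriends, count)) per age
    val totalsByAge = rdd.mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    // (age, averageFriends)
    val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2)

    val results = averagesByAge.collect()
    results.sorted.foreach(println)

    sc.stop()
  }
}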



scala> val lines = sc.textFile("E:\\POCs\\fakefriends.csv")
lines: org.apache.spark.rdd.RDD[String] = E:\POCs\fakefriends.csv MapPartitionsRDD[34] at textFile at <console>:24

scala> val rdd = lines.map(line => {
     |       // Split by commas
     |       val fields = line.split(",")
     |       // Extract the age and numFriends fields, and convert to integers
     |       val age = fields(2).toInt
     |       val numFriends = fields(3).toInt
     |       // Create a tuple that is our result.
     |       (age, numFriends)
     |   }
     | )
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[35] at map at <console>:25

scala> rdd.take(5)
res23: Array[(Int, Int)] = Array((33,385), (26,2), (55,221), (40,465), (68,21))
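
The take(5) output shows the file itself has no header row (the first record parsed is Will's). If your copy did start with the id,name,age,numberOfFriends line, fields(2).toInt would throw a NumberFormatException; a simple way to drop such a header before parsing:

val header = lines.first()
val data = lines.filter(_ != header)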


scala> val totalsByAge = rdd.mapValues(x => (x,1)).reduceByKey( (x,y) => (x._1+y._1,x._2+y._2))
totalsByAge: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[37] at reduceByKey at <console>:25

scala> totalsByAge.take(5)
res24: Array[(Int, (Int, Int))] = Array((34,(1473,6)), (52,(3747,11)), (56,(1840,6)), (66,(2488,9)), (22,(1445,7)))


scala>     val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
averagesByAge: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[38] at mapValues at <console>:25

scala> averagesByAge.take(5)
res25: Array[(Int, Int)] = Array((34,245), (52,340), (56,306), (66,276), (22,206))
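
As in the first example, these averages are Ints because of integer division; use x._1.toDouble / x._2 for exact values. And to sort by age on the cluster instead of after collecting, sortByKey works directly on the pair RDD:

averagesByAge.sortByKey().collect().foreach(println)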


