input data:
----------
customerID, itemID, amount
44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
91,8900,24.59
scala> val input = sc.textFile("E:\\POCs\\customer-orders.csv")
input: org.apache.spark.rdd.RDD[String] = E:\POCs\customer-orders.csv MapPartitionsRDD[12] at textFile a
scala> val mappedInput = input.map(line => {
| val fields = line.split(",")
| val customerID = fields(0).toInt
| val amount = fields(2).toFloat
| (customerID,amount )
| })
mappedInput: org.apache.spark.rdd.RDD[(Int, Float)] = MapPartitionsRDD[13] at map at <console>:25
scala> mappedInput.take(5).foreach(println)
(44,37.19)
(35,65.89)
(2,40.64)
(47,14.98)
(29,13.08)
scala> val totalByCustomer = mappedInput.reduceByKey( (x,y) => x+y)
totalByCustomer: org.apache.spark.rdd.RDD[(Int, Float)] = ShuffledRDD[14] at reduceByKey at <console>:25
scala> totalByCustomer.take(5).foreach(println)
(34,5330.7993)
(52,5245.0605)
(96,3924.23)
(4,4815.05)
(16,4979.0605)
scala> totalByCustomer.collect.foreach(println)
(34,5330.7993)
(52,5245.0605)
(96,3924.23)
(4,4815.05)
(16,4979.0605)
(82,4812.49)
(66,4681.92)
(28,5000.7104)
(54,6065.39)
(80,4727.86)
(98,4297.26)
(30,4990.72)
(14,4735.0303)
(50,4517.2695)
(36,4278.05)
(24,5259.92)
(64,5288.69)
(92,5379.281)
(74,4647.1304)
(90,5290.41)
(72,5337.4395)
(70,5368.2505)
(18,4921.27)
(12,4664.59)
(38,4898.461)
(20,4836.86)
(78,4524.51)
(10,4819.6997)
(94,4475.5703)
(84,4652.9395)
(56,4701.02)
(76,4904.2104)
(22,5019.449)
(46,5963.111)
(48,4384.3296)
(32,5496.0503)
(0,5524.9497)
(62,5253.3213)
(42,5696.8403)
(40,5186.4297)
(6,5397.8794)
(8,5517.24)
(86,4908.809)
(58,5437.7305)
(44,4756.8906)
(88,4830.55)
(60,5040.7095)
(26,5250.4004)
(68,6375.45)
(2,5994.591)
(13,4367.62)
(19,5059.4307)
(39,6193.1104)
(81,5112.71)
(71,5995.66)
(55,5298.09)
(29,5032.5303)
(79,3790.5698)
(65,5140.3496)
(11,5152.29)
(35,5155.42)
(57,4628.3994)
(51,4975.2197)
(37,4735.2)
(75,4178.5)
(45,3309.3804)
(1,4958.5996)
(89,4851.4795)
(63,5415.15)
(83,4635.8003)
(17,5032.6797)
(9,5322.6494)
(49,4394.5996)
(43,5368.83)
(99,4172.29)
(41,5637.619)
(61,5497.48)
(15,5413.5103)
(21,4707.41)
(47,4316.3)
(77,4327.7305)
(53,4945.3)
(97,5977.1895)
(25,5057.6104)
(95,4876.8394)
(59,5642.8906)
(73,6206.199)
(27,4915.8896)
(93,5265.75)
(33,5254.659)
(23,4042.65)
(67,4505.79)
(69,5123.01)
(3,4659.63)
(7,4755.0693)
(85,5503.4307)
(91,4642.2603)
(31,4765.05)
(87,5206.3994)
(5,4561.0703)
scala> val flipped = totalByCustomer.map( x => (x._2, x._1) )
flipped: org.apache.spark.rdd.RDD[(Float, Int)] = MapPartitionsRDD[15] at map at <console>:25
scala> flipped.take(5).foreach(println)
(5330.7993,34)
(5245.0605,52)
(3924.23,96)
(4815.05,4)
(4979.0605,16)
scala> val totalByCustomerSorted = flipped.sortByKey()
totalByCustomerSorted: org.apache.spark.rdd.RDD[(Float, Int)] = ShuffledRDD[18] at sortByKey at <console>:25
scala> totalByCustomerSorted.take(5).foreach(println)
(3309.3804,45)
(3790.5698,79)
(3924.23,96)
(4042.65,23)
(4172.29,99)
scala> val results = totalByCustomerSorted.collect()
results: Array[(Float, Int)] = Array((3309.3804,45), (3790.5698,79), (3924.23,96), (4042.65,23), (4172.29,99), (4178.5,75), (4278.05,36), (4297.26,98)
, (4316.3,47), (4327.7305,77), (4367.62,13), (4384.3296,48), (4394.5996,49), (4475.5703,94), (4505.79,67), (4517.2695,50), (4524.51,78), (4561.0703,5)
, (4628.3994,57), (4635.8003,83), (4642.2603,91), (4647.1304,74), (4652.9395,84), (4659.63,3), (4664.59,12), (4681.92,66), (4701.02,56), (4707.41,21),
(4727.86,80), (4735.0303,14), (4735.2,37), (4755.0693,7), (4756.8906,44), (4765.05,31), (4812.49,82), (4815.05,4), (4819.6997,10), (4830.55,88), (483
6.86,20), (4851.4795,89), (4876.8394,95), (4898.461,38), (4904.2104,76), (4908.809,86), (4915.8896,27), (4921.27,18), (4945.3,53), (4958.5996,1), (497
5.2197,51), (4979.0605,16), (4990.72,30), (5000...
scala> results.foreach(println)
(3309.3804,45)
(3790.5698,79)
(3924.23,96)
(4042.65,23)
(4172.29,99)
(4178.5,75)
(4278.05,36)
(4297.26,98)
(4316.3,47)
(4327.7305,77)
(4367.62,13)
(4384.3296,48)
(4394.5996,49)
(4475.5703,94)
(4505.79,67)
(4517.2695,50)
(4524.51,78)
(4561.0703,5)
(4628.3994,57)
(4635.8003,83)
(4642.2603,91)
(4647.1304,74)
(4652.9395,84)
(4659.63,3)
(4664.59,12)
(4681.92,66)
(4701.02,56)
(4707.41,21)
(4727.86,80)
(4735.0303,14)
(4735.2,37)
(4755.0693,7)
(4756.8906,44)
(4765.05,31)
(4812.49,82)
(4815.05,4)
(4819.6997,10)
(4830.55,88)
(4836.86,20)
(4851.4795,89)
(4876.8394,95)
(4898.461,38)
(4904.2104,76)
(4908.809,86)
(4915.8896,27)
(4921.27,18)
(4945.3,53)
(4958.5996,1)
(4975.2197,51)
(4979.0605,16)
(4990.72,30)
(5000.7104,28)
(5019.449,22)
(5032.5303,29)
(5032.6797,17)
(5040.7095,60)
(5057.6104,25)
(5059.4307,19)
(5112.71,81)
(5123.01,69)
(5140.3496,65)
(5152.29,11)
(5155.42,35)
(5186.4297,40)
(5206.3994,87)
(5245.0605,52)
(5250.4004,26)
(5253.3213,62)
(5254.659,33)
(5259.92,24)
(5265.75,93)
(5288.69,64)
(5290.41,90)
(5298.09,55)
(5322.6494,9)
(5330.7993,34)
(5337.4395,72)
(5368.2505,70)
(5368.83,43)
(5379.281,92)
(5397.8794,6)
(5413.5103,15)
(5415.15,63)
(5437.7305,58)
(5496.0503,32)
(5497.48,61)
(5503.4307,85)
(5517.24,8)
(5524.9497,0)
(5637.619,41)
(5642.8906,59)
(5696.8403,42)
(5963.111,46)
(5977.1895,97)
(5994.591,2)
(5995.66,71)
(6065.39,54)
(6193.1104,39)
(6206.199,73)
(6375.45,68)
Subscribe to:
Post Comments (Atom)
Flume - Simple Demo
// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...
-
How to fetch Spark Application Id programmaticall while running the Spark Job? scala> spark.sparkContext.applicationId res124: String = l...
-
input data: ---------- customerID, itemID, amount 44,8602,37.19 35,5368,65.89 2,3391,40.64 47,6694,14.98 29,680,13.08 91,8900,24.59 ...
-
pattern matching is similar to switch statements in C#, Java no fall-through - at least one condition matched no breaks object PatternExa { ...
No comments:
Post a Comment