Input Data in a file : colllogdata.txt
--------------------------------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED 54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED 44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
scala> val rd1 = sc.textFile("E:\\POCs\\calllogdata.txt")
rd1: org.apache.spark.rdd.RDD[String] = E:\POCs\calllogdata.txt MapPartitionsRDD[1] at textFile at <console>:24
// regular expression to extract status from the line
// regular expression to extract 2 different phone numbers from the line
scala> val result_rdd = rd1.map( x => {
| val status = pattern_status.findFirstIn(x).get
| val pnos = pattern_phnos.findFirstIn(x).get
| val phno1 = pnos.slice(0,10)
| val phno2 = pnos.slice(10,20)
| (status,phno1,phno2)
| })
result_rdd: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[2] at map at <console>:33
scala> result_rdd.collect.foreach(println)
(DROPPED,8052690057,7757919463)
(DROPPED,9886177375,9916790556)
(SUCCESS,8618627996,9886177375)
(DROPPED,9876515616,4894949494)
(FAILED,5454545454,6469496477)
(SUCCESS,1235467890,2153698431)
(DROPPED,8055645645,8478787877)
(DROPPED,8080905609,5676236992)
(FAILED,4455458484,8449644469)
(DROPPED,9609065215,8087080806)
(DROPPED,8979794646,5879874615)
(SUCCESS,4545454545,7978978979)
(DROPPED,7458456456,4564564656)
(DROPPED,9879489494,5648947898)
(SUCCESS,8464564560,5646064646)
(DROPPED,7854545645,6456456456)
(DROPPED,5755454897,9797979797)
(SUCCESS,8789864098,9489089409)
(SUCCESS,7588484847,8978978979)
(SUCCESS,7489408648,9489489489)
scala> val rdd_dropped = result_rdd.map ( x => {
| val status = x._1 == "DROPPED"
| val ph1 = x._2
| val ph2 = x._3
| (status,ph1,ph2)
| })
rdd_dropped: org.apache.spark.rdd.RDD[(Boolean, String, String)] = MapPartitionsRDD[3] at map at <console>:31
scala> rdd_dropped.collect.foreach(println)
(true,8052690057,7757919463)
(true,9886177375,9916790556)
(false,8618627996,9886177375)
(true,9876515616,4894949494)
(false,5454545454,6469496477)
(false,1235467890,2153698431)
(true,8055645645,8478787877)
(true,8080905609,5676236992)
(false,4455458484,8449644469)
(true,9609065215,8087080806)
(true,8979794646,5879874615)
(false,4545454545,7978978979)
(true,7458456456,4564564656)
(true,9879489494,5648947898)
(false,8464564560,5646064646)
(true,7854545645,6456456456)
(true,5755454897,9797979797)
(false,8789864098,9489089409)
(false,7588484847,8978978979)
(false,7489408648,9489489489)
scala> val rdd_dropped = result_rdd.map ( x => {
val status = x._1 == "DROPPED"
val ph1 = x._2
val ph2 = x._3
if (status == true) { (status,ph1,ph2) }
})
scala> rdd_dropped.collect.foreach(println)
(true,8052690057,7757919463)
(true,9886177375,9916790556)
()
(true,9876515616,4894949494)
()
()
(true,8055645645,8478787877)
(true,8080905609,5676236992)
()
(true,9609065215,8087080806)
(true,8979794646,5879874615)
()
(true,7458456456,4564564656)
(true,9879489494,5648947898)
()
(true,7854545645,6456456456)
(true,5755454897,9797979797)
()
()
()
--------------------------------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED 54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED 44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
scala> val rd1 = sc.textFile("E:\\POCs\\calllogdata.txt")
rd1: org.apache.spark.rdd.RDD[String] = E:\POCs\calllogdata.txt MapPartitionsRDD[1] at textFile at <console>:24
// regular expression to extract status from the line
scala> val pattern_status = "[A-Z]{6,7}".r
pattern_status: scala.util.matching.Regex = [A-Z]{6,7}// regular expression to extract 2 different phone numbers from the line
scala> val pattern_phnos = "[0-9]{20}".r
pattern_phnos: scala.util.matching.Regex = [0-9]{20}scala> val result_rdd = rd1.map( x => {
| val status = pattern_status.findFirstIn(x).get
| val pnos = pattern_phnos.findFirstIn(x).get
| val phno1 = pnos.slice(0,10)
| val phno2 = pnos.slice(10,20)
| (status,phno1,phno2)
| })
result_rdd: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[2] at map at <console>:33
scala> result_rdd.collect.foreach(println)
(DROPPED,8052690057,7757919463)
(DROPPED,9886177375,9916790556)
(SUCCESS,8618627996,9886177375)
(DROPPED,9876515616,4894949494)
(FAILED,5454545454,6469496477)
(SUCCESS,1235467890,2153698431)
(DROPPED,8055645645,8478787877)
(DROPPED,8080905609,5676236992)
(FAILED,4455458484,8449644469)
(DROPPED,9609065215,8087080806)
(DROPPED,8979794646,5879874615)
(SUCCESS,4545454545,7978978979)
(DROPPED,7458456456,4564564656)
(DROPPED,9879489494,5648947898)
(SUCCESS,8464564560,5646064646)
(DROPPED,7854545645,6456456456)
(DROPPED,5755454897,9797979797)
(SUCCESS,8789864098,9489089409)
(SUCCESS,7588484847,8978978979)
(SUCCESS,7489408648,9489489489)
scala> val rdd_dropped = result_rdd.map ( x => {
| val status = x._1 == "DROPPED"
| val ph1 = x._2
| val ph2 = x._3
| (status,ph1,ph2)
| })
rdd_dropped: org.apache.spark.rdd.RDD[(Boolean, String, String)] = MapPartitionsRDD[3] at map at <console>:31
scala> rdd_dropped.collect.foreach(println)
(true,8052690057,7757919463)
(true,9886177375,9916790556)
(false,8618627996,9886177375)
(true,9876515616,4894949494)
(false,5454545454,6469496477)
(false,1235467890,2153698431)
(true,8055645645,8478787877)
(true,8080905609,5676236992)
(false,4455458484,8449644469)
(true,9609065215,8087080806)
(true,8979794646,5879874615)
(false,4545454545,7978978979)
(true,7458456456,4564564656)
(true,9879489494,5648947898)
(false,8464564560,5646064646)
(true,7854545645,6456456456)
(true,5755454897,9797979797)
(false,8789864098,9489089409)
(false,7588484847,8978978979)
(false,7489408648,9489489489)
scala> val rdd_dropped = result_rdd.map ( x => {
val status = x._1 == "DROPPED"
val ph1 = x._2
val ph2 = x._3
if (status == true) { (status,ph1,ph2) }
})
scala> rdd_dropped.collect.foreach(println)
(true,8052690057,7757919463)
(true,9886177375,9916790556)
()
(true,9876515616,4894949494)
()
()
(true,8055645645,8478787877)
(true,8080905609,5676236992)
()
(true,9609065215,8087080806)
(true,8979794646,5879874615)
()
(true,7458456456,4564564656)
(true,9879489494,5648947898)
()
(true,7854545645,6456456456)
(true,5755454897,9797979797)
()
()
()
No comments:
Post a Comment