input file: (weblog.txt)
------------
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.35.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.93.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
13.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
11.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val r1 = sc.textFile("E:\\POCs\\weblog.txt")
r1: org.apache.spark.rdd.RDD[String] = E:\POCs\weblog.txt MapPartitionsRDD[8] at textFile at <console>:24
scala> r1.count
res9: Long = 12
scala> val ip_pattern = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}".r
ip_pattern: scala.util.matching.Regex = [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
scala> val port_pattern = " - [0-9]{1,5} ".r
port_pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val r2 = r1.map { x =>
| val ip = ip_pattern.findFirstIn(x).get
| val port = port_pattern.findFirstIn(x).get
| (ip,port)
| }
scala> r2.count
res10: Long = 12
scala> r2.take(10).foreach(println)
(3.94.78.5, - 69827 )
(19.33.140.62, - 21475 )
(19.31.140.62, - 2489 )
(3.91.78.5, - 69827 )
(19.32.140.62, - 2489 )
(19.35.140.62, - 2489 )
(3.93.78.5, - 69827 )
(12.38.140.62, - 2489 )
(12.38.140.62, - 4712 )
(13.94.78.5, - 69827 )
scala> val r3 = r2.map (x => (x._1, x._2.slice(2,x._2.length-1).trim().toInt))
r3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:25
scala> r3.foreach(println)
(3.93.78.5,69827)
(12.38.140.62,2489)
(12.38.140.62,4712)
(13.94.78.5,69827)
(11.38.140.62,4712)
(3.94.78.5,69827)
(19.33.140.62,21475)
(19.31.140.62,2489)
(3.91.78.5,69827)
(19.32.140.62,2489)
(19.35.140.62,2489)
(12.38.140.62,4712)
scala> val df = r3.toDF("ip","port")
df: org.apache.spark.sql.DataFrame = [ip: string, port: int]
scala> df.printSchema
root
|-- ip: string (nullable = true)
|-- port: integer (nullable = false)
------------
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.35.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.93.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
13.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
11.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val r1 = sc.textFile("E:\\POCs\\weblog.txt")
r1: org.apache.spark.rdd.RDD[String] = E:\POCs\weblog.txt MapPartitionsRDD[8] at textFile at <console>:24
scala> r1.count
res9: Long = 12
scala> val ip_pattern = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}".r
ip_pattern: scala.util.matching.Regex = [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
scala> val port_pattern = " - [0-9]{1,5} ".r
port_pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val r2 = r1.map { x =>
| val ip = ip_pattern.findFirstIn(x).get
| val port = port_pattern.findFirstIn(x).get
| (ip,port)
| }
scala> r2.count
res10: Long = 12
scala> r2.take(10).foreach(println)
(3.94.78.5, - 69827 )
(19.33.140.62, - 21475 )
(19.31.140.62, - 2489 )
(3.91.78.5, - 69827 )
(19.32.140.62, - 2489 )
(19.35.140.62, - 2489 )
(3.93.78.5, - 69827 )
(12.38.140.62, - 2489 )
(12.38.140.62, - 4712 )
(13.94.78.5, - 69827 )
scala> val r3 = r2.map (x => (x._1, x._2.slice(2,x._2.length-1).trim().toInt))
r3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:25
scala> r3.foreach(println)
(3.93.78.5,69827)
(12.38.140.62,2489)
(12.38.140.62,4712)
(13.94.78.5,69827)
(11.38.140.62,4712)
(3.94.78.5,69827)
(19.33.140.62,21475)
(19.31.140.62,2489)
(3.91.78.5,69827)
(19.32.140.62,2489)
(19.35.140.62,2489)
(12.38.140.62,4712)
scala> val df = r3.toDF("ip","port")
df: org.apache.spark.sql.DataFrame = [ip: string, port: int]
scala> df.printSchema
root
|-- ip: string (nullable = true)
|-- port: integer (nullable = false)
No comments:
Post a Comment