//Hide log messages, warning messages in spark-shell
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
Input file
weblog.txt:
-------------
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.35.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.93.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
13.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
11.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val rdd1 = sc.textFile("/home/hadoop/Desktop/weblog.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/weblog.txt MapPartitionsRDD[50] at textFile at <console>:29
scala> rdd1.count
res15: Long = 12
scala> rdd1.take(5).foreach(println)
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val ip_Pattern = "[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}".r
ip_Pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val port_Pattern =" - [0-9]{1,5}".r
port_pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val time_Pattern = "[0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}".r
time_Pattern: scala.util.matching.Regex = [0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}
scala> val rdd2 = rdd1.map { x =>
val ip = ip_Pattern.findFirstIn(x).get
val port = port_Pattern.findFirstIn(x).get
val port1 = port.slice(2,port.length).trim().toInt
val time = time_Pattern.findFirstIn(x).get
(ip,port1,time)
}
rdd2: org.apache.spark.rdd.RDD[(String, Int, String)] = MapPartitionsRDD[2] at map at <console>:33
scala> rdd2.take(5).foreach(println)
(3.94.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.33.140.62,21475,15/Sep/2013:23:58:34 +0100)
(19.31.140.62,2489,15/Sep/2013:23:58:34 +0100)
(3.91.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.32.140.62,2489,15/Sep/2013:23:58:34 +0100)
scala> val df1 = rdd2.toDF("ipAddress","portNo","TimeStamp")
df1: org.apache.spark.sql.DataFrame = [ipAddress: string, portNo: int ... 1 more field]
scala> df1.show
+------------+------+--------------------+
| ipAddress|portNo| TimeStamp|
+------------+------+--------------------+
| 3.94.78.5| 69827|15/Sep/2013:23:58...|
|19.33.140.62| 21475|15/Sep/2013:23:58...|
|19.31.140.62| 2489|15/Sep/2013:23:58...|
| 3.91.78.5| 69827|15/Sep/2013:23:58...|
|19.32.140.62| 2489|15/Sep/2013:23:58...|
|19.35.140.62| 2489|15/Sep/2013:23:58...|
| 3.93.78.5| 69827|15/Sep/2013:23:58...|
|12.38.140.62| 2489|15/Sep/2013:23:58...|
|12.38.140.62| 4712|15/Sep/2013:23:58...|
| 13.94.78.5| 69827|15/Sep/2013:23:58...|
|11.38.140.62| 4712|15/Sep/2013:23:58...|
|12.38.140.62| 4712|15/Sep/2013:23:58...|
+------------+------+--------------------+
scala> val df2 = df1.select ("*").groupBy("ipAddress").agg(count("*"))
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count(1): bigint]
scala> df2.show
+------------+--------+
| ipAddress|count(1)|
+------------+--------+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+--------+
scala> val df2 = df1.select ("*").groupBy("ipAddress").agg(count("*") as "count")
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count: bigint]
scala> df2.show
+------------+-----+
| ipAddress|count|
+------------+-----+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+-----+
scala> df.write.saveAsTable("joy.outstanding")
// joy is the database which is present in Hive
scala> spark.sql("use joy");
res6: org.apache.spark.sql.DataFrame = []
// fetching Hive table info within Spark
scala> spark.sql("select * from outstanding").show
+------------+-----+
| ipAddress|count|
+------------+-----+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+-----+
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
Input file
weblog.txt:
-------------
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.35.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.93.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
13.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
11.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val rdd1 = sc.textFile("/home/hadoop/Desktop/weblog.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/weblog.txt MapPartitionsRDD[50] at textFile at <console>:29
scala> rdd1.count
res15: Long = 12
scala> rdd1.take(5).foreach(println)
3.94.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827 [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
scala> val ip_Pattern = "[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}".r
ip_Pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val port_Pattern =" - [0-9]{1,5}".r
port_pattern: scala.util.matching.Regex = - [0-9]{1,5}
scala> val time_Pattern = "[0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}".r
time_Pattern: scala.util.matching.Regex = [0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}
scala> val rdd2 = rdd1.map { x =>
val ip = ip_Pattern.findFirstIn(x).get
val port = port_Pattern.findFirstIn(x).get
val port1 = port.slice(2,port.length).trim().toInt
val time = time_Pattern.findFirstIn(x).get
(ip,port1,time)
}
rdd2: org.apache.spark.rdd.RDD[(String, Int, String)] = MapPartitionsRDD[2] at map at <console>:33
scala> rdd2.take(5).foreach(println)
(3.94.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.33.140.62,21475,15/Sep/2013:23:58:34 +0100)
(19.31.140.62,2489,15/Sep/2013:23:58:34 +0100)
(3.91.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.32.140.62,2489,15/Sep/2013:23:58:34 +0100)
scala> val df1 = rdd2.toDF("ipAddress","portNo","TimeStamp")
df1: org.apache.spark.sql.DataFrame = [ipAddress: string, portNo: int ... 1 more field]
scala> df1.show
+------------+------+--------------------+
| ipAddress|portNo| TimeStamp|
+------------+------+--------------------+
| 3.94.78.5| 69827|15/Sep/2013:23:58...|
|19.33.140.62| 21475|15/Sep/2013:23:58...|
|19.31.140.62| 2489|15/Sep/2013:23:58...|
| 3.91.78.5| 69827|15/Sep/2013:23:58...|
|19.32.140.62| 2489|15/Sep/2013:23:58...|
|19.35.140.62| 2489|15/Sep/2013:23:58...|
| 3.93.78.5| 69827|15/Sep/2013:23:58...|
|12.38.140.62| 2489|15/Sep/2013:23:58...|
|12.38.140.62| 4712|15/Sep/2013:23:58...|
| 13.94.78.5| 69827|15/Sep/2013:23:58...|
|11.38.140.62| 4712|15/Sep/2013:23:58...|
|12.38.140.62| 4712|15/Sep/2013:23:58...|
+------------+------+--------------------+
scala> val df2 = df1.select ("*").groupBy("ipAddress").agg(count("*"))
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count(1): bigint]
scala> df2.show
+------------+--------+
| ipAddress|count(1)|
+------------+--------+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+--------+
scala> val df2 = df1.select ("*").groupBy("ipAddress").agg(count("*") as "count")
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count: bigint]
scala> df2.show
+------------+-----+
| ipAddress|count|
+------------+-----+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+-----+
scala> df.write.saveAsTable("joy.outstanding")
// joy is the database which is present in Hive
scala> spark.sql("use joy");
res6: org.apache.spark.sql.DataFrame = []
// fetching Hive table info within Spark
scala> spark.sql("select * from outstanding").show
+------------+-----+
| ipAddress|count|
+------------+-----+
|12.38.140.62| 3|
|19.31.140.62| 1|
| 3.94.78.5| 1|
| 3.93.78.5| 1|
|19.35.140.62| 1|
|11.38.140.62| 1|
|19.33.140.62| 1|
| 3.91.78.5| 1|
| 13.94.78.5| 1|
|19.32.140.62| 1|
+------------+-----+
No comments:
Post a Comment