Monday, 28 January 2019

Spark RDD to DataFrame to Hive Table (Spark SQL with Hive integration)

// Hide log and warning messages in spark-shell
import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

Input file
weblog.txt:
-------------
3.94.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.35.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.93.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
13.94.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
11.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
12.38.140.62 - 4712 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"


scala> val rdd1 = sc.textFile("/home/hadoop/Desktop/weblog.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/hadoop/Desktop/weblog.txt MapPartitionsRDD[50] at textFile at <console>:29

scala> rdd1.count
res15: Long = 12

scala> rdd1.take(5).foreach(println)
3.94.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.33.140.62 - 21475 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.31.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"
3.91.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] "GET /KBDOC-00033.html HTTP/1.0"
19.32.140.62 - 2489 [15/Sep/2013:23:58:34 +0100] "GET /KBDOC-00033.html HTTP/1.0"


// Escape the dots so "." matches a literal dot rather than any character
scala> val ip_Pattern = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}".r
ip_Pattern: scala.util.matching.Regex = [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}

scala> val port_Pattern = " - [0-9]{1,5}".r
port_Pattern: scala.util.matching.Regex =  - [0-9]{1,5}

scala> val time_Pattern = "[0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}".r
time_Pattern: scala.util.matching.Regex = [0-9]{1,2}/[A-Z][a-z]{2}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+][0-9]{4}
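
Before mapping the whole RDD, the three patterns can be sanity-checked against a single sample line (a quick, optional step):

// Sanity check: each findFirstIn should return Some(...) for a well-formed line
val sample = "3.94.78.5 - 69827    [15/Sep/2013:23:58:36 +0100] \"GET /KBDOC-00033.html HTTP/1.0\""
ip_Pattern.findFirstIn(sample)
port_Pattern.findFirstIn(sample)
time_Pattern.findFirstIn(sample)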




scala> val rdd2 = rdd1.map { x =>
             val ip    = ip_Pattern.findFirstIn(x).get            // first IP-like token
             val port  = port_Pattern.findFirstIn(x).get          // " - NNNNN" fragment
             val port1 = port.slice(2, port.length).trim().toInt  // strip the " - " prefix and convert
             val time  = time_Pattern.findFirstIn(x).get          // timestamp inside the brackets
             (ip, port1, time)
             }
rdd2: org.apache.spark.rdd.RDD[(String, Int, String)] = MapPartitionsRDD[2] at map at <console>:33


scala> rdd2.take(5).foreach(println)
(3.94.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.33.140.62,21475,15/Sep/2013:23:58:34 +0100)
(19.31.140.62,2489,15/Sep/2013:23:58:34 +0100)
(3.91.78.5,69827,15/Sep/2013:23:58:36 +0100)
(19.32.140.62,2489,15/Sep/2013:23:58:34 +0100)
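
Note that .get will throw an exception on any line where a pattern fails to match. For messier logs, a more defensive sketch (assumed names logPattern / rdd2Safe, same field layout as above) uses a single regex with capture groups and simply drops non-matching lines:

// Sketch only: one regex with capture groups; malformed lines are skipped instead of throwing
val logPattern = """(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (\d{1,5})\s+\[(.+?)\]""".r

val rdd2Safe = rdd1.flatMap { line =>
  logPattern.findFirstMatchIn(line).map(m => (m.group(1), m.group(2).toInt, m.group(3)))
}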


scala> val df1 = rdd2.toDF("ipAddress","portNo","TimeStamp")
df1: org.apache.spark.sql.DataFrame = [ipAddress: string, portNo: int ... 1 more field]

scala> df1.show
+------------+------+--------------------+
|   ipAddress|portNo|           TimeStamp|
+------------+------+--------------------+
|   3.94.78.5| 69827|15/Sep/2013:23:58...|
|19.33.140.62| 21475|15/Sep/2013:23:58...|
|19.31.140.62|  2489|15/Sep/2013:23:58...|
|   3.91.78.5| 69827|15/Sep/2013:23:58...|
|19.32.140.62|  2489|15/Sep/2013:23:58...|
|19.35.140.62|  2489|15/Sep/2013:23:58...|
|   3.93.78.5| 69827|15/Sep/2013:23:58...|
|12.38.140.62|  2489|15/Sep/2013:23:58...|
|12.38.140.62|  4712|15/Sep/2013:23:58...|
|  13.94.78.5| 69827|15/Sep/2013:23:58...|
|11.38.140.62|  4712|15/Sep/2013:23:58...|
|12.38.140.62|  4712|15/Sep/2013:23:58...|
+------------+------+--------------------+
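
The TimeStamp column is still a plain string at this point. If a real timestamp type is needed, it can be parsed with to_timestamp (available in Spark 2.2+); the format string below is assumed from the sample data, dfParsed is just an illustrative name, and the $ column syntax is already available in spark-shell:

// Sketch: parse the string column into a proper timestamp column
import org.apache.spark.sql.functions.to_timestamp

val dfParsed = df1.withColumn("eventTime", to_timestamp($"TimeStamp", "dd/MMM/yyyy:HH:mm:ss Z"))
dfParsed.printSchema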


scala> val df2 = df1.select("*").groupBy("ipAddress").agg(count("*"))
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count(1): bigint]

scala> df2.show
+------------+--------+                                                       
|   ipAddress|count(1)|
+------------+--------+
|12.38.140.62|       3|
|19.31.140.62|       1|
|   3.94.78.5|       1|
|   3.93.78.5|       1|
|19.35.140.62|       1|
|11.38.140.62|       1|
|19.33.140.62|       1|
|   3.91.78.5|       1|
|  13.94.78.5|       1|
|19.32.140.62|       1|
+------------+--------+

scala> val df2 = df1.select("*").groupBy("ipAddress").agg(count("*") as "count")
df2: org.apache.spark.sql.DataFrame = [ipAddress: string, count: bigint]

scala> df2.show
+------------+-----+                                                         
|   ipAddress|count|
+------------+-----+
|12.38.140.62|    3|
|19.31.140.62|    1|
|   3.94.78.5|    1|
|   3.93.78.5|    1|
|19.35.140.62|    1|
|11.38.140.62|    1|
|19.33.140.62|    1|
|   3.91.78.5|    1|
|  13.94.78.5|    1|
|19.32.140.62|    1|
+------------+-----+
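
The same per-IP count can also be expressed in plain SQL by registering the DataFrame as a temporary view:

// Equivalent aggregation through Spark SQL over a temporary view
df1.createOrReplaceTempView("weblog")
spark.sql("SELECT ipAddress, COUNT(*) AS count FROM weblog GROUP BY ipAddress").show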

// persist the aggregated DataFrame as a Hive table
scala> df2.write.saveAsTable("joy.outstanding")
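
saveAsTable will fail if the table already exists; when re-running the example, a save mode can be passed, for instance:

// Sketch: overwrite the existing Hive table on re-runs instead of erroring out
df2.write.mode("overwrite").saveAsTable("joy.outstanding")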

// joy is an existing database in Hive
scala> spark.sql("use joy")
res6: org.apache.spark.sql.DataFrame = []

// query the Hive table from within Spark
scala> spark.sql("select * from outstanding").show
+------------+-----+                                                           
|   ipAddress|count|
+------------+-----+
|12.38.140.62|    3|
|19.31.140.62|    1|
|   3.94.78.5|    1|
|   3.93.78.5|    1|
|19.35.140.62|    1|
|11.38.140.62|    1|
|19.33.140.62|    1|
|   3.91.78.5|    1|
|  13.94.78.5|    1|
|19.32.140.62|    1|
+------------+-----+
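
The persisted table can also be loaded straight back into a DataFrame without writing SQL:

// Read the Hive table back as a DataFrame
val fromHive = spark.table("joy.outstanding")
fromHive.count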
