// How to merge multiple inputs received on different ports using Spark Streaming with Scala?
// streamingMultiInput.sc:
---------------------------
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.functions._
object Streaming1 {
  def main(args: Array[String]): Unit = {
    // Create a configuration object
    val conf = new SparkConf()
    conf.set("spark.master", "local[3]")
    conf.set("spark.app.name", "streamingApp1")
    conf.set("spark.streaming.blockInterval", "1000ms")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext (needed for the Dataset conversion below)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    // Receiver 1: first socket source
    val d1 = ssc.socketTextStream("localhost", 3456)
    // Receiver 2: second socket source
    val d2 = ssc.socketTextStream("localhost", 5678)
    // Combine the two different inputs into a single DStream
    val d3 = d1.union(d2)
    // Convert each RDD of the DStream into a Dataset and count the words
    d3.foreachRDD { rdd =>
      val lines = rdd.toDS()
      val words = lines.flatMap(line => line.split(" "))
      val pairs = words.map(word => (word, 1))
      pairs.groupBy("_1").agg(count("_2")).show()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
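// If more than two ports must be merged, StreamingContext can union a whole
// sequence of streams in one call instead of chaining pairwise unions. A
// minimal sketch, assuming a hypothetical third port (7890); note that each
// socket receiver occupies one core, so three receivers would need a master
// with more threads than local[3]:
val s1 = ssc.socketTextStream("localhost", 3456)
val s2 = ssc.socketTextStream("localhost", 5678)
val s3 = ssc.socketTextStream("localhost", 7890) // hypothetical third source
val merged = ssc.union(Seq(s1, s2, s3))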
// start netcat listening on the 1st port
hadoop@hadoop:~/Desktop/vow$ nc -lk 3456
arivu arivu arivu
// start netcat listening on the 2nd port
hadoop@hadoop:~/Desktop/vow$ nc -lk 5678
spark spark spark
therivu therivu
therivu
// start the spark shell
$ spark-shell
// stop the shell's default SparkContext (the script creates its own)
scala> sc.stop
// load the Scala script
scala> :load streamingMultiInput.sc
// call the main method
scala> Streaming1.main(null)
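// The :load approach suits interactive testing; the same object could also be
// compiled into a jar and launched with spark-submit (the jar name below is
// hypothetical; the master is already set inside the script):
$ spark-submit --class Streaming1 streaming-multi-input.jar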
// result
+-------+---------+
| _1|count(_2)|
+-------+---------+
| arivu| 3|
| spark| 3|
|therivu| 3|
+-------+---------+
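// The Dataset conversion above is only needed for the tabular output; the same
// word count can be expressed with pure DStream transformations. A minimal
// sketch, assuming the merged stream d3 from the script above:
val counts = d3.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// print the first ten (word, count) pairs of each batch to the console
counts.print()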