// NetCat Input into Spark Streaming Example
$ spark-shell --master local[2]
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.socketTextStream("localhost",9999)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.print()
ssc.start()
hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C
Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
// Kafka Console Producer and Console Consumer Example
[cloudera@quickstart kafka]$ sudo gedit server.properties
broker.id=0
zoo keeper port : 2182
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9092
[cloudera@quickstart kafka]$ sudo gedit config/zookeeper.properties
clientPort=2182
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2182
Created topic "myFresh".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
myFresh
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia
// Kafka Console Producer data into => Spark Streaming Example (KafkaUtils.createStream)
KafkaStreamingExa.scala:
------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
object KafkaStreamingExa {
def main (args : Array[String]) : Unit ={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(10))
val kafkaStream = KafkaUtils.createStream(ssc,"localhost:2182","testGroup",Map("myFresh" -> 0))
val lines = kafkaStream.map(x => x._2.toUpperCase)
print(lines)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.foreachRDD( rdd =>
rdd.foreachPartition( partition =>
partition.foreach{
case (w:String, cnt:Int) => {
val x = w + "\t" + cnt
print(x)
}
}
)
)
ssc.start()
ssc.awaitTermination()
}
}
spark-shell
sc.stop()
scala> :load KafkaStreamingExa.scala
// Input data into Console Producer of Kafka
[cloudera@quickstart kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>silva silva silva
>aravinda aravinda aravinda
>silva
>kamal rajini
>kamal rajini
// Result
KafkaStreamingExa.main(null)
RAJINI 2
KAMAL 2
SILVA 4
ARAVINDA 3
$ spark-shell --master local[2]
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.socketTextStream("localhost",9999)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.print()
ssc.start()
hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C
Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
// Kafka Console Producer and Console Consumer Example
[cloudera@quickstart kafka]$ sudo gedit server.properties
broker.id=0
zoo keeper port : 2182
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9092
[cloudera@quickstart kafka]$ sudo gedit config/zookeeper.properties
clientPort=2182
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2182
Created topic "myFresh".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
myFresh
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia
// Kafka Console Producer data into => Spark Streaming Example (KafkaUtils.createStream)
KafkaStreamingExa.scala:
------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
object KafkaStreamingExa {
def main (args : Array[String]) : Unit ={
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(10))
val kafkaStream = KafkaUtils.createStream(ssc,"localhost:2182","testGroup",Map("myFresh" -> 0))
val lines = kafkaStream.map(x => x._2.toUpperCase)
print(lines)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.foreachRDD( rdd =>
rdd.foreachPartition( partition =>
partition.foreach{
case (w:String, cnt:Int) => {
val x = w + "\t" + cnt
print(x)
}
}
)
)
ssc.start()
ssc.awaitTermination()
}
}
spark-shell
sc.stop()
scala> :load KafkaStreamingExa.scala
// Input data into Console Producer of Kafka
[cloudera@quickstart kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>silva silva silva
>aravinda aravinda aravinda
>silva
>kamal rajini
>kamal rajini
// Result
KafkaStreamingExa.main(null)
RAJINI 2
KAMAL 2
SILVA 4
ARAVINDA 3