Saturday, 16 March 2019

Multiple Kafka Programs - Console Producer, Console Consumer - Spark Streaming

// NetCat Input into Spark Streaming Example
// local[2]: one thread for the socket receiver, one for batch processing
$ spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// 10-second micro-batches; sc already exists inside spark-shell
val ssc = new StreamingContext(sc, Seconds(10))
// Read lines from the netcat listener on port 9999
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
// Word count within each batch
val res = pair.reduceByKey(_ + _)
res.print()
ssc.start()


hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C

-------------------------------------------
Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
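
The counts above reset with every 10-second batch. For counts over a longer sliding window, reduceByKeyAndWindow on the same pair DStream is the usual next step. A minimal sketch (register it before ssc.start() in a fresh session; the 30s window and 10s slide are arbitrary choices, not part of the run above):

// Counts over the last 30 seconds, recomputed every 10 seconds
val windowed = pair.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowed.print()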


// Kafka Console Producer and Console Consumer Example
[cloudera@quickstart kafka]$ sudo gedit config/server.properties
   broker.id=0
   zookeeper.connect=localhost:2182
   log.dirs=/tmp/k1/kafka-logs
   listeners=PLAINTEXT://:9092

[cloudera@quickstart kafka]$ sudo gedit config/zookeeper.properties
   clientPort=2182
 
Start ZooKeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka broker:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
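
Optionally, before starting the broker you can check that ZooKeeper is really listening on the custom port with the four-letter-word ping (these commands can be disabled on newer ZooKeeper releases, so treat this as a best-effort check):

hadoop@hadoop:~$ echo ruok | nc localhost 2182
imok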



hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2182
Created topic "myFresh".


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
myFresh
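
The same script can also confirm the partition count and replication factor of the new topic (the exact output layout varies across Kafka versions):

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic myFresh --zookeeper localhost:2182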

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia
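
The console producer is a thin wrapper over the Java producer API. For reference, a minimal programmatic equivalent in Scala, assuming the kafka-clients jar is on the classpath (the object name SimpleProducer is only for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Same message the console producer sent above
    producer.send(new ProducerRecord[String, String]("myFresh", "i love india"))
    producer.close()
  }
}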




// Kafka Console Producer data into => Spark Streaming Example (KafkaUtils.createStream)

KafkaStreamingExa.scala:
------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.{Level, Logger}

object KafkaStreamingExa {
  def main(args: Array[String]): Unit = {
    // Silence Spark's own logging so the batch output stays readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("KafkaStreamingExa").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Receiver-based stream: connects through ZooKeeper (port 2182 here) and
    // reads topic "myFresh" with one receiver thread (0 threads would read nothing)
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2182", "testGroup", Map("myFresh" -> 1))

    // Each record is a (key, message) pair; keep the message, upper-cased
    val lines = kafkaStream.map(x => x._2.toUpperCase)
    lines.print()

    val words = lines.flatMap(x => x.split(" "))
    val pair = words.map(x => (x, 1))
    val res = pair.reduceByKey(_ + _)

    // Write each (word, count) pair to stdout, partition by partition
    res.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach {
          case (w: String, cnt: Int) => println(w + "\t" + cnt)
        }
      )
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
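
createStream is the older receiver-based API and goes through ZooKeeper. The same spark-streaming-kafka 0-8 integration also offers a receiver-less direct stream that reads from the broker instead. A sketch of the equivalent setup (same ssc as in main above):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: talks to the broker on 9092, no ZooKeeper receiver involved
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myFresh"))
val directLines = direct.map(_._2)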


Run it from spark-shell:

$ spark-shell
scala> sc.stop()        // stop the shell's default context; main() builds its own
scala> :load KafkaStreamingExa.scala
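
:load is handy for experimenting. For a standalone run you would package the object into a jar and submit it, pulling in the Kafka integration with --packages (the artifact version must match your Spark and Scala build; app.jar is a placeholder name):

$ spark-submit --class KafkaStreamingExa --master local[4] \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 app.jar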


// Input data via the Kafka console producer
 [cloudera@quickstart kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>silva silva silva
>aravinda aravinda aravinda
>silva
>kamal rajini
>kamal rajini


// Result
scala> KafkaStreamingExa.main(null)
RAJINI	2
KAMAL	2
SILVA	4
ARAVINDA	3
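
Note that these totals are per 10-second batch; nothing carries over between batches. For a running total across batches, updateStateByKey with a checkpoint directory is the usual approach in this API. A sketch against the same pair DStream (the checkpoint path is an arbitrary choice):

ssc.checkpoint("/tmp/kafka-stream-ckpt")  // required for stateful transformations

// Running total per word across all batches
val totals = pair.updateStateByKey((counts: Seq[Int], prev: Option[Int]) =>
  Some(counts.sum + prev.getOrElse(0)))
totals.print()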



