Showing posts with label kafka. Show all posts

Saturday, 16 March 2019

JSON Input to Kafka Broker to Spark Streaming to MySQL using KafkaProducer, KafkaUtils.createStream

// JSON to Kafka Broker to Spark Streaming to MySQL
// Here we read a JSON input file from the file system and publish it to a Kafka broker.
// Spark Streaming then fetches the messages from the broker and writes them into a MySQL table.

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182

//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic



// create a database and table in MySQL:

 $ mysql -uroot -pcloudera

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed

// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)





// The producer reads the JSON file and publishes each line to the Kafka topic
Producer Programming in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object JsonProducer {
  def main(args:Array[String]):Unit = {
    val props = new Properties()

    props.put("bootstrap.servers","localhost:9092")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    val producer = new KafkaProducer[String,String](props)
    val file = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")

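    for (line <- file.getLines()) {
      val msg = new ProducerRecord[String,String](topic,line)
      producer.send(msg)
    }
    file.close()      // release the input file handle
    producer.close()  // waits for buffered records to be sent before shutting down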
  }
}
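
Every line of nameList.json except the last ends with a trailing comma, and JsonProducer publishes each line unchanged, so the comma travels with the message (it is visible in the console consumer output). The post does not say whether the downstream JSON reader depends on that. Below is a minimal sketch, assuming you would rather publish clean standalone JSON objects; JsonLineCleaner and its methods are hypothetical names, not part of the original program.

```scala
object JsonLineCleaner {
  // Hypothetical helper (not in the original post): trims whitespace and
  // removes one trailing comma so each message is a standalone JSON object.
  def clean(line: String): String = {
    val trimmed = line.trim
    if (trimmed.endsWith(",")) trimmed.dropRight(1).trim else trimmed
  }

  // Blank lines carry no record, so they are dropped before publishing.
  def cleanAll(lines: Iterator[String]): Iterator[String] =
    lines.map(clean).filter(_.nonEmpty)
}
```

In JsonProducer the loop could then iterate over JsonLineCleaner.cleanAll(file.getLines()) instead of file.getLines().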


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},



Kafka2MySQLStreaming.scala:
-----------------------------
import java.util.Properties
import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.{SQLContext,SaveMode}
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka2MySQLStreaming{
def main(args:Array[String]):Unit ={

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlc = new SQLContext(sc)
val batchInterval = 20
val zk = "localhost:2182"
val consumerGroupID = "jsonGroup"
val topic = "jsonTopic"
// The map value is the number of consumer threads for the topic, not a partition id
val numThreads = 1
val perTopicPartitions = Map(topic -> numThreads)
val ssc = new StreamingContext(sc,Seconds(batchInterval))

val KafkaData = KafkaUtils.createStream(ssc,zk,consumerGroupID,perTopicPartitions)
val ks = KafkaData.map (x => x._2)
ks.foreachRDD { x =>
// Skip empty batches: there is no JSON to infer a schema from and nothing to write
if (!x.isEmpty()) {
val df = sqlc.read.json(x)

val props = new Properties()
props.put("driver","com.mysql.jdbc.Driver")
props.put("user","root")
props.put("password","cloudera")

df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB","jsonTable",props)
}
}
ssc.start()
ssc.awaitTermination()
}
}

[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)



// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id   | name       | city                     | country               | Skills                        |
+------+------------+--------------------------+-----------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden                | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde            | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan            | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil                | Army                          |
|    5 | Modesta    | Paltamo                  | Finland               | Avaya Technologies            |

Multiple Kafka Programs - Console Producer, Console Consumer - Spark Streaming

// NetCat Input into Spark Streaming Example
$ spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.socketTextStream("localhost",9999)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.print()
ssc.start()


hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C

Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)
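
To make the transformation chain easier to follow, here is a minimal sketch of the same counting logic on plain Scala collections, applied to the lines of one batch. It only mirrors flatMap, map and reduceByKey; it is not Spark code, and BatchWordCount is a hypothetical name used for illustration.

```scala
object BatchWordCount {
  // Counts words in the lines of a single batch, mirroring
  // flatMap(split) -> map((word, 1)) -> reduceByKey(_ + _).
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

Calling BatchWordCount.count(Seq("i love india", "who loves pakistan")) yields one entry per word, each with count 1, which matches the batch printed above.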


// Kafka Console Producer and Console Consumer Example
[cloudera@quickstart kafka]$ sudo gedit config/server.properties
   broker.id=0
   zookeeper.connect=localhost:2182
   log.dirs=/tmp/k1/kafka-logs
   listeners=PLAINTEXT://:9092
 
[cloudera@quickstart kafka]$ sudo gedit config/zookeeper.properties
   clientPort=2182
 
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties



hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2182
Created topic "myFresh".


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
myFresh

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia




// Kafka Console Producer data into => Spark Streaming Example (KafkaUtils.createStream)

KafkaStreamingExa.scala:
------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level


object KafkaStreamingExa {
def main (args : Array[String]) : Unit ={
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[4]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc,Seconds(10))
  // The map value is the number of consumer threads for the topic, so use at least 1
  val kafkaStream = KafkaUtils.createStream(ssc,"localhost:2182","testGroup",Map("myFresh" -> 1))
  val lines = kafkaStream.map(x => x._2.toUpperCase)
  val words = lines.flatMap(x => x.split(" "))
 
  val pair = words.map(x => (x,1))

  val res = pair.reduceByKey(_+_)


 
  res.foreachRDD( rdd =>
      rdd.foreachPartition( partition =>
        partition.foreach{
            case (w:String, cnt:Int) => {
              val x = w + "\t" + cnt
              // println puts each word count on its own line
              println(x)
            }
          }
      )
  )

  ssc.start()
  ssc.awaitTermination()

}
}


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load KafkaStreamingExa.scala


// Input data into Console Producer of Kafka
 [cloudera@quickstart kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>silva silva silva
>aravinda aravinda aravinda
>silva
>kamal rajini
>kamal rajini


// Result
scala> KafkaStreamingExa.main(null)
RAJINI  2
KAMAL  2
SILVA 4
ARAVINDA 3




Wednesday, 13 March 2019

Kafka : Call Logs SUCCESS, FAILED, DROPPED - Producer and Consumer Programming in Scala

input file:
calllogdata.txt:
-----------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
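
The records have no delimiters, so the fields have to be cut out by position. Here is a minimal sketch of such a parser. The layout is an assumption inferred from the sample lines only: a 36-character id, two 19-character timestamps, an 8-character space-padded status, then a run of digits. CallLogParser and CallRecord are hypothetical names, and the meaning of the trailing digits is not stated in the post, so they are kept as one string.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object CallLogParser {
  final case class CallRecord(id: String, startTime: LocalDateTime, endTime: LocalDateTime,
                              status: String, numbers: String) {
    // Length of the call in whole seconds
    def durationSeconds: Long = Duration.between(startTime, endTime).getSeconds
  }

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Assumed fixed-width layout (inferred from the sample data, not documented):
  // [0,36) id, [36,55) start time, [55,74) end time, [74,82) padded status, rest digits.
  // Returns None for lines that are too short or have unparseable timestamps.
  def parse(line: String): Option[CallRecord] =
    if (line.length < 82) None
    else scala.util.Try {
      CallRecord(
        id = line.substring(0, 36),
        startTime = LocalDateTime.parse(line.substring(36, 55), fmt),
        endTime = LocalDateTime.parse(line.substring(55, 74), fmt),
        status = line.substring(74, 82).trim,
        numbers = line.substring(82).trim
      )
    }.toOption
}
```

With a parsed record, routing by status becomes a comparison on record.status instead of a regular expression search over the whole line.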

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties


bin/kafka-topics.sh --create --topic SUCCESS_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic FAILED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic DROPPED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
DROPPED_RECORDS
FAILED_RECORDS
SUCCESS_RECORDS


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "myOwn"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
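// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
// The 1.6.3 Kafka connector is not binary compatible with Spark 2.4.0; use the connector built for the same Spark version
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.4.0"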
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerCallLog {
  def main(args:Array[String]):Unit = {
    val props = new Properties()
    props.put("bootstrap.servers","localhost:9092")
    props.put("client.id","ProducerApp")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topicSUCCESS = "SUCCESS_RECORDS"
    val topicFAILED = "FAILED_RECORDS"
    val topicDROPPED = "DROPPED_RECORDS"

    val producer = new KafkaProducer[String,String](props)

    val file = scala.io.Source.fromFile("/home/hadoop/Desktop/vow/calllogdata.txt")

    // Compile the status pattern once, outside the loop
    val statusPattern = "(SUCCESS|FAILED|DROPPED)".r

    for (line <- file.getLines()) {
      // Route each record to the topic that matches its call status;
      // lines without a recognizable status are skipped instead of throwing
      val targetTopic = statusPattern.findFirstIn(line) match {
        case Some("SUCCESS") => Some(topicSUCCESS)
        case Some("FAILED")  => Some(topicFAILED)
        case Some("DROPPED") => Some(topicDROPPED)
        case _               => None
      }
      targetTopic.foreach { t =>
        val msg = new ProducerRecord[String,String](t,line)
        producer.send(msg)
        println(msg)
      }
    }
    producer.close()
  }
}





bin/kafka-console-consumer.sh --topic SUCCESS_RECORDS --bootstrap-server localhost:9092 --from-beginning

bin/kafka-console-consumer.sh --topic FAILED_RECORDS --bootstrap-server localhost:9092 --from-beginning
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469


bin/kafka-console-consumer.sh --topic DROPPED_RECORDS --bootstrap-server localhost:9092 --from-beginning


import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "testGroup1")

    properties.put("client.id", "ConsumerApp")
    // properties.put("partition.assignment.strategy", "range");



    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "FAILED_RECORDS"


    consumer.subscribe(Collections.singletonList(topic))

    System.out.println("Subscribed to topic " + topic)
 
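    // The first poll joins the consumer group and triggers the partition assignment
    var records = consumer.poll(4000)
    // Rewind every assigned partition so the next poll reads from the beginning
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()) {
      println("Received Message : " + record)
    }

    consumer.commitSync()
    consumer.close()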
  }
}

Tuesday, 12 March 2019

Kafka : Producer, Consumer Programming in Scala with Multi Server Configuration

// Here we write a Kafka producer and a Kafka consumer in Scala against a multi-server (multi-broker) configuration

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



default zookeeper port number is : 2181

Make four copies of server.properties and name them server0.properties, server1.properties, server2.properties and server3.properties.

Open each file and set the broker id, log directory and listener port as shown below.

sudo gedit config/server0.properties
broker.id=0
zookeeper.connect=localhost:2181
log.dirs=/tmp/k0/kafka-logs
listeners=PLAINTEXT://:9090

sudo gedit config/server1.properties
broker.id=1
zookeeper.connect=localhost:2181
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9091

sudo gedit config/server2.properties
broker.id=2
zookeeper.connect=localhost:2181
log.dirs=/tmp/k2/kafka-logs
listeners=PLAINTEXT://:9092

sudo gedit config/server3.properties
broker.id=3
zookeeper.connect=localhost:2181
log.dirs=/tmp/k3/kafka-logs
listeners=PLAINTEXT://:9093
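
The four files differ only in the broker id, the log directory and the listener port, so the overrides can be generated instead of edited by hand. Here is a minimal sketch under stated assumptions: BrokerConfig is a hypothetical helper, the port is taken as 9090 plus the broker id as in the listing above, each broker is given its own log directory /tmp/k<id>/kafka-logs, and the ZooKeeper address is written with Kafka's zookeeper.connect property.

```scala
object BrokerConfig {
  // Renders the per-broker overrides for broker `id`.
  // Assumptions: port = 9090 + id, and one log directory per broker
  // (brokers on the same machine should not share a log directory).
  def render(id: Int, zkConnect: String = "localhost:2181"): String =
    Seq(
      s"broker.id=$id",
      s"zookeeper.connect=$zkConnect",
      s"log.dirs=/tmp/k$id/kafka-logs",
      s"listeners=PLAINTEXT://:${9090 + id}"
    ).mkString("\n")
}
```

For example, BrokerConfig.render(2) produces the four override lines for the broker listening on port 9092; the remaining settings still come from the copied server.properties.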

Open 4 new terminals and run one of the following commands in each terminal
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server0.properties
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
INFO [KafkaServer id=3] started (kafka.server.KafkaServer)


 

// 4 different kafka server instances are running
hadoop@hadoop:/usr/local/kafka$ jps
6384 Jps
4851 Kafka  // #instance 1
6243 Main
4163 Kafka // #instance 2
3492 QuorumPeerMain
4504 Kafka // #instance 3
5196 Kafka // #instance 4




// Create a new topic named myTopic with 4 partitions and a replication factor of 4
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic myTopic --partitions 4 --replication-factor 4 --zookeeper localhost:2181
Created topic "myTopic".


// Describe the topic myTopic
bin/kafka-topics.sh --describe  --zookeeper localhost:2181
Topic:myTopic PartitionCount:4 ReplicationFactor:4 Configs:
Topic: myTopic Partition: 0 Leader: 3 Replicas: 3,0,1,2 Isr: 3,0,1,2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1,2,3 Isr: 0,1,2,3
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: myTopic Partition: 3 Leader: 2 Replicas: 2,3,0,1 Isr: 2,3,0,1
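
In this output each partition's replica list is the broker list rotated by one position, and the first replica in the list is the leader. Here is a minimal sketch that reproduces that rotation. It is an illustration of the pattern seen here, not Kafka's actual assignment algorithm; ReplicaLayout and its parameters are hypothetical, and it assumes the replication factor equals the number of brokers, as it does for myTopic.

```scala
object ReplicaLayout {
  // Illustrative only: rebuilds the rotation visible in the --describe output.
  // Kafka's real assignment logic takes more inputs into account.
  // Row p is the replica list of partition p; its first element is the leader.
  def assign(brokers: Int, partitions: Int, firstLeader: Int): Seq[Seq[Int]] =
    (0 until partitions).map { p =>
      (0 until brokers).map(r => (firstLeader + p + r) % brokers)
    }
}
```

ReplicaLayout.assign(4, 4, 3) gives 3,0,1,2 for partition 0, then 0,1,2,3, then 1,2,3,0 and finally 2,3,0,1, matching the Replicas column above.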




build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "Kafka"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
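// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
// The 1.6.3 Kafka connector is not binary compatible with Spark 2.4.0; use the connector built for the same Spark version
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.4.0"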
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"




// Producer Programming in Scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerExa2 {
  def main(args:Array[String]):Unit = {

    val props = new Properties()
    val topic = "myTopic"

    props.put("bootstrap.servers","192.168.0.106:9090,192.168.0.106:9091,192.168.0.106:9092,192.168.0.106:9093")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")

    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    val msg:String = "Welcome to Kafka : #"
    for (i <- 1 to 10){
      val data = new ProducerRecord[String,String](topic,msg+i.toString)
      producer.send(data)
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}




Run the program in IntelliJ IDEA.

------Successfully published messages to topic : myTopic----


// view the output in console:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server localhost:9090 --from-beginning
Welcome to Kafka : #3
Welcome to Kafka : #7
Welcome to Kafka : #1
Welcome to Kafka : #5
Welcome to Kafka : #9
Welcome to Kafka : #2
Welcome to Kafka : #6
Welcome to Kafka : #10
Welcome to Kafka : #4
Welcome to Kafka : #8



//Consumer Programming in Scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "192.168.0.106:9091")
    properties.put("group.id", "testGroup")
    properties.put("client.id", "ConsumerApp")

    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "myTopic"


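    consumer.subscribe(Collections.singletonList(topic))
    System.out.println("Subscribed to topic " + topic)
    // The first poll joins the consumer group and triggers the partition assignment
    var records = consumer.poll(4000)
    // Rewind every assigned partition so the next poll reads from the beginning
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()){
      println("Received Message : "  + record)
    }
    consumer.commitSync()
    consumer.close()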
  }
}

Output:
--------
Subscribed to topic myTopic
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 4, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #4)
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 5, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #8)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #2)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #6)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 8, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #10)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #3)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #7)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 4, CreateTime = 1552402871830, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #1)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 5, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #5)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 6, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #9)
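
The messages do not arrive in the order 1 to 10 because they were sent without a key and spread over the four partitions; Kafka keeps order only within a single partition. Here is a minimal sketch that checks this against the output above. The triples are copied from the ConsumerRecord lines; PartitionOrdering is a hypothetical name used for illustration.

```scala
object PartitionOrdering {
  // (partition, offset, message number) triples copied from the consumer output above.
  val received: Seq[(Int, Long, Int)] = Seq(
    (0, 4L, 4), (0, 5L, 8),
    (3, 6L, 2), (3, 7L, 6), (3, 8L, 10),
    (1, 6L, 3), (1, 7L, 7),
    (2, 4L, 1), (2, 5L, 5), (2, 6L, 9)
  )

  // Message numbers per partition, ordered by offset.
  def byPartition(records: Seq[(Int, Long, Int)]): Map[Int, Seq[Int]] =
    records.groupBy(_._1).map { case (p, rs) => (p, rs.sortBy(_._2).map(_._3)) }

  // True when the send order (message number) is preserved inside every partition.
  def orderedWithinPartitions(records: Seq[(Int, Long, Int)]): Boolean =
    byPartition(records).values.forall(ms => ms == ms.sorted)
}
```

PartitionOrdering.orderedWithinPartitions(PartitionOrdering.received) holds for this run even though the overall arrival order is not sorted. If global ordering matters, a single-partition topic is the usual choice.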


Kafka Producer in Scala Programming Example

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --topic KafkaTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerExa {
  def main(args: Array[String]): Unit = {
    val topic = "KafkaTopic"

    // Producer configuration: broker address, client name, and String serializers
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("client.id", "ScalaProducer")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val msg: String = "Welcome to Kafka : #"

    // Send 10 keyless messages; send() is asynchronous
    for (i <- 1 to 10) {
      val data = new ProducerRecord[String, String](topic, msg + i)
      producer.send(data)
    }

    // close() blocks until the previously sent messages have completed
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}


Run the .scala file in IntelliJ IDEA
------Successfully published messages to topic : KafkaTopic----


See the output in console:
--------------------------
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic KafkaTopic --bootstrap-server localhost:9092

Welcome to Kafka : #1
Welcome to Kafka : #2
Welcome to Kafka : #3
Welcome to Kafka : #4
Welcome to Kafka : #5
Welcome to Kafka : #6
Welcome to Kafka : #7
Welcome to Kafka : #8
Welcome to Kafka : #9
Welcome to Kafka : #10



 

Monday, 11 March 2019

Kafka - Configuring Multiple Servers in Single Linux System

// This post describes how to run multiple Kafka server (broker) instances on a single Linux system.
// Each instance gets its own server.properties file with a different broker id, log directory and port.

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



default zookeeper port number is : 2181

Make three copies of server.properties and name them:
server1.properties,
server2.properties,
server3.properties respectively

Open each file and change the settings listed below.


sudo gedit server1.properties
broker.id=1
zookeeper.connect=localhost:2181
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9093

sudo gedit server2.properties
broker.id=2
zookeeper.connect=localhost:2181
log.dirs=/tmp/k2/kafka-logs
listeners=PLAINTEXT://:9094

sudo gedit server3.properties
broker.id=3
zookeeper.connect=localhost:2181
log.dirs=/tmp/k3/kafka-logs
listeners=PLAINTEXT://:9095
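
The three files differ only in the broker id, the log directory and the listener port. A minimal sketch of that pattern, assuming the port is always 9092 plus the broker id as in the listings above; the object and method names are hypothetical and this is not part of Kafka itself:

```scala
// Sketch: derive the per-broker settings used in server1..3.properties.
// BrokerOverrides, forBroker and render are hypothetical names.
object BrokerOverrides {
  // Settings that must differ between brokers running on one machine,
  // plus the shared ZooKeeper address.
  def forBroker(id: Int): Seq[(String, String)] = Seq(
    "broker.id" -> id.toString,
    "zookeeper.connect" -> "localhost:2181",
    "log.dirs" -> s"/tmp/k$id/kafka-logs",
    "listeners" -> s"PLAINTEXT://:${9092 + id}"
  )

  // Renders the settings in .properties syntax, one key=value per line.
  def render(id: Int): String =
    forBroker(id).map { case (k, v) => s"$k=$v" }.mkString("\n")

  def main(args: Array[String]): Unit =
    (1 to 3).foreach { id =>
      println(s"# server$id.properties")
      println(render(id))
      println()
    }
}
```

Every broker needs a unique broker.id, its own log directory and a free port; the ZooKeeper address stays the same for all of them.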

Open 3 new terminals and run one of the following commands in each terminal
#1
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties
[2019-03-11 21:50:24,652] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
#2
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
[2019-03-11 21:50:41,541] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
#3
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
[2019-03-11 21:50:55,067] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

// 3 different kafka server instances are running
hadoop@hadoop:/usr/local/kafka$ jps
5792 QuorumPeerMain
4501 NameNode
7653 Kafka  // #1
7303 Kafka  // #2
4695 DataNode
5417 NodeManager
5228 ResourceManager
8350 Jps
7999 Kafka  // #3
4927 SecondaryNameNode


// Only 3 Kafka server instances are running, but here I specified a replication-factor of 4, so topic creation failed
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 4 --zookeeper localhost:2181
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2019-03-11 21:54:58,039] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)
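
Each replica of a partition has to live on a different broker, so the replication factor cannot exceed the number of running brokers. A small sketch of that check; the object and method names are hypothetical, and the message text simply mirrors the error shown above:

```scala
// Sketch of the rule behind the error above (not Kafka's own code).
object ReplicationCheck {
  // Right(()) when the topic could be created, Left(message) when the
  // replication factor is larger than the number of live brokers.
  def validate(replicationFactor: Int, liveBrokers: Int): Either[String, Unit] =
    if (replicationFactor > liveBrokers)
      Left(s"Replication factor: $replicationFactor larger than available brokers: $liveBrokers.")
    else
      Right(())

  def main(args: Array[String]): Unit = {
    println(validate(4, 3)) // rejected: only 3 brokers are running
    println(validate(3, 3)) // accepted
  }
}
```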


// Create a new topic named topic1 with 3 partitions and a replication factor of 3
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".

// see the topic description
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

// Isr : In-Sync Replicas (the replicas that are currently caught up with the leader)

// create a new topic named : topic2 with 2 replicas
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic2 --partitions 3 --replication-factor 2 --zookeeper localhost:2181
Created topic "topic2".

// Describe topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs: // 3 replicas
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

Topic:topic2 PartitionCount:3 ReplicationFactor:2 Configs: // 2 replicas
Topic: topic2 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: topic2 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic2 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2


// console producer to send data to broker
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>kafka1
>kafka2
>kafka3
>kafka4
>kafka5
>kafka6
>kafka7
>kafka8
>kafka9
>kafka10

// How did the data get distributed across the partitions?
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9094 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9095 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
kafka3
kafka6
kafka9


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
kafka2
kafka5
kafka8


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
kafka1
kafka4
kafka7
kafka10


Consumer Group:
---------------

If a set of consumers should share the work of reading the same topic,
each one reading from different partitions, these consumers must belong to the same group
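
To illustrate the idea, here is a simplified sketch that deals the partitions of one topic out to the members of a group so that every partition has exactly one reader. It is an illustration only; Kafka's real assignment strategies are configurable and more involved, and all names below are hypothetical:

```scala
// Simplified model of partition sharing inside one consumer group.
object GroupAssignment {
  // Hands out partitions to consumers in turn. Each partition ends up
  // with exactly one owner; a consumer may own zero or more partitions.
  def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "a group needs at least one consumer")
    val empty = consumers.map(c => c -> Seq.empty[Int]).toMap
    partitions.zipWithIndex.foldLeft(empty) { case (acc, (p, i)) =>
      val owner = consumers(i % consumers.size)
      acc.updated(owner, acc(owner) :+ p)
    }
  }

  def main(args: Array[String]): Unit = {
    // 3 partitions, 2 consumers in group1: one consumer reads two partitions
    println(assign(Seq(0, 1, 2), Seq("consumer-a", "consumer-b")))
    // 3 partitions, 4 consumers: one consumer stays idle
    println(assign(Seq(0, 1, 2), Seq("c1", "c2", "c3", "c4")))
  }
}
```

With more consumers than partitions, the extra consumers receive nothing, which is why adding consumers beyond the partition count does not increase parallelism.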



sudo gedit consumer.properties
------------------------------
# consumer group id
group.id=group1


B - Broker
p - partition
m - message / records

Partition replicated across:
---------------------------
p1 -> B1,B2,B3
p2 -> B2,B3,B1
p3 -> B3,B1,B2

Broker -> Partition -> Messages (round robin)
B1 -> p1 ->> m1,m4,m7
B2 -> p2 ->> m2,m5,m8
B3 -> p3 ->> m3,m6,m9

Offset mapping (offsets are counted separately within each partition):
offset (0) -> m1,m2,m3
offset (1) -> m4,m5,m6
offset (2) -> m7, m8, m9
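
The mapping above can be reproduced with a short simulation. This is an idealized sketch of round-robin placement with per-partition offsets, assuming messages without keys; the partitioner a real producer uses may behave differently depending on the Kafka version, and the names below are hypothetical:

```scala
// Idealized round-robin placement of messages onto partitions.
object RoundRobinDemo {
  // Places messages on partitions in turn and numbers them with an
  // offset that starts at 0 in every partition.
  // Result: partition index (0-based) -> Seq((offset, message)).
  def distribute(messages: Seq[String], numPartitions: Int): Map[Int, Seq[(Int, String)]] =
    messages.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, msgs) =>
        p -> msgs.map(_._1).zipWithIndex.map { case (m, off) => (off, m) }
      }

  def main(args: Array[String]): Unit = {
    val layout = distribute((1 to 9).map(i => "m" + i), 3)
    layout.toSeq.sortBy(_._1).foreach { case (p, msgs) =>
      val line = msgs.map { case (off, m) => m + " at offset " + off }.mkString(", ")
      println("p" + (p + 1) + " : " + line)
    }
  }
}
```

Ordering is only guaranteed inside a partition, which is why a consumer reading the whole topic sees the messages grouped by partition rather than in the order they were typed.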



hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: topic1 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

// producer is writing data into topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^C
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya

// consumer is reading data from topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
2. i love pakistan
5. i love singapore
8. i love thailand
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 10 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
2. i love pakistan
5. i love singapore
8. i love thailand
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
^CProcessed a total of 4 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya


// I forcibly stopped the 3rd server (Ctrl+C in the terminal that was running it):
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
^C
// The 3rd server is now down, so for the partition it was leading, the 1st or 2nd Kafka server takes over as leader for new input



hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^C
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya // 3rd server failed here
>super  // new input sent after the 3rd server went down; the 1st or 2nd server now leads every partition
>star
>rajinikanth
>i love india
>kamal
>kalai
>selvam
>awesome
>nator
>mai
>




// The consumer still returns the messages even though one of the servers (i.e., server3) has failed
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
vimal
super
i love india
selvam
2. i love pakistan
5. i love singapore
8. i love thailand
amala
kalavani
star
kamal
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya
rajinikanth
kalai
awesome
nator


// now 3rd server is down
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2

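// Each partition has 3 assigned replicas: a) 1,2,3  b) 2,3,1  c) 3,1,2, so a full Isr would list all three brokers.
// The actual Isr lists are: a) 1,2  b) 2,1  c) 1,2
// Broker 3 is missing because we forcibly shut the 3rd server down earlier. Partition 2, which broker 3 used to lead, is now led by broker 1.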
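
The describe output after the failure can be approximated with a small model: the Isr is taken to be the assigned replicas that are still alive, and the leader is the first of them in assignment order. This is a simplification under those assumptions (the real Isr also depends on how far a replica lags, and the election is carried out by the Kafka controller); the names are hypothetical:

```scala
// Simplified model of Isr and leader after a broker failure.
object IsrDemo {
  // Replica lists per partition, copied from the --describe output.
  val replicas: Map[Int, Seq[Int]] = Map(
    0 -> Seq(1, 2, 3),
    1 -> Seq(2, 3, 1),
    2 -> Seq(3, 1, 2)
  )

  // Isr approximated as the assigned replicas that are still alive.
  def isr(assigned: Seq[Int], live: Set[Int]): Seq[Int] =
    assigned.filter(live.contains)

  // Leader approximated as the first live replica in assignment order.
  def leader(assigned: Seq[Int], live: Set[Int]): Option[Int] =
    isr(assigned, live).headOption

  def main(args: Array[String]): Unit = {
    val live = Set(1, 2) // broker 3 has been stopped
    replicas.toSeq.sortBy(_._1).foreach { case (p, assigned) =>
      val leaderText = leader(assigned, live).map(_.toString).getOrElse("none")
      val isrText = isr(assigned, live).mkString(",")
      println("Partition: " + p + " Leader: " + leaderText +
        " Replicas: " + assigned.mkString(",") + " Isr: " + isrText)
    }
  }
}
```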


// Now I manually start server3 again:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
// so, 3rd server is up and running now.

// I'm checking it again:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
// Now look at the Isr lists: the 3rd server has caught up from Server1 and Server2 and is back in all of them.
// At the time of this check, partition 2 is still led by broker 1.

Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3


Sunday, 17 February 2019

Producer and Consumer Programming in Spark Streaming

Kafka_Producer.scala:
------------------


package Kafka_Stream

import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}

object Kafka_Producer {
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"

    // change the topic name if required
    val topic = "kafka-sparkstream-test"
    val messagesPerSec = 1
    val wordsPerMessage = 15
    val random = new Random()
    val names = Seq("Hi", "Hello", "How", "are", "you", "test", "yes", "no", "fine", "great", "ok", "not", "that")

    // Kafka producer properties (broker address and String serializers)
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // send messagesPerSec random messages every second until the program is stopped
    while (true) {
      (1 to messagesPerSec).foreach { _ =>
        // build one message from wordsPerMessage randomly chosen words
        val str = (1 to wordsPerMessage).map(_ => names(random.nextInt(names.size))).mkString(" ")
        println(str)
        // null key: the message is sent without a key
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}

build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
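
The Spark artifacts above are at 2.4.0 while the Kafka integration is pinned to 1.6.3. A hedged alternative, assuming the project stays on Spark 2.4.0 and Scala 2.11, is the version-matched spark-streaming-kafka-0-8 artifact, which provides the same org.apache.spark.streaming.kafka.KafkaUtils class. This is a suggestion, not the dependency the original build used:

```scala
// build.sbt fragment (assumption: Spark 2.4.0 on Scala 2.11).
// Would replace the "spark-streaming-kafka" % "1.6.3" line.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.4.0"
```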





Kafka_Stream.scala:
---------------
package Kafka_Stream
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.log4j.Logger
import org.apache.log4j.Level

object Kafka_Stream{
  def main(args:Array[String]):Unit={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf()
    conf.set("spark.master","local[3]")
    conf.set("spark.app.name","KafkaReceiver")
    val sc = new SparkContext(conf)
    // one micro-batch every 20 seconds
    val ssc = new StreamingContext(sc,Seconds(20))

    // change the ZooKeeper address (host:port) and the topic name as needed
    // arguments: streaming context, ZooKeeper address, consumer group id, Map(topic -> number of consumer threads)
    val KafkaStream = KafkaUtils.createStream(ssc,"localhost:2181","spark-streaming-consumer-group",Map("kafka-sparkstream-test" -> 5))

    // each record is a (key, value) pair; split the value into words
    val words = KafkaStream.flatMap(x => x._2.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)

   // KafkaStream.print() // prints the stream of data received
    wordCounts.print() // prints the word count result of each batch


    ssc.start()
    ssc.awaitTermination()
  }
}
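
What the streaming job computes for a single batch can be tried without Spark or Kafka. The sketch below applies the same split, pair and sum steps to an in-memory list of messages using plain Scala collections; it is an illustration of the logic only, and the object name is hypothetical:

```scala
// Plain-Scala version of the per-batch word count logic.
object BatchWordCount {
  // Split each message into words, pair every word with 1,
  // then sum the 1s per word (the role of reduceByKey in Spark).
  def wordCounts(messages: Seq[String]): Map[String, Int] =
    messages
      .flatMap(_.split(" ").toSeq)
      .map(w => (w, 1))
      .groupBy(_._1)
      .map { case (w, pairs) => w -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val batch = Seq("Hi Hello Hi", "Hello you")
    wordCounts(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job the same computation is repeated for every 20-second batch, so the counts are per batch and not cumulative.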

Step 1:

//Make sure zookeeper is running:

hadoop@hadoop:/usr/local/kafka$ sh bin/zookeeper-server-start.sh config/zookeeper.properties


Step 2:

//Start the kafka server:

hadoop@hadoop:/usr/local/kafka$  sh bin/kafka-server-start.sh config/server.properties



Step 3:
// create kafka topic

hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --create --topic kafka-sparkstream-test --zookeeper localhost:2181 --replication-factor 1 --partitions 3
Created topic "kafka-sparkstream-test".

// list the kafka topics to verify
hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --zookeeper localhost:2181 --list
kafka-sparkstream-test






hadoop@hadoop:/tmp/kafka-logs/kafka-sparkstream-test-0$ ls -l
total 20488
-rw-r--r-- 1 hadoop hadoop 10485760 Feb 17 20:52 00000000000000000000.index
-rw-r--r-- 1 hadoop hadoop     2109 Feb 17 20:56 00000000000000000000.log
-rw-r--r-- 1 hadoop hadoop 10485756 Feb 17 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 hadoop hadoop        8 Feb 17 20:52 leader-epoch-checkpoint


[IJ]sbt:TwitInt> compile


sbt>> package

[IJ]sbt:TwitInt> show discoveredMainClasses
[info] * IntTwit
[info] * Kafka_Stream.Kafka_Producer
[info] * Kafka_Stream.Kafka_Stream
[success] Total time: 2 s, completed 17 Feb, 2019 9:29:23 PM


<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
org.apache.spark:spark-streaming-kafka_2.11:1.6.3


//start an  instance of spark shell
$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Producer.scala

scala> Kafka_Producer.main(null)
ok you test are How you not you that ok great you are How How
How fine How not test yes fine not Hi great great great that fine great
that Hello ok fine yes you yes you not fine that are yes Hello that
not great test you fine are are no that ok not no Hello that test
test ok that no fine that that How not great ok ok ok not Hello


//create new instance of spark-shell
$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Stream.scala
Loading Kafka_Stream.scala...
Kafka_Stream.scala:1: error: illegal start of definition
package Kafka_Stream
^
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
defined object Kafka_Stream

scala> Kafka_Stream.main(null)

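// Note: :load rejects the package line (see the error above), but the object is still defined.
// The word count result of each batch will be displayed here.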

spark-submit --class Kafka_Stream.Kafka_Producer /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar

spark-submit --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar

Friday, 15 February 2019

Kafka Console Producer and Consumer Step by Step

Kafka
-------
ZooKeeper
Broker
Topic
Partition
Replica

Producer

Consumer

[cloudera@quickstart bin]$ su
Password: cloudera

[root@quickstart bin]# pwd
/home/cloudera/kafka_2.11-1.0.0/bin

[root@quickstart kafka_2.11-1.0.0]# cd config

[root@quickstart config]# ls
connect-console-sink.properties    consumer.properties
connect-console-source.properties  log4j.properties
connect-distributed.properties     producer.properties
connect-file-sink.properties       server.properties
connect-file-source.properties     tools-log4j.properties
connect-log4j.properties           zookeeper.properties
connect-standalone.properties


All brokers must refer to the same ZooKeeper port (zookeeper.connect in server.properties must match clientPort in zookeeper.properties)


Spark Streaming and Kafka Integration steps:
1. Start ZooKeeper server
2. Start Kafka brokers (one or more)
3. Create a topic
4. Start the console producer (to write messages into the broker's topic)
5. Start the console consumer (to test)
6. Create a Spark streaming context that streams from the Kafka topic
7. Perform transformations or aggregations
8. Run an output operation


[root@quickstart kafka_2.11-1.0.0]# gedit config/zookeeper.properties
change the port number : clientPort=2181 to clientPort=2182
save it and close it

pwd
/home/cloudera/kafka_2.11-1.0.0
[root@quickstart kafka_2.11-1.0.0]#

start zookeeper server:
Step #1: sh bin/zookeeper-server-start.sh config/zookeeper.properties
(at the end : [2018-10-20 01:55:47,072] INFO binding to port 0.0.0.0/0.0.0.0:2182 (org.apache.zookeeper.server.NIOServerCnxnFactory))


[root@quickstart kafka_2.11-1.0.0]# gedit config/server.properties
change zookeeper.connect=localhost:2181 ==> zookeeper.connect=localhost:2182

start Kafka Server:
step #2: # sh bin/kafka-server-start.sh config/server.properties
(at the end : INFO [KafkaServer id=0] started (kafka.server.KafkaServer))

create a kafka topic:
step #3: sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic testtopic
sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic mytopic
Created topic "mytopic".


to list available topic in kafka broker:
step #4: sh bin/kafka-topics.sh --list --zookeeper localhost:2182
mytopic
testtopic

to write messages into kafka topic: (with the help of kafka console producer)
step #5: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>hello
>how are you
>bye

step #6: sh bin/kafka-console-consumer.sh --zookeeper quickstart.cloudera:2182 --topic mytopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
bye
hello
how are you


Monday, 10 December 2018

Kafka Notes

Kafka:
-------
Go to /home/cloudera

Download Kafka from :
wget http://www-eu.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz


Run Terminal
tar -xvzf kafka_2.11-1.0.0.tgz

cd kafka_2.11-1.0.0
cd config

now pwd is :
/home/cloudera/kafka_2.11-1.0.0/config

gedit server.properties

search for 2181 and make sure this line is present; if it is not, add it:
zookeeper.connect=localhost:2181


Now goto ZooKeeper:
$ cd /usr/lib/zookeeper/bin

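connect to ZooKeeper with its command-line client (zkCli.sh connects to an already running ZooKeeper server; it does not start one)
sh zkCli.sh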


Now goto Kafka installed folder:
cd /home/cloudera/kafka_2.11-1.0.0/bin/

start kafka
nohup sh kafka-server-start.sh /home/cloudera/kafka_2.11-1.0.0/config/server.properties &

tail -f nohup.out

sh kafka-topics.sh --create --topic Payment_BLR --zookeeper quickstart.cloudera:2181 --replication-factor 1 --partitions 3

sh kafka-console-producer.sh --broker-list quickstart.cloudera:9092 --topic Payment_BLR


sh kafka-console-consumer.sh --zookeeper quickstart.cloudera:2181 --topic Payment_BLR --from-beginning


-----------------------------------------------  May be later use ------------------------------------------------
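Here I got the Java exception : Unsupported major.minor version 51.0
(the installed Java runtime is older than the version the classes were compiled for)

so, the steps below try to resolve it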

Network Issue

Check whether the mirrorlist server is accessible:

$ ping mirrorlist.centos.org

You might get the error message below.

ping: unknown host mirrorlist.centos.org

To solve this, add a valid nameserver to /etc/resolv.conf (use sudo if you are not the root user)
$  su
#  echo "nameserver 8.8.8.8" >> /etc/resolv.conf

install wget :
sudo yum install wget

install java:
sudo yum install java-1.8.0-openjdk
----------------------------------------- For Future Use -------------------------------------


start kafka
nohup sh kafka-server-start.sh /home/cloudera/kafka_2.11-2.0.0/config/server.properties &

tail -f nohup.out
https://downloads.cloudera.com/demo_vm/vmware/cloudera-quickstart-vm-5.13.0-0-vmware.zip


How to access Windows Shared folder contents within VM?
Devices - Network  - Network Settings - Shared Folders - Add (+) - select windows shared folder location
Install : Insert Guest Additions CD Image
Open Terminal within Cent OS

mount -t vboxsf Tut Tutorials

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...