Saturday, 16 March 2019

JSON Input to Kafka Broker to Spark Streaming to MySQL using KafkaProducer and KafkaUtils.createStream

// JSON to Kafka Broker to Spark Streaming to MySQL
// Here we read a json input file from the local file system and publish its lines to a Kafka broker.
// Spark Streaming then fetches the messages from the Kafka broker and writes them into a MySQL table.

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start ZooKeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start the Kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182

// view the topics available on the Kafka broker
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic
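
// (optional) describe the topic to confirm the partition count and replication factor:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic jsonTopic --zookeeper localhost:2182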



// create a database and table in MySQL:

 $ mysql -uroot -pcloudera

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed

// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)
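
// (optional) verify the table structure; on MySQL 5.7 the output should look similar to this:
mysql> describe jsonTable;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| id      | int(11)     | YES  |     | NULL    |       |
| name    | varchar(50) | YES  |     | NULL    |       |
| city    | varchar(50) | YES  |     | NULL    |       |
| country | varchar(50) | YES  |     | NULL    |       |
| Skills  | varchar(50) | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+
5 rows in set (0.00 sec)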





// The producer reads the json file and publishes each line as a message to the Kafka topic.
Producer Program in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object JsonProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()

    // broker address and delivery settings
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")                  // wait for the broker's full acknowledgement
    props.put("client.id", "ProducerApp")
    props.put("retries", "4")
    props.put("batch.size", "32768")
    // keys and values are sent as plain strings
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    val producer = new KafkaProducer[String, String](props)
    val file = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")

    // publish each line of the file as one Kafka message
    // (if your Spark version rejects the trailing comma on each line, send
    //  line.trim.stripSuffix(",") instead so every message is standalone json)
    for (line <- file.getLines()) {
      val msg = new ProducerRecord[String, String](topic, line)
      producer.send(msg)
    }
    file.close()
    producer.close()   // flushes any buffered messages before exiting
  }
}
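
// A variant of the same producer, sketched with the ProducerConfig key constants and a send
// callback so broker-side failures are reported instead of being dropped silently. This is an
// illustration only (same topic and file path assumed), not part of the original run:

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

object JsonProducerWithCallback {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // the ProducerConfig constants avoid typos in the raw property names
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val source = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")
    try {
      for (line <- source.getLines()) {
        producer.send(new ProducerRecord[String, String]("jsonTopic", line), new Callback {
          // invoked once the broker acknowledges (or rejects) the record
          override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
            if (e != null) e.printStackTrace()
            else println(s"sent to partition ${metadata.partition} at offset ${metadata.offset}")
        })
      }
    } finally {
      source.close()
      producer.close()   // flush buffered records before shutting down
    }
  }
}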


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)
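
// Note: :load compiles against the spark-shell classpath, so the Kafka client jar must already be
// visible there. If it is not, attach it at startup (the path and version below are assumptions;
// match them to your Kafka install):
[cloudera@quickstart kafka]$ spark-shell --jars /usr/local/kafka/libs/kafka-clients-0.10.2.1.jar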




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},



// The consumer: Spark Streaming reads the messages from the Kafka topic and appends each micro-batch to the MySQL table.
Kafka2MySQLStreaming.scala:
-----------------------------
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka2MySQLStreaming {
  def main(args: Array[String]): Unit = {

    // silence the noisy framework logs so the batch output stays readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)

    val batchInterval = 20                    // seconds per micro-batch
    val zk = "localhost:2182"                 // same ZooKeeper address the topic was created with
    val consumerGroupID = "jsonGroup"
    val topic = "jsonTopic"
    val partition = 1                         // number of consumer threads for this topic
    val perTopicPartitions = Map(topic -> partition)
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // receiver-based stream: yields (key, value) pairs; we only need the json value
    val kafkaData = KafkaUtils.createStream(ssc, zk, consumerGroupID, perTopicPartitions)
    val ks = kafkaData.map(x => x._2)

    ks.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {                   // skip batches that carry no new messages
        val df = sqlc.read.json(rdd)

        val props = new Properties()
        props.put("driver", "com.mysql.jdbc.Driver")
        props.put("user", "root")
        props.put("password", "cloudera")

        // append this batch to the MySQL table created earlier
        df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB", "jsonTable", props)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
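
// createStream is the older receiver-based API that tracks offsets through ZooKeeper. The same
// spark-streaming-kafka (0.8) artifact also offers a receiver-less direct stream that talks to the
// broker itself; a minimal sketch of the replacement for the createStream call above (broker
// address is an assumption):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// connect straight to the broker (localhost:9092) instead of ZooKeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("jsonTopic"))
val values = directStream.map(_._2)   // same (key, value) shape; keep only the json value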

[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)
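
// Note: this job needs the Kafka streaming integration and the MySQL JDBC driver on the classpath.
// If the quickstart shell does not already ship them, attach the jars at startup (the artifact
// versions here are assumptions; use the ones matching your Spark and MySQL installs):
[cloudera@quickstart kafka]$ spark-shell --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar,mysql-connector-java-5.1.47.jar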



// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id   | name       | city                     | country               | Skills                        |
+------+------------+--------------------------+-----------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden                | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde            | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan            | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil                | Army                          |
|    5 | Modesta    | Paltamo                  | Finland               | Avaya Technologies            |
...
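
// (optional) a quick row count shows how many records the streaming job has appended so far:
mysql> select count(*) from jsonTable;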
