
Wednesday, 26 August 2020

SBT Build Tool - Passing Command-Line Arguments to a Scala Main Method


// program expects a command-line argument

object my1st {
  def main(args: Array[String]): Unit = {
    println("Hello " + args(0)) // args(0) is the first command-line argument
  }
}
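
The project also needs a build.sbt; the logs below show sbt 1.3.13 compiling for Scala 2.11, so a minimal build.sbt along these lines would fit (a sketch inferred from the logs, not the post's original file):

name := "my1st"

version := "0.1"

scalaVersion := "2.11.12"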


sbt package --> to build the jar file

C:\Users\sankara\IdeaProjects\my1st>sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] Compiling 1 Scala source to C:\Users\sankara\IdeaProjects\my1st\target\scala-2.11\classes ...
[success] Total time: 4 s, completed 26 Aug, 2020 1:46:21 PM


sbt "runMain objectName Argument1" --> to run the jar file with passing the command line arguments

C:\Users\sankara\IdeaProjects\my1st>sbt "runMain my1st India"
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] running my1st India
Hello India
[success] Total time: 4 s, completed 26 Aug, 2020 1:50:13 PM
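
The packaged jar can also be run outside sbt. Assuming the default artifact name that sbt package derives from the name, version and Scala version settings (here my1st_2.11-0.1.jar), something like this should work with a Scala 2.11 installation on the PATH:

scala -cp target\scala-2.11\my1st_2.11-0.1.jar my1st India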

How to build a jar file and run it using SBT?

// Sample program
package my1st

object my1st {
  def main(args: Array[String]): Unit = {
    println("Hello World")
  }
}


SBT - Simple Build Tool, the de facto build tool for Scala projects

Download and install sbt from here : https://www.scala-sbt.org/download.html



Open the project in IntelliJ IDEA
Right-click the project name - Copy - Copy Path
Select: Absolute Path: C:\Users\sankara\IdeaProjects\my1st
Open Windows Explorer
Paste the path into the address bar
Shift + right-click on the free space
Select "Open command prompt here"


C:\Users\sankara\IdeaProjects\my1st>dir
 Volume in drive C has no label.
 Volume Serial Number is 363F-334D

 Directory of C:\Users\sankara\IdeaProjects\my1st

26-08-2020  12:44    <DIR>          .
26-08-2020  12:44    <DIR>          ..
26-08-2020  13:21    <DIR>          .idea
26-08-2020  12:42               328 build.sbt
26-08-2020  12:43    <DIR>          project
26-08-2020  12:40    <DIR>          src
26-08-2020  12:44    <DIR>          target
               1 File(s)            328 bytes
               6 Dir(s)  334,877,937,664 bytes free

sbt package --> to build the jar file

C:\Users\sankara\IdeaProjects\my1st>sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[success] Total time: 2 s, completed 26 Aug, 2020 1:31:33 PM

sbt run --> to run the main class (sbt runs the compiled classes directly; it does not need the jar)

C:\Users\sankara\IdeaProjects\my1st>sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from C:\Users\sankara\IdeaProjects\my1st\project
[info] loading settings for project my1st from build.sbt ...
[info] set current project to my1st (in build file:/C:/Users/sankara/IdeaProjects/my1st/)
[info] running my1st.my1st
Hello World
[success] Total time: 3 s, completed 26 Aug, 2020 1:32:34 PM

Wednesday, 12 August 2020

IntelliJ IDEA : Build a Spark Program and Jar File in Windows and Run the JAR in Cloudera

// Create the Spark program using IntelliJ IDEA in Windows

package my
import org.apache.spark.sql.SparkSession

object my {
  def main(args: Array[String]): Unit = {
    // build (or reuse) a SparkSession; local[2] runs Spark locally with two threads
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Ex")
      .getOrCreate()
    // read a JSON file from HDFS into a DataFrame
    val df = spark.read.format("json").load("hdfs://quickstart.cloudera:8020/user/cloudera/customers.json")
    df.printSchema()
    df.show()
  }
}


build.sbt:
----------
name := "myspark"

version := "0.1"

scalaVersion := "2.11.12"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.6"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.6"


// to build the jar file
sbt> package --> generates the jar file


The .jar file will be created in: IdeaProjects\myspark\target\scala-2.11\myspark_2.11-0.1.jar

Copy myspark_2.11-0.1.jar from Windows to the Cloudera VM using WinSCP.


// run spark-submit in the Cloudera VM
spark-submit --master local --driver-memory 2g --executor-memory 2g --class my.my myspark_2.11-0.1.jar


root
 |-- emailAddress: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- phoneNumber: string (nullable = true)
 |-- userId: long (nullable = true)


+--------------------+---------+--------+-----------+------+
|        emailAddress|firstName|lastName|phoneNumber|userId|
+--------------------+---------+--------+-----------+------+
|krish.lee@learnin...|    Krish|     Lee|     123456|     1|
|racks.jacson@lear...|    racks|  jacson|     123456|     2|
|denial.roast@lear...|   denial|   roast|   33333333|     3|
|devid.neo@learnin...|    devid|     neo|  222222222|     4|
|jone.mac@learning...|     jone|     mac|  111111111|     5|
+--------------------+---------+--------+-----------+------+
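
The input path in the program above is hard-coded. Tying back to the command-line-argument post earlier, a variant (a sketch, not the program used here) could take the path as an argument to spark-submit instead:

package my
import org.apache.spark.sql.SparkSession

object my {
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: my.my <input-path>") // path comes from spark-submit
    val spark = SparkSession.builder().appName("Ex").getOrCreate() // master is set by --master
    spark.read.format("json").load(args(0)).show()
  }
}

spark-submit --master local --class my.my myspark_2.11-0.1.jar hdfs://quickstart.cloudera:8020/user/cloudera/customers.json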

Wednesday, 13 March 2019

Kafka : Call Logs SUCCESS, FAILED, DROPPED - Producer and Consumer Programming in Scala

input file:
calllogdata.txt:
-----------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489

Start ZooKeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties


bin/kafka-topics.sh --create --topic SUCCESS_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic FAILED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic DROPPED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
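
Note: on Kafka 2.2 and later, kafka-topics.sh takes --bootstrap-server instead of --zookeeper; the equivalent create command would look like this, for example:

bin/kafka-topics.sh --create --topic SUCCESS_RECORDS --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092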


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
DROPPED_RECORDS
FAILED_RECORDS
SUCCESS_RECORDS


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "myOwn"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerCallLog {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("client.id", "ProducerApp")
    props.put("batch.size", "32768")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val topicSUCCESS = "SUCCESS_RECORDS"
    val topicFAILED = "FAILED_RECORDS"
    val topicDROPPED = "DROPPED_RECORDS"

    val producer = new KafkaProducer[String, String](props)

    val file = scala.io.Source.fromFile("/home/hadoop/Desktop/vow/calllogdata.txt")

    // compile the status pattern once, outside the loop
    val statusPattern = "(SUCCESS|FAILED|DROPPED)".r

    for (line <- file.getLines()) {
      // every input line carries exactly one of the three statuses
      val status = statusPattern.findFirstIn(line).get
      // route each record to the topic matching its status
      if (status == "SUCCESS") {
        val msg = new ProducerRecord[String, String](topicSUCCESS, line)
        producer.send(msg)
        println(msg)
      } else if (status == "FAILED") {
        val msg = new ProducerRecord[String, String](topicFAILED, line)
        producer.send(msg)
        println(msg)
      } else {
        val msg = new ProducerRecord[String, String](topicDROPPED, line)
        producer.send(msg)
        println(msg)
      }
    }
    file.close()
    producer.close()
  }
}
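
Because each status maps one-to-one to a topic, the three branches above can also be collapsed into a lookup table; a compact variant of the same routing (a sketch, not the original program):

val topicFor = Map(
  "SUCCESS" -> topicSUCCESS,
  "FAILED"  -> topicFAILED,
  "DROPPED" -> topicDROPPED)

for (line <- file.getLines(); status <- statusPattern.findFirstIn(line)) {
  producer.send(new ProducerRecord[String, String](topicFor(status), line))
}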





bin/kafka-console-consumer.sh --topic SUCCESS_RECORDS --bootstrap-server localhost:9092 --from-beginning

bin/kafka-console-consumer.sh --topic FAILED_RECORDS --bootstrap-server localhost:9092 --from-beginning
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469


bin/kafka-console-consumer.sh --topic DROPPED_RECORDS --bootstrap-server localhost:9092 --from-beginning


import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "testGroup1")
    properties.put("client.id", "ConsumerApp")
    // properties.put("partition.assignment.strategy", "range")

    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "FAILED_RECORDS"

    consumer.subscribe(Collections.singletonList(topic))
    println("Subscribed to topic " + topic)

    // the first poll joins the group and triggers partition assignment;
    // then rewind to the beginning and poll again for the actual records
    var records = consumer.poll(4000)
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()) {
      println("Received Message : " + record)
    }

    consumer.commitSync()
  }
}
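
With the kafka 2.1.1 client used here, poll(long) is deprecated in favour of poll(java.time.Duration), and scala.collection.JavaConversions is deprecated in favour of JavaConverters; a modernised read loop (a sketch under those assumptions) would look like:

import java.time.Duration
import scala.collection.JavaConverters._

var records = consumer.poll(Duration.ofMillis(4000))
consumer.seekToBeginning(consumer.assignment)
records = consumer.poll(Duration.ofMillis(4000))
for (record <- records.asScala) {
  println("Received Message : " + record)
}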

Tuesday, 12 March 2019

Kafka : Producer, Consumer Programming in Scala with Multi Server Configuration

// Kafka producer and consumer programming in Scala against a multi-broker (multi-server) setup

Start ZooKeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



The default ZooKeeper port is 2181.

Make four copies of server.properties named server0.properties, server1.properties, server2.properties and server3.properties.

Edit each file and change the broker id, log directory and listener port as shown below.

sudo gedit server0.properties
broker.id=0
zookeeper.connect=localhost:2181
log.dirs=/tmp/k0/kafka-logs
listeners=PLAINTEXT://:9090

sudo gedit server1.properties
broker.id=1
zookeeper.connect=localhost:2181
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9091

sudo gedit server2.properties
broker.id=2
zookeeper.connect=localhost:2181
log.dirs=/tmp/k2/kafka-logs
listeners=PLAINTEXT://:9092

sudo gedit server3.properties
broker.id=3
zookeeper.connect=localhost:2181
log.dirs=/tmp/k3/kafka-logs
listeners=PLAINTEXT://:9093

Open four new terminals and run one of the following commands in each:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server0.properties
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
INFO [KafkaServer id=3] started (kafka.server.KafkaServer)


 

// four Kafka broker instances are running
hadoop@hadoop:/usr/local/kafka$ jps
6384 Jps
4851 Kafka  // #instance 1
6243 Main
4163 Kafka // #instance 2
3492 QuorumPeerMain
4504 Kafka // #instance 3
5196 Kafka // #instance 4




// Create a new topic named myTopic with 4 partitions and a replication factor of 4
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic myTopic --partitions 4 --replication-factor 4 --zookeeper localhost:2181
Created topic "myTopic".


// describe the topic myTopic (Leader = broker currently serving the partition; Replicas = brokers holding a copy; Isr = replicas currently in sync)
bin/kafka-topics.sh --describe --topic myTopic --zookeeper localhost:2181
Topic:myTopic PartitionCount:4 ReplicationFactor:4 Configs:
Topic: myTopic Partition: 0 Leader: 3 Replicas: 3,0,1,2 Isr: 3,0,1,2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1,2,3 Isr: 0,1,2,3
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: myTopic Partition: 3 Leader: 2 Replicas: 2,3,0,1 Isr: 2,3,0,1




build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "Kafka"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"




// Producer programming in Scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerExa2 {
  def main(args: Array[String]): Unit = {

    val props = new Properties()
    val topic = "myTopic"

    // list all four brokers; the client discovers the full cluster from any of them
    props.put("bootstrap.servers", "192.168.0.106:9090,192.168.0.106:9091,192.168.0.106:9092,192.168.0.106:9093")
    props.put("acks", "all") // wait for all in-sync replicas to acknowledge each record
    props.put("client.id", "ProducerApp")
    props.put("retries", "4")
    props.put("batch.size", "32768")

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val msg: String = "Welcome to Kafka : #"
    for (i <- 1 to 10) {
      val data = new ProducerRecord[String, String](topic, msg + i.toString)
      producer.send(data)
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}
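
producer.send is asynchronous; the same client API also accepts an optional Callback that reports, per record, where the message landed or why it failed. A sketch of the same send loop with a callback:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

for (i <- 1 to 10) {
  val data = new ProducerRecord[String, String](topic, msg + i.toString)
  producer.send(data, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null) exception.printStackTrace()
      else println("sent to partition " + metadata.partition() + " at offset " + metadata.offset())
    }
  })
}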




Run the program in IntelliJ IDEA.

------Successfully published messages to topic : myTopic----


// view the output in the console (the messages arrive interleaved because the topic has 4 partitions; ordering is guaranteed only within a partition):
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server localhost:9090 --from-beginning
Welcome to Kafka : #3
Welcome to Kafka : #7
Welcome to Kafka : #1
Welcome to Kafka : #5
Welcome to Kafka : #9
Welcome to Kafka : #2
Welcome to Kafka : #6
Welcome to Kafka : #10
Welcome to Kafka : #4
Welcome to Kafka : #8



// Consumer programming in Scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "192.168.0.106:9091")
    properties.put("group.id", "testGroup")
    properties.put("client.id", "ConsumerApp")

    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "myTopic"

    consumer.subscribe(Collections.singletonList(topic))
    println("Subscribed to topic " + topic)

    var records = consumer.poll(4000)
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()) {
      println("Received Message : " + record)
    }
    consumer.commitSync()
  }
}

Output:
--------
Subscribed to topic myTopic
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 4, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #4)
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 5, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #8)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #2)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #6)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 8, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #10)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #3)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #7)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 4, CreateTime = 1552402871830, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #1)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 5, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #5)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 6, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #9)

