Showing posts with label flume. Show all posts

Friday, 22 February 2019

Spark Streaming with Flume Integration Example

Start IntelliJ in Windows:
------------------------

build.sbt:
----------
name := "SparkSampleProgram"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
// (not used by this example)
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume
// Keep the Flume connector on the same version as the other Spark modules
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.4.0"


build.properties:
-----------------
sbt.version = 1.0.3


Add Scala file : FlumeSpark.scala
--------------------------------
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-flume integration (push-based approach)").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val BatchInterval = 20
    val host = "192.168.0.100"
    val portno = 7777
    val ssc = new StreamingContext(sc, Seconds(BatchInterval))

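    // Receive the events that the Flume avro sink pushes to host:portno (push-based approach)
    val flumedata = FlumeUtils.createStream(ssc, host, portno)
    val res = flumedata.map { x =>
      val event = x.event
      // Decode the event body bytes as UTF-8 text
      val messageBody = new String(event.getBody.array(), "UTF-8")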
      messageBody
    }
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}



In Ubuntu Linux:
----------------
// Flume runs in the Ubuntu VM; Spark runs on the Windows host.
// Find the Windows machine's IPv4 address (output of ipconfig):
Ethernet adapter Ethernet:

   Connection-specific DNS Suffix  . :
   Link-local IPv6 Address . . . . . : fe80::90c1:53dd:fb3d:51d%3
   IPv4 Address. . . . . . . . . . . : 192.168.0.100
   // This address goes into agent1.sinks.flumesink.hostname


// Ping the address from the VM where Flume will run, to verify that it is reachable
hadoop@hadoop:~/Desktop/vow$ ping 192.168.0.100
PING 192.168.0.100 (192.168.0.100) 56(84) bytes of data.
64 bytes from 192.168.0.100: icmp_seq=1 ttl=128 time=0.371 ms

hadoop@hadoop:~/Desktop/vow$ gedit flumespark1.conf

flumespark1.conf:
-------------------
# Creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink

# Source configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel

# Channel configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity = 1000
agent1.channels.flumechannel.transactionCapacity = 100

# Sink configuration (hostname and port must match host and portno in FlumeSpark)
agent1.sinks.flumesink.type = avro
agent1.sinks.flumesink.hostname = 192.168.0.100
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel
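
The source and the sink are only connected if both name a channel that the agent declares. A minimal sketch of a check for that, assuming a properties file laid out like the one above; undeclared_channels is a hypothetical helper written for this post, not a Flume command:

```shell
#!/bin/sh
# Hypothetical helper (not a Flume command): print every channel name that a
# source or sink refers to but that "<agent>.channels = ..." does not declare.
undeclared_channels() {
  conf=$1
  agent=$2
  awk -F'[ \t]*=[ \t]*' -v a="$agent" '
    # "<agent>.channels = c1 c2" declares the channels
    $1 == a ".channels" {
      n = split($2, d, /[ \t]+/)
      for (i = 1; i <= n; i++) if (d[i] != "") declared[d[i]] = 1
    }
    # "<agent>.sources.X.channels" and "<agent>.sinks.Y.channel" use them
    $1 ~ ("^" a "\\.(sources|sinks)\\.[^.]+\\.channels?$") {
      m = split($2, r, /[ \t]+/)
      for (i = 1; i <= m; i++) if (r[i] != "") used[r[i]] = 1
    }
    END { for (c in used) if (!(c in declared)) print c }
  ' "$conf"
}

# Usage: undeclared_channels flumespark1.conf agent1
```

For the configuration above the function should print nothing, because flumechannel is both declared and used. A misspelled channel name in a source or sink line would be printed.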





Run Flume in Ubuntu:
--------------------------------
$ flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark1.conf -Dflume.root.logger=DEBUG,console



// In another Ubuntu terminal, connect to the netcat source and type sample text to feed Flume
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:1234
i love india
OK
super cool
OK
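
Typing lines by hand is enough for a quick check. To send several lines in one go, generated text can be piped into the netcat source instead. A rough sketch, assuming nc (netcat) is installed in the VM; sample_lines is a hypothetical helper and the message text is made up:

```shell
#!/bin/sh
# Hypothetical helper: print COUNT numbered test messages, one per line.
sample_lines() {
  count=$1
  i=1
  while [ "$i" -le "$count" ]; do
    echo "test event $i"
    i=$((i + 1))
  done
}

# Usage (needs the Flume agent above listening on localhost:1234):
#   sample_lines 5 | nc localhost 1234
```

The netcat source answers OK for every line it accepts, as in the session above. Depending on the netcat variant, nc may have to be stopped with Ctrl+C after the last line.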






Run the Spark program within IntelliJ IDEA in Windows:
------------------------------------------------------------

Output in IntelliJ console
-------------------------------------------
Time: 1550906600000 ms
-------------------------------------------
i love india // values are coming from flume

-------------------------------------------
Time: 1550906620000 ms
-------------------------------------------
super cool
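
The Time: lines are batch timestamps in milliseconds since the Unix epoch. The two batches above are 20 seconds apart, which matches BatchInterval in the program. A small sketch of that arithmetic; both function names are hypothetical, and ms_to_utc assumes GNU date as shipped with Ubuntu:

```shell
#!/bin/sh
# Seconds between two batch timestamps given in epoch milliseconds.
batch_gap_seconds() {
  echo $(( ($2 - $1) / 1000 ))
}

# Convert epoch milliseconds to a UTC date string (GNU date syntax).
ms_to_utc() {
  date -u -d "@$(( $1 / 1000 ))" '+%Y-%m-%d %H:%M:%S'
}

batch_gap_seconds 1550906600000 1550906620000   # prints 20
ms_to_utc 1550906600000                          # prints 2019-02-23 07:23:20
```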







Flume with Telnet input as source

Before starting Flume with Telnet, make sure Telnet is installed.
If it is not, follow the steps below to install it on Ubuntu.

sudo apt-get -y install telnet
sudo apt-get install xinetd telnetd
sudo /etc/init.d/xinetd restart

sudo apt install net-tools
sudo apt install curl


flume.conf
----------------
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

// Make sure the Hadoop daemons are running; if not, start them with start-all.sh
// Run Flume
flume-ng agent -n a1 -c /home/hadoop/Desktop/vow -f /home/hadoop/Desktop/vow/flume.conf

2019-02-23 07:21:12,088 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
2019-02-23 07:22:54,114 INFO sink.LoggerSink: Event: { headers:{} body: 69 20 6C 6F 76 65 20 69 6E 64 69 61             i love india }
2019-02-23 07:22:56,003 INFO sink.LoggerSink: Event: { headers:{} body: 77 68 6F 20 69 73 20 62 65 61 75 74 79 3F       who is beauty? }
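
The logger sink prints each event body twice: first as hex bytes, then as text. A minimal sketch that decodes the hex column by hand to confirm the two agree; hex_to_text is a hypothetical helper:

```shell
#!/bin/sh
# Hypothetical helper: turn space-separated hex bytes into text.
# Each byte is converted to a \0ddd octal escape, which printf '%b' expands.
hex_to_text() {
  for byte in $1; do
    printf '%b' "\\0$(printf '%03o' "0x$byte")"
  done
  printf '\n'
}

hex_to_text "69 20 6C 6F 76 65 20 69 6E 64 69 61"   # prints: i love india
```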

// In another terminal, connect to the netcat source (curl's telnet mode is used here) and type some text
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:44444
i love india
OK
who is beauty?
OK

Saturday, 22 December 2018

Flume - Simple Demo

// create a folder in hdfs :

$ hdfs dfs -mkdir /user/flumeExa

// Create a shell script which generates : Hadoop in real world <n>

hadoop@hadoop:~/Desktop/vow$ cat > loopThrough.sh
# Start from an empty log; -f avoids an error when the file does not exist yet
rm -f logfile.log
i=0
while :
do
  echo "Hadoop in real world $i" >> logfile.log
  i=$((i + 1))
  sleep 5
done
^C

// Run the shell script to keep on appending into logfile.log:

hadoop@hadoop:~/Desktop/vow$ sh loopThrough.sh

// create a flume configuration file named : simple-flume.conf
-----------------------------------------------------------------

# Flume Components
agent.sources = tail-source
agent.sinks = hdfs-sink
agent.channels = memory-channel

# Source
agent.sources.tail-source.type = exec
agent.sources.tail-source.command = tail -f logfile.log
agent.sources.tail-source.channels = memory-channel

# Sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = user/flumeExa
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.channel = memory-channel

# Channel
agent.channels.memory-channel.type = memory



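// Run Flume
// Flume reads the content of logfile.log and writes it to the HDFS location user/flumeExa
// Note: user/flumeExa has no leading slash, so HDFS resolves it relative to the user's home directory;
// it is not the /user/flumeExa folder created earlier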

flume-ng agent --conf /home/hadoop/Desktop/vow/ -f /home/hadoop/Desktop/vow/simple-flume.conf -Dflume.root.logger=DEBUG,console -n agent

// Check the hdfs folder

hdfs dfs -ls user/flumeExa/
Found 9 items
-rw-r--r--   1 hadoop supergroup        240 2019-02-22 12:26 user/flumeExa/FlumeData.1550818590651
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:27 user/flumeExa/FlumeData.1550818590652
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:27 user/flumeExa/FlumeData.1550818623842
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:28 user/flumeExa/FlumeData.1550818658881
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:28 user/flumeExa/FlumeData.1550818693932
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:29 user/flumeExa/FlumeData.1550818728976
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:29 user/flumeExa/FlumeData.1550818764024
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:30 user/flumeExa/FlumeData.1550818799065
-rw-r--r--   1 hadoop supergroup        168 2019-02-22 12:31 user/flumeExa/FlumeData.1550818834114

// View the content of one of the files written by Flume

hdfs dfs -cat user/flumeExa/FlumeData.1550818834114
Hadoop in real world 72
Hadoop in real world 73
Hadoop in real world 74
Hadoop in real world 75
Hadoop in real world 76
Hadoop in real world 77
Hadoop in real world 78
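
Each 168-byte file above holds 7 lines. The script writes one line every 5 seconds, and simple-flume.conf sets no roll options, so the HDFS sink uses its defaults (a 30-second hdfs.rollInterval, going by the Flume user guide). A quick sketch of the size arithmetic; lines_in_file is a hypothetical helper:

```shell
#!/bin/sh
# Number of whole lines in a file of SIZE bytes when every line looks like
# SAMPLE (the +1 accounts for the trailing newline).
lines_in_file() {
  size=$1
  sample=$2
  echo $(( size / (${#sample} + 1) ))
}

lines_in_file 168 "Hadoop in real world 72"   # prints 7
```

Seven lines at one line per 5 seconds span 30 seconds, which fits a file being rolled about 30 seconds after its first event. FlumeData.1550818834114 indeed contains the seven lines 72 to 78.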
