Start IntelliJ in Windows:
------------------------
build.sbt:
----------
name := "SparkSampleProgram"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume-assembly" % "2.3.2"
build.properties:
-----------------
sbt.version = 1.0.3
Add scala file : FlumeSpark.sc
--------------------------------
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("spark=flumeintegeration(pure based approach").setMaster("local[2]")
val sc = new SparkContext(conf)
val BatchInterval = 20
val host = "192.168.0.100"
val portno = 7777
val ssc = new StreamingContext(sc, Seconds(BatchInterval))
//Pulling the data from flume application
val flumedata = FlumeUtils.createStream(ssc, host, portno)
val res = flumedata.map { x =>
val event = x.event
val messageBody = new String(event.getBody.array())
messageBody
}
res.print()
ssc.start()
ssc.awaitTermination()
}
}
In Ubuntu Linux:
----------------
// We are going to run flume in Ubuntu
// In windows we are going to run Spark
// Find the windows local Machine's IPv4 Address
Ethernet adapter Ethernet:
Connection-specific DNS Suffix . :
Link-local IPv6 Address . . . . . : fe80::90c1:53dd:fb3d:51d%3
IPv4 Address. . . . . . . . . . . : 192.168.0.100
// we are going to mention it in agent1.sinks.flumesink.hostname
//Ping to verify it within VM where we are going to run flume
hadoop@hadoop:~/Desktop/vow$ ping 192.168.0.100
PING 192.168.0.100 (192.168.0.100) 56(84) bytes of data.
64 bytes from 192.168.0.100: icmp_seq=1 ttl=128 time=0.371 ms
hadoop@hadoop:~/Desktop/vow$ gedit flumespark1.conf
flumespark1.conf:
-------------------
//creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink
//Source Configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel
//Channel Configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity=1000
agent1.channels.flumechannel.transactionCapacity=100
//Sink Configuration
agent1.sinks.flumesink.type = avro
agent1.sinks.flumesink.hostname = 192.168.0.100
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel
Run the flume in Ubuntu:
--------------------------------
$ flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark1.conf -Dflume.root.logger=DEBUG,console
// Give sample text on Ubuntu console to feed Flume
curl telnet://localhost:1234
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:1234
i love india
OK
super cool
OK
Run the spark program within IntelliJ IDEA in Windows:
------------------------------------------------------------
Output in IntelliJ console
-------------------------------------------
Time: 1550906600000 ms
-------------------------------------------
i love india // values are coming from flume
-------------------------------------------
Time: 1550906620000 ms
-------------------------------------------
super cool
------------------------
build.sbt:
----------
name := "SparkSampleProgram"
version := "0.1"
scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume-assembly" % "2.3.2"
build.properties:
-----------------
sbt.version = 1.0.3
Add scala file : FlumeSpark.sc
--------------------------------
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("spark=flumeintegeration(pure based approach").setMaster("local[2]")
val sc = new SparkContext(conf)
val BatchInterval = 20
val host = "192.168.0.100"
val portno = 7777
val ssc = new StreamingContext(sc, Seconds(BatchInterval))
//Pulling the data from flume application
val flumedata = FlumeUtils.createStream(ssc, host, portno)
val res = flumedata.map { x =>
val event = x.event
val messageBody = new String(event.getBody.array())
messageBody
}
res.print()
ssc.start()
ssc.awaitTermination()
}
}
In Ubuntu Linux:
----------------
// We are going to run flume in Ubuntu
// In windows we are going to run Spark
// Find the windows local Machine's IPv4 Address
Ethernet adapter Ethernet:
Connection-specific DNS Suffix . :
Link-local IPv6 Address . . . . . : fe80::90c1:53dd:fb3d:51d%3
IPv4 Address. . . . . . . . . . . : 192.168.0.100
// we are going to mention it in agent1.sinks.flumesink.hostname
//Ping to verify it within VM where we are going to run flume
hadoop@hadoop:~/Desktop/vow$ ping 192.168.0.100
PING 192.168.0.100 (192.168.0.100) 56(84) bytes of data.
64 bytes from 192.168.0.100: icmp_seq=1 ttl=128 time=0.371 ms
hadoop@hadoop:~/Desktop/vow$ gedit flumespark1.conf
flumespark1.conf:
-------------------
//creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink
//Source Configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel
//Channel Configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity=1000
agent1.channels.flumechannel.transactionCapacity=100
//Sink Configuration
agent1.sinks.flumesink.type = avro
agent1.sinks.flumesink.hostname = 192.168.0.100
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel
Run the flume in Ubuntu:
--------------------------------
$ flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark1.conf -Dflume.root.logger=DEBUG,console
// Give sample text on Ubuntu console to feed Flume
curl telnet://localhost:1234
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:1234
i love india
OK
super cool
OK
Run the spark program within IntelliJ IDEA in Windows:
------------------------------------------------------------
Output in IntelliJ console
-------------------------------------------
Time: 1550906600000 ms
-------------------------------------------
i love india // values are coming from flume
-------------------------------------------
Time: 1550906620000 ms
-------------------------------------------
super cool
No comments:
Post a Comment