Friday, 29 March 2019

Space-to-Comma Transformation for DataFrame Data using Spark with Scala

// Transforming spaces into commas in DataFrame data
scala> val obj = List( (1001, "7 2234 2342 2522"), (1002, "2222 2223 2224 2225"), (1003, "2000 2001 2002 2003 2004"), (1004, "2005 2006 2000 7 2001 2010"))
obj: List[(Int, String)] = List((1001,7 2234 2342 2522), (1002,2222 2223 2224 2225), (1003,2000 2001 2002 2003 2004), (1004,2005 2006 2000 7 2001 2010))


scala> val r1 = sc.parallelize(obj);
r1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala>  r1.foreach(println)
(1001,7 2234 2342 2522)
(1002,2222 2223 2224 2225)
(1003,2000 2001 2002 2003 2004)
(1004,2005 2006 2000 7 2001 2010)


 case class UserStory(userid:String, storyid:String)

 val r2 = r1.map (x => {
       val userid = x._1.toString
       val storyid = x._2.toString
       UserStory(userid,storyid)
       })
 
scala> r2.foreach(println)
UserStory(1001,7 2234 2342 2522)
UserStory(1002,2222 2223 2224 2225)
UserStory(1003,2000 2001 2002 2003 2004)
UserStory(1004,2005 2006 2000 7 2001 2010)


scala> val df = r2.toDF
df: org.apache.spark.sql.DataFrame = [userid: string, storyid: string]

scala> df.printSchema
root
 |-- userid: string (nullable = true)
 |-- storyid: string (nullable = true)


scala> df.show
+------+--------------------+
|userid|             storyid|
+------+--------------------+
|  1001|    7 2234 2342 2522|
|  1002| 2222 2223 2224 2225|
|  1003|2000 2001 2002 20...|
|  1004|2005 2006 2000 7 ...|
+------+--------------------+

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df.select(col("userid") as "user id",regexp_replace(col("storyid")," ",",") as "story id").show
+-------+--------------------+
|user id|            story id|
+-------+--------------------+
|   1001|    7,2234,2342,2522|
|   1002| 2222,2223,2224,2225|
|   1003|2000,2001,2002,20...|
|   1004|2005,2006,2000,7,...|
+-------+--------------------+
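The same replacement can also be written with withColumn, keeping the original column names. A minimal sketch, assuming the df built above is still in scope:

// Same transformation using withColumn instead of select/alias
val commaDf = df.withColumn("storyid", regexp_replace(col("storyid"), " ", ","))
commaDf.show(false)   // show(false) prints the full values without truncation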


Wednesday, 27 March 2019

Comparing two strings: finding the matching letters and counting the matches

// Here we compare two strings: for each distinct letter of the first string,
// we count how many times it appears in the second string

 val firstString = "uuai aao ioaau eieoiou"
 val secondString = "i love india and singapore"
 val m = new Array[Char](50)
 var i = 0
 var leftString  = ""


 for (c <- firstString) {
     m(i) = c
     i = i  + 1
     }

  for (c <- m.distinct.sorted(Ordering.Char)){
                if (c != null) {
      leftString = leftString + c.toString
      }
      }

  m.distinct.sorted(Ordering.Char)

  var output = Map[String,String]()
  var count = 0
  for (lc <- leftString) {
     count =   secondString.count(_ == lc)
     output = output + (lc.toString ->count.toString)
     }

 println(output)

 scala>  println(output)
Map(e -> 2,  -> 0, u -> 0, a -> 3, i -> 4,   -> 4, o -> 2)
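The two odd-looking keys in the output come from the unused '\u0000' slots of the 50-element array and from the space character in firstString. A more compact sketch of the same idea using the collection API, assuming the same firstString and secondString:

// For every distinct character of the first string, count its occurrences in the second
val output2 = firstString.distinct.sorted
  .map(c => c.toString -> secondString.count(_ == c).toString)
  .toMap
println(output2)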

Monday, 25 March 2019

Scala Crash Course

scala> val myIntArray:Array[Int] = new Array(3)
myIntArray: Array[Int] = Array(0, 0, 0)

scala> myIntArray
res0: Array[Int] = Array(0, 0, 0)

scala> myIntArray.foreach(println)
0
0
0

scala> myIntArray(0)=10

scala> myIntArray(1)=20

scala> myIntArray(2)=30

scala> myIntArray.foreach(println)
10
20
30


scala> def addOne(x:Int): Int = {
     | x+1
     | }
addOne: (x: Int)Int

scala> addOne(5)
res6: Int = 6

scala> addOne(50)
res7: Int = 51


 scala> def addTwo(x:Int):Int = {
     | return x + 2;
     | }
addTwo: (x: Int)Int

scala> addTwo(5);
res8: Int = 7

scala> (1 to 10).foreach(x => println(addTwo(x)))
3
4
5
6
7
8
9
10
11
12


scala> def max(x:Int, y:Int): Int = {
     | if (x > y) x else y
     | }
max: (x: Int, y: Int)Int

scala> max(5,6)
res22: Int = 6


scala> val myArray = Array("Zara","Lara","Sara")
myArray: Array[String] = Array(Zara, Lara, Sara)

scala> var i = 0
i: Int = 0

scala> while (i < myArray.length) {
     | println(myArray(i))
     | i += 1
     | }
Zara
Lara
Sara

scala> myArray.foreach(arg => println(arg))
Zara
Lara
Sara

scala> myArray.foreach(println(_))
Zara
Lara
Sara

scala> myArray.foreach(println)
Zara
Lara
Sara



scala> for(arg <- myArray)
     | println(arg)
Zara
Lara
Sara


$ cat hello.sc
object hello{
def main(args:Array[String]):Unit = {
println("Hello : " + args(0) + " !")
}
}

scala> :load /home/hadoop/Desktop/ScalaTraining/hello.sc
Loading /home/hadoop/Desktop/ScalaTraining/hello.sc...
defined object hello

scala> hello.main(Array("Sara"))
Hello : Sara !







scala> val word = "India!"
word: String = India!

scala> var n = 5
n: Int = 5

scala> var i = 0
i: Int = 0

scala> while (i < n){
     | println(word)
     | i += 1
     | }
India!
India!
India!
India!
India!

scala> (1 to n).foreach(x => println(word + " #" + x))
India! #1
India! #2
India! #3
India! #4
India! #5





$ cat Hello.scala
object Hello extends App{
println("Hello, World!")
}

scala> :load /home/hadoop/Desktop/ScalaTraining/Hello.scala
Loading /home/hadoop/Desktop/ScalaTraining/Hello.scala...
defined object Hello

scala> Hello.main(null)
Hello, World!



scala> 1 to n
res41: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)

scala> (1).to(5)
res42: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)



scala> val greetStrings : Array[String] = new Array[String](3)
greetStrings: Array[String] = Array(null, null, null)

scala> greetStrings(0) = "Hello"

scala> greetStrings(1) = ", "

scala> greetStrings(2) = " world! \n"

scala> greetStrings.apply(0)
res47: String = Hello

scala> def printArgs(args:Array[String]):Unit = {
     | var i = 0
     | while (i < args.length) {
     | println(args(i))
     | i += 1
     | }
     | }
printArgs: (args: Array[String])Unit

scala> printArgs(greetStrings)
Hello
,
 world!


scala> printArgs(Array("I","Love","India"))
I
Love
India


scala> def printArgs(args:Array[String]): Unit = {
     | for (arg < args)
<console>:2: error: '<-' expected but ')' found.
for (arg < args)
               ^

scala> def printArgs(args:Array[String]): Unit = {
     | for (arg <- args)
     | println(arg)
     | }
printArgs: (args: Array[String])Unit

scala> printArgs(greetStrings)
Hello
,
 world!


scala> printArgs(Array("I","Love","India"))
I
Love
India


scala> Array("Siva "," Lara", " Mala").mkString("\t")
res56: String = Siva Lara Mala

scala> println(Array("Siva "," Lara", " Mala").mkString("\t"))
Siva Lara Mala

scala> println(Array(" Siva "," Lara", " Mala").mkString("\n"))
 Siva
 Lara
 Mala

scala> println(Array(" Siva "," Lara", " Mala").mkString(":"))
 Siva : Lara: Mala


scala> def formatArgs(args:Array[String]):String = return args.mkString("\n")
formatArgs: (args: Array[String])String


scala> formatArgs(Array("sare","gare"))
res63: String =
sare
gare

scala> formatArgs(Array("sare","gare")).foreach(println)
s
a
r
e


g
a
r
e


scala> myArray
res65: Array[String] = Array(Zara, Lara, Sara)

scala> formatArgs(myArray)
res66: String =
Zara
Lara
Sara


scala> def formatArgs(args:Array[String]):String = return args.mkString(":")
formatArgs: (args: Array[String])String

scala> formatArgs(myArray)
res67: String = Zara:Lara:Sara


scala> val res = formatArgs(Array("zero","one","two"))
res: String = zero:one:two

scala> assert(res == "zero:one:two")

scala> val numNames = Array("zero","one","two")
numNames: Array[String] = Array(zero, one, two)

scala> val numNames = Array.apply("zero","one","two")
numNames: Array[String] = Array(zero, one, two)

scala> numNames.exists(s => s.contains("z"))
res72: Boolean = true



scala> val oneTwo  = List(1,2,3)
oneTwo: List[Int] = List(1, 2, 3)

scala> val threeFour = List(3,4)
threeFour: List[Int] = List(3, 4)

scala> val oneTwoThreeFour  = oneTwo ::: threeFour
oneTwoThreeFour: List[Int] = List(1, 2, 3, 3, 4)


scala> val twoThree = List(2,3)
twoThree: List[Int] = List(2, 3)


scala> val oneTwoThree = 1 :: twoThree
oneTwoThree: List[Int] = List(1, 2, 3)



scala> val thrill = "will " :: "fill " :: "until " :: Nil
thrill: List[String] = List("will ", "fill ", "until ")

scala> thrill.head
res75: String = "will "

scala> thrill.length
res76: Int = 3

scala> thrill.last
res77: String = "until "


scala> val len = thrill.length
len: Int = 3

scala> thrill(len-1)
res78: String = "until "


scala> thrill
res85: List[String] = List("will ", "fill ", "until ")

scala> thrill.filter(x => x.contains("fill"))
res86: List[String] = List("fill ")

scala> thrill.filterNot(x => x.contains("fill"))
res87: List[String] = List("will ", "until ")

Partially applied functions:
-----------------------------
scala> def origFunc(a:Int, b:Int) = a+b
origFunc: (a: Int, b: Int)Int

scala> def modFunc = origFunc(10,_:Int)
modFunc: Int => Int

scala> modFunc(10)
res89: Int = 20

scala> modFunc(103)
res90: Int = 113


named parameters:
-----------------
scala> def speed(distance:Float, time:Float)  = distance / time
speed: (distance: Float, time: Float)Float

scala> speed(time=4.5F, distance=10F)
res91: Float = 2.2222223

scala> speed(distance=100F,time=5.5F)
res92: Float = 18.181818


scala> thrill
res93: List[String] = List("will ", "fill ", "until ")

scala> thrill.sorted
res94: List[String] = List("fill ", "until ", "will ")

scala> thrill.map(s => s + "y")
res95: List[String] = List(will y, fill y, until y)

scala> thrill.map(s => s.trim() + "y")
res96: List[String] = List(willy, filly, untily)

scala> thrill
res99: List[String] = List("will ", "fill ", "until ")

scala> val thrill = List("I","Love","India")
thrill: List[String] = List(I, Love, India)

scala> val thrill  = List("Will","Fill","Until")
thrill: List[String] = List(Will, Fill, Until)

scala> thrill.map(s => s + "y")
res100: List[String] = List(Willy, Filly, Untily)

scala> val thrill = "will" :: "will" :: "until" :: Nil
thrill: List[String] = List(will, will, until)

scala> thrill.map(s => s+ "y")
res0: List[String] = List(willy, willy, untily)

scala> thrill.mkString(",")
res1: String = will,will,until

scala> thrill.mkString("$")
res2: String = will$will$until

scala> thrill.mkString(":")
res3: String = will:will:until

scala> thrill.sortBy(s => s.charAt(0).toLower)
res7: List[String] = List(until, will, will)

scala> val pair = (99,"LuftBallons")
pair: (Int, String) = (99,LuftBallons)

scala> println(pair._1)
99

scala> println(pair._2)
LuftBallons


scala> val largeTuple = ('u','r',"the",1,4,"me")
largeTuple: (Char, Char, String, Int, Int, String) = (u,r,the,1,4,me)



// mutable Set example
scala> import scala.collection.mutable.Set
import scala.collection.mutable.Set

scala> val movieSet = Set("Hitch","Poltergeist")
movieSet: scala.collection.mutable.Set[String] = Set(Poltergeist, Hitch)

scala> movieSet += "Shrek"
res12: movieSet.type = Set(Poltergeist, Shrek, Hitch)

scala> println(movieSet)
Set(Poltergeist, Shrek, Hitch)



//Immutable Set Example
scala> import scala.collection.immutable.Set
import scala.collection.immutable.Set

scala> val movieSet = Set("Hitch","Poltergeist")
movieSet: scala.collection.immutable.Set[String] = Set(Hitch, Poltergeist)

scala> movieSet += "Shrek"
<console>:29: error: value += is not a member of scala.collection.immutable.Set[String]
  Expression does not convert to assignment because receiver is not assignable.
       movieSet += "Shrek"
                ^

scala> println(movieSet)
Set(Hitch, Poltergeist)



scala> import scala.collection.mutable.Map
import scala.collection.mutable.Map

scala> val treasureMap = Map[Int,String]()
treasureMap: scala.collection.mutable.Map[Int,String] = Map()

scala> treasureMap += (1 -> "Go to Island")
res16: treasureMap.type = Map(1 -> Go to Island)

scala> treasureMap += (2 -> "Find Big X on Ground")
res17: treasureMap.type = Map(2 -> Find Big X on Ground, 1 -> Go to Island)

scala> treasureMap += (3 -> "Dig.")
res18: treasureMap.type = Map(2 -> Find Big X on Ground, 1 -> Go to Island, 3 -> Dig.)

scala> println(treasureMap)
Map(2 -> Find Big X on Ground, 1 -> Go to Island, 3 -> Dig.)

scala> treasureMap.foreach(println)
(2,Find Big X on Ground)
(1,Go to Island)
(3,Dig.)


//Immutable Map but val
scala> import scala.collection.immutable.Map
import scala.collection.immutable.Map

scala> val romanNumeral = Map(1 -> "I", 2 -> "II")
romanNumeral: scala.collection.immutable.Map[Int,String] = Map(1 -> I, 2 -> II)

scala> romanNumeral += (3 -> "III")
<console>:32: error: value += is not a member of scala.collection.immutable.Map[Int,String]
  Expression does not convert to assignment because receiver is not assignable.
       romanNumeral += (3 -> "III")
                    ^

//Immutable Map but var
scala> import scala.collection.immutable.Map
import scala.collection.immutable.Map

scala> var romanNumeral = Map(1->"I",2 -> "II")
romanNumeral: scala.collection.immutable.Map[Int,String] = Map(1 -> I, 2 -> II)

scala> romanNumeral += (3 -> "III")

scala> println(romanNumeral)
Map(1 -> I, 2 -> II, 3 -> III)
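The reason the var version works while the val version does not: for a var holding an immutable Map, += is rewritten by the compiler into a reassignment, roughly:

// What romanNumeral += (3 -> "III") desugars to for a var with an immutable Map:
romanNumeral = romanNumeral + (3 -> "III")   // builds a new Map and reassigns the var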




// Exception handling...
scala> import scala.io.Source
import scala.io.Source

scala> def getContent(filePath:String) = {
     | for (line <- Source.fromFile(filePath).getLines())
     | println(line.length + " " + line)
     | }
getContent: (filePath: String)Unit

scala> getContent("")
java.io.FileNotFoundException:  (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at scala.io.Source$.fromFile(Source.scala:91)
  at scala.io.Source$.fromFile(Source.scala:76)
  at scala.io.Source$.fromFile(Source.scala:54)
  at getContent(<console>:37)
  ... 54 elided


scala> getContent("/home/hadoop/Desktop/emp_data.csv")
52 empno,ename,designation,manager,hire_date,sal,deptno
39 7369,SMITH,CLERK,7902,12/17/1980,800,20
42 7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
41 7521,WARD,SALESMAN,7698,2/22/1981,1250,30
41 7566,TURNER,MANAGER,7839,4/2/1981,2975,20
43 7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
41 7698,MILLER,MANAGER,7839,5/1/1981,2850,30


//Exception Handling
scala> def getContent(filePath:String) = {
     |    try{
     |        for (line <- Source.fromFile(filePath).getLines())
     |        println(line.length + " " + line)
     |        }
     |     catch{
     |           case e: java.io.FileNotFoundException => println("No file found")
     |     }
     |   }
getContent: (filePath: String)Unit

scala> getContent("")
No file found

scala> getContent("/home/hadoop/Desktop/emp_Nosuchfile.csv")
No file found
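An equivalent sketch (not from the original post) using scala.util.Try instead of try/catch, with the same behaviour:

import scala.io.Source
import scala.util.{Try, Success, Failure}

def getContentTry(filePath: String): Unit =
  Try(Source.fromFile(filePath).getLines().toList) match {
    case Success(lines) => lines.foreach(line => println(line.length + " " + line))
    case Failure(_: java.io.FileNotFoundException) => println("No file found")
    case Failure(e) => println("Unexpected error: " + e.getMessage)
  }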



//Partially applied functions
scala> def SalaryCalculator(base:Int, inc:Int, varComp:Int):Int  = base + inc + varComp
SalaryCalculator: (base: Int, inc: Int, varComp: Int)Int

scala> def jrSalCalc = SalaryCalculator(10,_:Int,_:Int)
jrSalCalc: (Int, Int) => Int

scala> def MidSalCalc = SalaryCalculator(20,_:Int, _:Int)
MidSalCalc: (Int, Int) => Int

scala> def SeniorCalc = SalaryCalculator(30, _:Int, _:Int)
SeniorCalc: (Int, Int) => Int

scala> SalaryCalculator(35,10,12)
res0: Int = 57

scala> jrSalCalc(10,12)
res1: Int = 32

scala> MidSalCalc(10,12)
res2: Int = 42

scala> SeniorCalc(10,12)
res3: Int = 52





// named arguments
scala> def speed(distance:Float, time:Float)  = distance / time
speed: (distance: Float, time: Float)Float

scala> def speed(distance:Float=10F, time:Float=2F) = distance / time
speed: (distance: Float, time: Float)Float

scala> speed(time=4F)
res5: Float = 2.5

scala> speed(10,4.5F)
res6: Float = 2.2222223

scala> speed(time=4.5F,distance=10F)
res7: Float = 2.2222223

scala> speed()
res8: Float = 5.0

scala> speed(distance=10F)
res9: Float = 5.0

scala> speed(distance=10F,time=4.5F)
res10: Float = 2.2222223




// Pattern matching example
scala> val myStr = "Hello"
myStr: String = Hello

scala> val result = myStr match {
       case "Hello" => println("Greeting!")
       case "Hi" => println("Short Greeting!")
       case _ => println("Unknown")
       }
Greeting!
result: Unit = ()


scala> val myStr = "Hello"
myStr: String = Hello

scala> val result = myStr match {
     | case "Hello" => println("Greeting!")
     | case "Hi" => println("Short Greeting!")
     | case _ => println("Unknown")
     | }
Greeting!
result: Unit = ()

scala> def UsingMatchPatternExa(myStr:String) = {
     |  val result = myStr match {
     |        case "Hello" => println("Greeting!")
     |        case "Hi" => println("Short Greeting!")
     |        case _ => println("Unknown")
     |        }
     | }
UsingMatchPatternExa: (myStr: String)Unit

scala> UsingMatchPatternExa("Hello")
Greeting!

scala> UsingMatchPatternExa("Hi")
Short Greeting!

scala> UsingMatchPatternExa("Hey")
Unknown
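Note that result above is Unit because the println happens inside each case; to get a value back, return the string from the match and print it afterwards. A small sketch:

// Returning a value from the match instead of printing inside it
def greet(myStr: String): String = myStr match {
  case "Hello" => "Greeting!"
  case "Hi"    => "Short Greeting!"
  case _       => "Unknown"
}

println(greet("Hello"))   // Greeting!
println(greet("Hey"))     // Unknown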




scala> var a = 0
a: Int = 0

scala> var b = 0
b: Int = 0

scala> for (a <- 1 to 3; b <- 1 until 3) {
     | println("Value of a : " + a + ", b: " + b)
     | }
Value of a : 1, b: 1
Value of a : 1, b: 2
Value of a : 2, b: 1
Value of a : 2, b: 2
Value of a : 3, b: 1
Value of a : 3, b: 2




scala> val numList = List(1 to 10)
numList: List[scala.collection.immutable.Range.Inclusive] = List(Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

scala> for (a <- numList) {
     | println("Value of a : " + a)
     | }
Value of a : Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)






scala> val numList = List(1,2,3,4,5,6)
numList: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val retVal = numList.filter( x => x != 3 && x < 6)
retVal: List[Int] = List(1, 2, 4, 5)

scala> val retVal = for(a <- numList if a != 3; if a < 6) yield a
retVal: List[Int] = List(1, 2, 4, 5)




/// variable arguments
scala> def printStrings(args: String*) = {
     | var i:Int = 0
     | for (arg <- args) {
     | println("Arg Value[" + i + "] = " + arg)
     | i = i + 1;
     | }
     | }
printStrings: (args: String*)Unit

scala> printStrings("Hadoop","Scala","Spark","Oozie","Hive","Hbase")
Arg Value[0] = Hadoop
Arg Value[1] = Scala
Arg Value[2] = Spark
Arg Value[3] = Oozie
Arg Value[4] = Hive
Arg Value[5] = Hbase

scala> printStrings("Arun","Prasad")
Arg Value[0] = Arun
Arg Value[1] = Prasad
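An existing collection can also be passed to a String* parameter by expanding it with : _* — a quick sketch reusing printStrings:

// Passing a List to the varargs parameter with the sequence-argument syntax
val tools = List("Hadoop", "Scala", "Spark")
printStrings(tools: _*)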


// Anonymous functions
scala> var inc = (x: Int) => x + 1
inc: Int => Int = <function1>

scala>

scala> inc(7)
res3: Int = 8

scala> inc(7) - 10
res4: Int = -2

scala> var mul = (x:Int, y:Int) => x * y
mul: (Int, Int) => Int = <function2>

scala> mul(5,7)
res5: Int = 35

scala> var userDir = () => { System.getProperty("user.dir")}
userDir: () => String = <function0>

scala> println(userDir())
/home/hadoop




//Higher-order functions
//(x: A) --> x's type is A; A is a type parameter,
//so the functions below work for any type A

scala> def apply(f:Int => String, v:Int) = f(v)
apply: (f: Int => String, v: Int)String

scala> def CurlyIt[A] (x: A) = "{" + x.toString() + "}"
CurlyIt: [A](x: A)String

scala> def SquareIt[A](x: A) = "[" + x.toString() + "]"
SquareIt: [A](x: A)String

scala> SquareIt(10)
res9: String = [10]

scala> CurlyIt(20)
res10: String = {20}

scala> println(apply(SquareIt,10))
[10]

scala> println(apply(CurlyIt,20))
{20}

scala> SquareIt("sare")
res14: String = [sare]

scala> CurlyIt("sare")
res15: String = {sare}






//Higher Order function
scala> def apply(f:String => String, v:String) = f(v)
apply: (f: String => String, v: String)String

scala> def CurlyIt[A] (x: A) = "{" + x.toString() + "}"
CurlyIt: [A](x: A)String

scala> def SquareIt[A](x: A) = "[" + x.toString() + "]"
SquareIt: [A](x: A)String

scala> println(apply(SquareIt,"--Sare--"))
[--Sare--]

scala> println(apply(CurlyIt,"--Sare--"))
{--Sare--}




// Partially applied function

scala> def adder (m:Int, n:Int, p:Int) = m + n + p
adder: (m: Int, n: Int, p: Int)Int

scala> val add2 = adder(2, _:Int, _:Int)
add2: (Int, Int) => Int = <function2>

scala> add2(10,2)
res19: Int = 14

scala> add2(1,1)
res20: Int = 4

scala> def adder(m:Int)(n:Int)(p:Int) = m + n + p
adder: (m: Int)(n: Int)(p: Int)Int

scala> adder(2)(3)(4)
res21: Int = 9
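A curried definition can also be partially applied; a small sketch based on the curried adder above:

// Fixing the first two parameter lists leaves a function of the last one
val addFive = adder(2)(3) _   // Int => Int
addFive(4)                    // 9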

/*
Scala collections come in mutable and immutable flavours.
Mutable collections can be updated or extended in place.
Immutable collections never change: additions, removals and updates return a new collection and leave the old collection unchanged.
*/

Arrays:
  Fixed-size sequential collection of elements of the same type
Lists:
  Sequential collection of elements of the same type
  Immutable; a List is a linked list
Sets:
  Collections of unique elements (no duplicates)
Maps:
  Collections of key -> value pairs
Tuples:
  Fixed number of items, possibly of different types, grouped together
  Immutable
Option:
  A container that holds either one element (Some) or none (None)

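A quick sketch of the mutable/immutable distinction described above:

// Immutable List: operations return a new collection, the original is unchanged
val xs = List(1, 2, 3)
val ys = 0 :: xs                                   // List(0, 1, 2, 3); xs is still List(1, 2, 3)

// Mutable ListBuffer: operations update the collection in place
val buf = scala.collection.mutable.ListBuffer(1, 2, 3)
buf += 4                                           // buf is now ListBuffer(1, 2, 3, 4)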


scala> val myList = List(1,2,3)
myList: List[Int] = List(1, 2, 3)

scala> val ourList = 1 :: 2 :: 3 :: Nil
ourList: List[Int] = List(1, 2, 3)

// Add Leader -- Adding an element to the head of a list
scala> val AddLeaderList = 0 :: myList
AddLeaderList: List[Int] = List(0, 1, 2, 3)

//Add Follower -- Adding an element to the tail of a list
scala> val AddFollowerList = ourList :+ 4
AddFollowerList: List[Int] = List(1, 2, 3, 4)

// List concatenation
scala> val t3 = myList ::: ourList
t3: List[Int] = List(1, 2, 3, 1, 2, 3)

// remove duplicates
scala> t3.distinct
res23: List[Int] = List(1, 2, 3)



scala> val Left = List("Arun","Banu","Chitra")
Left: List[String] = List(Arun, Banu, Chitra)

scala> val Right = List("Sara","Tanu","Umesh")
Right: List[String] = List(Sara, Tanu, Umesh)

scala> Left ::: Right
res27: List[String] = List(Arun, Banu, Chitra, Sara, Tanu, Umesh)

//List concatenation
scala> Right ::: Left
res28: List[String] = List(Sara, Tanu, Umesh, Arun, Banu, Chitra)


scala> Left.union(Right)
res30: List[String] = List(Arun, Banu, Chitra, Sara, Tanu, Umesh)

scala> Right.union(Left)
res31: List[String] = List(Sara, Tanu, Umesh, Arun, Banu, Chitra)


scala> val t = (10,"Twenty",30,"Fourty",true,3.5F)
t: (Int, String, Int, String, Boolean, Float) = (10,Twenty,30,Fourty,true,3.5)

scala> t._1
res35: Int = 10




//Sets
scala> val numberSet = Set(0,1,2,3,4,5,6,7,8, 9,10)
numberSet: scala.collection.immutable.Set[Int] = Set(0, 5, 10, 1, 6, 9, 2, 7, 3, 8, 4)


scala> numberSet.filter ( _ % 2 == 0)
res39: scala.collection.immutable.Set[Int] = Set(0, 10, 6, 2, 8, 4)

scala> numberSet.filter ( _ % 2 != 0)
res40: scala.collection.immutable.Set[Int] = Set(5, 1, 9, 7, 3)

// Set doesn't keep duplicates. It keeps unique only
scala> val noDuplicates = Set(1,2,3,2,3,1,2,3,4,3,2,1,2,34,5,4,3,2,1)
noDuplicates: scala.collection.immutable.Set[Int] = Set(5, 1, 2, 34, 3, 4)

scala> noDuplicates.foreach(println)
5
1
2
34
3
4







scala> def toInt(in : String) : Option[Int] = {
     | try{
     | Some(Integer.parseInt(in.trim))
     | } catch {
     | case e:NumberFormatException => None
     | }
     | }
toInt: (in: String)Option[Int]

scala> val someString = "123"
someString: String = 123

scala> toInt(someString) match {
     | case Some(i) => println(i)
     | case None => println(" Failed ")
     | }
123

scala> val someString = "sare"
someString: String = sare

scala> toInt(someString) match {
     | case Some(i) => println(i)
     | case None => println(" Failed ")
     | }
 Failed

scala> toInt(someString).getOrElse(-1)
res44: Int = -1

scala> toInt("10101").getOrElse(-1)
res45: Int = 10101

scala> toInt("Aries").getOrElse("Format Error")
res47: Any = Format Error
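res47 above has type Any because the Option wraps an Int while the default is a String; mapping to String first keeps a single result type:

// Keep the result type as String rather than Any
toInt("Aries").map(_.toString).getOrElse("Format Error")   // "Format Error"
toInt("123").map(_.toString).getOrElse("Format Error")     // "123"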





scala> val countryCapitals = Map ("India"->"Delhi","Afghanistan"->"Kabul","Egypt"->"Cairo")
countryCapitals: scala.collection.immutable.Map[String,String] = Map(India -> Delhi, Afghanistan -> Kabul, Egypt -> Cairo)

scala> countryCapitals.get("Egypt")
res48: Option[String] = Some(Cairo)

scala> countryCapitals.get("Egypt").isDefined
res49: Boolean = true

scala> countryCapitals.get("China").getOrElse("Not Defined in our list")
res51: String = Not Defined in our list





// zip example
scala> val numbers = List(1,2,3,4,5)
numbers: List[Int] = List(1, 2, 3, 4, 5)

scala> val chars  = List("a","b","c","d","e")
chars: List[String] = List(a, b, c, d, e)

scala> numbers.zip(chars)
res52: List[(Int, String)] = List((1,a), (2,b), (3,c), (4,d), (5,e))

scala> chars.zip(numbers)
res54: List[(String, Int)] = List((a,1), (b,2), (c,3), (d,4), (e,5))



// zipWithIndex example
scala> val myList = List("Arun","Prasad","Kalai","Harini","Nila","Silva")
myList: List[String] = List(Arun, Prasad, Kalai, Harini, Nila, Silva)

scala> myList.zipWithIndex
res55: List[(String, Int)] = List((Arun,0), (Prasad,1), (Kalai,2), (Harini,3), (Nila,4), (Silva,5))

scala> myList.zipWithIndex.foreach(println)
(Arun,0)
(Prasad,1)
(Kalai,2)
(Harini,3)
(Nila,4)
(Silva,5)


scala> myList.zip(List(1,2,3,4,5,6))
res57: List[(String, Int)] = List((Arun,1), (Prasad,2), (Kalai,3), (Harini,4), (Nila,5), (Silva,6))

scala> List(1,2,3,4,5,6).zip(myList)
res58: List[(Int, String)] = List((1,Arun), (2,Prasad), (3,Kalai), (4,Harini), (5,Nila), (6,Silva))



//fold left
scala> val numbers = List(1,2,3,4,5,6,7,8,9,10)
numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// with Seed value as 0
scala> numbers.foldLeft(0) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 0 n: 1
m : 1 n: 2
m : 3 n: 3
m : 6 n: 4
m : 10 n: 5
m : 15 n: 6
m : 21 n: 7
m : 28 n: 8
m : 36 n: 9
m : 45 n: 10
res59: Int = 55

// with Seed value as 5
scala> numbers.foldLeft(5) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 5 n: 1
m : 6 n: 2
m : 8 n: 3
m : 11 n: 4
m : 15 n: 5
m : 20 n: 6
m : 26 n: 7
m : 33 n: 8
m : 41 n: 9
m : 50 n: 10
res60: Int = 60



// fold Right

scala> val numbers = List(1,2,3,4,5,6,7,8,9,10)
numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> numbers.foldRight(0) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 10 n: 0
m : 9 n: 10
m : 8 n: 19
m : 7 n: 27
m : 6 n: 34
m : 5 n: 40
m : 4 n: 45
m : 3 n: 49
m : 2 n: 52
m : 1 n: 54
res61: Int = 55


//with seed value as 5
scala> numbers.foldRight(5) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 10 n: 5
m : 9 n: 15
m : 8 n: 24
m : 7 n: 32
m : 6 n: 39
m : 5 n: 45
m : 4 n: 50
m : 3 n: 54
m : 2 n: 57
m : 1 n: 59
res62: Int = 60


//flatten
//It collapses one level of nested structure

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten
res63: List[Int] = List(1, 2, 3, 3, 4, 5, 5, 6, 7)

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten.toSet
res64: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 4)

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten.distinct
res65: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
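A related sketch: flatMap does the map and the flatten in a single step:

// flatMap = map + flatten in one call
List(List(1,2,3), List(3,4,5), List(5,6,7)).flatMap(xs => xs.map(_ * 10))
// List(10, 20, 30, 30, 40, 50, 50, 60, 70)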

class Calculator(brand: String) {
  val color: String =
    if (brand == "TI") {
      "blue"
    } else if (brand == "HP") {
      "black"
    } else {
      "white"
    }

  def add(m: Int, n: Int): Int = m + n

  def displayBrandAndColor() = {
    println("Brand : " + brand + ", Color : " + color)
  }
}
scala>

scala> val calc = new Calculator("HP")
calc: Calculator = Calculator@51d6e3ae

scala> println(calc.color)
black





scala> class Calculator(brand:String){
     |   val color: String = if (brand =="TI"){
     |                 "blue"
     |                 } else if (brand =="HP"){
     |                 "black"
     |                } else {
     |                 "white"
     |            }
     |   def add(m:Int, n:Int) : Int = m + n
     |  def displayBrandAndColor()={
     |   println("Brand : " + brand + ", Color : " + color)
     |  }
     | }
defined class Calculator

scala> val c = new Calculator("HP")
c: Calculator = Calculator@67885e1f

scala> c.displayBrandAndColor
Brand : HP, Color : black

Saturday, 16 March 2019

JSON Input to Kafka Broker to Spark Streaming to MySQL using KafkaProducer and KafkaUtils.createStream

//JSON to Kafka Broker to Spark Streaming to MySQL
// Here we read a JSON input file from the file system and publish it to a Kafka broker
// Spark Streaming then fetches the messages from the Kafka broker and writes them into a MySQL table

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182

//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic



// create a database and table in MySQL:

 $ mysql -uroot -pcloudera

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed

// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)





// The producer reads the JSON file and publishes each line to a Kafka topic
Producer Programming in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object JsonProducer {
  def main(args:Array[String]):Unit = {
    val props = new Properties()

    props.put("bootstrap.servers","localhost:9092")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    val producer = new KafkaProducer[String,String](props)
    val file = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")

    for (line <- file.getLines()) {
      val msg = new ProducerRecord[String,String](topic,line)
      producer.send(msg)
    }
    producer.close()
  }
}


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"QaxbaÅŸ","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},



Kafka2MySQLStreaming.scala:
-----------------------------
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Properties
import org.apache.spark.sql.{SQLContext,SaveMode}
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}

object Kafka2MySQLStreaming{
def main(args:Array[String]):Unit ={

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlc = new SQLContext(sc)
val batchInterval = 20
val zk = "localhost:2182"
val consumerGroupID = "jsonGroup"
val topic = "jsonTopic"
val partition = 1
val perTopicPartitions = Map(topic -> partition)
val ssc = new StreamingContext(sc,Seconds(batchInterval))

val KafkaData = KafkaUtils.createStream(ssc,zk,consumerGroupID,perTopicPartitions)
val ks = KafkaData.map (x => x._2)
ks.foreachRDD { x =>
val df = sqlc.read.json(x)

val props = new Properties()
props.put("driver","com.mysql.jdbc.Driver")
props.put("user","root")
props.put("password","cloudera")

df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB","jsonTable",props)
df.count()
}
ssc.start()
ssc.awaitTermination()
}
}

[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)



// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id   | name       | city                     | country               | Skills                        |
+------+------------+--------------------------+-----------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden                | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde            | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan            | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil                | Army                          |
|    5 | Modesta    | Paltamo                  | Finland               | Avaya Technologies            |

Multiple Kafka Programs - Console Producer, Console Consumer - Spark Streaming

// NetCat Input into Spark Streaming Example
$ spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.socketTextStream("localhost",9999)
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val res = pair.reduceByKey(_+_)
res.print()
ssc.start()


hadoop@hadoop:~$ nc -lk 9999
i love india
who loves pakistan
^C

Time: 1552710680000 ms
-------------------------------------------
(love,1)
(who,1)
(india,1)
(pakistan,1)
(i,1)
(loves,1)


// Kafka Console Producer and Console Consumer Example
[cloudera@quickstart kafka]$ sudo gedit server.properties
   broker.id=0
   zookeeper port : 2182
   log.dirs=/tmp/k1/kafka-logs
   listeners=PLAINTEXT://:9092
 
[cloudera@quickstart kafka]$ sudo gedit config/zookeeper.properties
   clientPort=2182
 
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties



hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic myFresh --partitions 1 --replication-factor 1 --zookeeper localhost:2182
Created topic "myFresh".


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
myFresh

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>i love india
>i love singapore
>i love malaysia

hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myFresh --bootstrap-server localhost:9092 --from-beginning
i love india
i love singapore
i love malaysia




// Kafka Console Producer data into => Spark Streaming Example (KafkaUtils.createStream)

KafkaStreamingExa.scala:
------------------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level


object KafkaStreamingExa {
def main (args : Array[String]) : Unit ={
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[4]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc,Seconds(10))
  val kafkaStream = KafkaUtils.createStream(ssc,"localhost:2182","testGroup",Map("myFresh" -> 1))
  val lines = kafkaStream.map(x => x._2.toUpperCase)
  print(lines)
  val words = lines.flatMap(x => x.split(" "))
 
  val pair = words.map(x => (x,1))

  val res = pair.reduceByKey(_+_)


 
  res.foreachRDD( rdd =>
      rdd.foreachPartition( partition =>
        partition.foreach{
            case (w:String, cnt:Int) => {
              val x = w + "\t" + cnt
              print(x)
 
            }
          }
      )
  )

  ssc.start()
  ssc.awaitTermination()

}
}


 spark-shell 
 sc.stop()
 scala> :load  KafkaStreamingExa.scala


// Input data into Console Producer of Kafka
 [cloudera@quickstart kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myFresh
>silva silva silva
>aravinda aravinda aravinda
>silva
>kamal rajini
>kamal rajini


// Result
KafkaStreamingExa.main(null)
RAJINI  2
KAMAL  2
SILVA 4
ARAVINDA 3




Wednesday, 13 March 2019

Kafka : Call Logs SUCCESS, FAILED, DROPPED - Producer and Consumer Programming in Scala

input file:
calllogdata.txt:
-----------------
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489
ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463
ec59cea2-5006-448f-a032-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:07DROPPED 98861773759916790556
ec59cea2-5006-448f-a033-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:45SUCCESS 86186279969886177375
ec59cea2-5006-448f-a034-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:53DROPPED 98765156164894949494
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a036-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:05SUCCESS 12354678902153698431
ec59cea2-5006-448f-a037-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80556456458478787877
ec59cea2-5006-448f-a038-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05DROPPED 80809056095676236992
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a040-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 96090652158087080806
ec59cea2-5006-448f-a041-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 89797946465879874615
ec59cea2-5006-448f-a042-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05SUCCESS 45454545457978978979
ec59cea2-5006-448f-a043-d5e53f33be232014-03-18 00:02:482014-03-18 00:06:05DROPPED 74584564564564564656
ec59cea2-5006-448f-a044-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 98794894945648947898
ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05SUCCESS 84645645605646064646
ec59cea2-5006-448f-a046-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 78545456456456456456
ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:02:482014-03-19 00:06:05DROPPED 57554548979797979797
ec59cea2-5006-448f-a048-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 87898640989489089409
ec59cea2-5006-448f-a049-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 75884848478978978979
ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:02:482014-03-20 00:06:05SUCCESS 74894086489489489489

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties


bin/kafka-topics.sh --create --topic SUCCESS_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic FAILED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic DROPPED_RECORDS --partitions 1 --replication-factor 1 --zookeeper localhost:2181


hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
DROPPED_RECORDS
FAILED_RECORDS
SUCCESS_RECORDS


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "myOwn"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerCallLog {
  def main(args:Array[String]):Unit = {
    val props = new Properties()
    val topic = "KafkaTopic"
    props.put("bootstrap.servers","localhost:9092")
    props.put("client.id","ProducerApp")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topicSUCCESS = "SUCCESS_RECORDS"
    val topicFAILED = "FAILED_RECORDS"
    val topicDROPPED = "DROPPED_RECORDS"

    val producer = new KafkaProducer[String,String](props)

    val file = scala.io.Source.fromFile("/home/hadoop/Desktop/vow/calllogdata.txt")

    for (line <- file.getLines()) {
      val status_pattern = "(SUCCESS|FAILED|DROPPED)".r
      val status = status_pattern.findFirstIn(line).get
      if (status == "SUCCESS") {
        val msg = new ProducerRecord[String,String](topicSUCCESS,line)
        producer.send(msg)
        println(msg)
      }
      else if (status == "FAILED") {
        val msg = new ProducerRecord[String,String](topicFAILED,line)
        producer.send(msg)
        println(msg)
      }
      else
      {
        val msg = new ProducerRecord[String,String](topicDROPPED,line)
        producer.send(msg)
        println(msg)
      }
    }
    producer.close()
  }
}
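One caveat in the producer above: status_pattern.findFirstIn(line).get throws if a line contains none of the three keywords. A hedged sketch of a safer version of the loop body, matching on the Option instead:

// Safer handling: match on the Option instead of calling .get
status_pattern.findFirstIn(line) match {
  case Some("SUCCESS") => producer.send(new ProducerRecord[String,String](topicSUCCESS, line))
  case Some("FAILED")  => producer.send(new ProducerRecord[String,String](topicFAILED, line))
  case Some(_)         => producer.send(new ProducerRecord[String,String](topicDROPPED, line))
  case None            => println("Skipping line with no call status: " + line)
}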





bin/kafka-console-consumer.sh --topic SUCCESS_RECORDS --bootstrap-server localhost:9092 --from-beginning

bin/kafka-console-consumer.sh --topic FAILED_RECORDS --bootstrap-server localhost:9092 --from-beginning
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469
ec59cea2-5006-448f-a035-d5e53f33be232014-03-16 00:02:482014-03-16 00:06:12FAILED  54545454546469496477
ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:02:482014-03-17 00:06:05FAILED  44554584848449644469


bin/kafka-console-consumer.sh --topic DROPPED_RECORDS --bootstrap-server localhost:9092 --from-beginning


import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "testGroup1")

    properties.put("client.id", "ConsumerApp")
    // properties.put("partition.assignment.strategy", "range");



    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "FAILED_RECORDS"


    consumer.subscribe(Collections.singletonList(topic))

    System.out.println("Subscribed to topic " + topic)
 
   var records = consumer.poll(4000)
     consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
      for (record <- records.iterator()){
        println("Received Message : "  + record)
      }
 
    consumer.commitSync()
  }
}

Tuesday, 12 March 2019

Kafka : Producer, Consumer Programming in Scala with Multi Server Configuration

// Here we write Kafka producer and consumer programs in Scala with a multi-broker configuration

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



The default zookeeper port number is 2181.

Make four copies of server.properties and rename them server0.properties, server1.properties, server2.properties and server3.properties.

Open each file and change the broker id, log directory and listener port as shown below.

sudo gedit server0.properties
broker.id=0
zookeeper port : 2181
log.dirs=/tmp/k0/kafka-logs
listeners=PLAINTEXT://:9090

sudo gedit server1.properties
broker.id=1
zookeeper port : 2181
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9091

sudo gedit server2.properties
broker.id=2
zookeeper port : 2181
log.dirs=/tmp/k2/kafka-logs
listeners=PLAINTEXT://:9092

sudo gedit server3.properties
broker.id=3
zookeeper port : 2181
log.dirs=/tmp/k3/kafka-logs
listeners=PLAINTEXT://:9093

Open 4 new terminals and run one of the following commands in each:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server0.properties
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
INFO [KafkaServer id=3] started (kafka.server.KafkaServer)


 

// 4 different kafka server instances are running
hadoop@hadoop:/usr/local/kafka$ jps
6384 Jps
4851 Kafka  // #instance 1
6243 Main
4163 Kafka // #instance 2
3492 QuorumPeerMain
4504 Kafka // #instance 3
5196 Kafka // #instance 4




// Create a new topic named myTopic with 4 partitions and a replication factor of 4
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic myTopic --partitions 4 --replication-factor 4 --zookeeper localhost:2181
Created topic "myTopic".


// describe the topic myTopic
bin/kafka-topics.sh --describe  --zookeeper localhost:2181
Topic:myTopic PartitionCount:4 ReplicationFactor:4 Configs:
Topic: myTopic Partition: 0 Leader: 3 Replicas: 3,0,1,2 Isr: 3,0,1,2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1,2,3 Isr: 0,1,2,3
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: myTopic Partition: 3 Leader: 2 Replicas: 2,3,0,1 Isr: 2,3,0,1




build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "Kafka"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"




// Producer Programming in Scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerExa2 {
  def main(args:Array[String]):Unit = {

    val props = new Properties()
    val topic = "myTopic"

    props.put("bootstrap.servers","192.168.0.106:9090,192.168.0.106:9091,192.168.0.106:9092,192.168.0.106:9093")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")

    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    val msg:String = "Welcome to Kafka : #"
    for (i <- 1 to 10){
      val data = new ProducerRecord[String,String](topic,msg+i.toString)
      producer.send(data)
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}




Run the program in IntelliJ IDEA.

------Successfully published messages to topic : myTopic----


// view the output in console:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server localhost:9090 --from-beginning
Welcome to Kafka : #3
Welcome to Kafka : #7
Welcome to Kafka : #1
Welcome to Kafka : #5
Welcome to Kafka : #9
Welcome to Kafka : #2
Welcome to Kafka : #6
Welcome to Kafka : #10
Welcome to Kafka : #4
Welcome to Kafka : #8



//Consumer Programming in Scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object KafkaConsumerExa1 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "192.168.0.106:9091")
    properties.put("group.id", "testGroup")
    properties.put("client.id", "ConsumerApp")

    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](properties)
    val topic = "myTopic"


  consumer.subscribe(Collections.singletonList(topic))
    System.out.println("Subscribed to topic " + topic)
    var records = consumer.poll(4000)
    consumer.seekToBeginning(consumer.assignment)
    records = consumer.poll(4000)
    for (record <- records.iterator()){
      println("Received Message : "  + record)
    }
    consumer.commitSync()
  }
}

Output:
--------
Subscribed to topic myTopic
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 4, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #4)
Received Message : ConsumerRecord(topic = myTopic, partition = 0, leaderEpoch = 1, offset = 5, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #8)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #2)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #6)
Received Message : ConsumerRecord(topic = myTopic, partition = 3, leaderEpoch = 2, offset = 8, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #10)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 6, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #3)
Received Message : ConsumerRecord(topic = myTopic, partition = 1, leaderEpoch = 1, offset = 7, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #7)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 4, CreateTime = 1552402871830, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #1)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 5, CreateTime = 1552402871872, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #5)
Received Message : ConsumerRecord(topic = myTopic, partition = 2, leaderEpoch = 1, offset = 6, CreateTime = 1552402871874, serialized key size = -1, serialized value size = 21, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Welcome to Kafka : #9)
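

The consumer above polls only twice and then exits, which is enough for a demo. A hedged sketch of the more usual continuous poll loop (reusing the `properties` object from KafkaConsumerExa1; JavaConverters is used instead of the deprecated JavaConversions, and poll(Duration) needs a 2.x kafka-clients - older clients use poll(Long)):

import java.time.Duration
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hedged sketch: keep polling until interrupted; commit offsets after each batch is processed.
val loopConsumer = new KafkaConsumer[String, String](properties)
loopConsumer.subscribe(Collections.singletonList("myTopic"))
try {
  while (true) {
    val records = loopConsumer.poll(Duration.ofMillis(1000))
    for (record <- records.asScala)
      println("partition=" + record.partition + " offset=" + record.offset + " value=" + record.value)
    loopConsumer.commitSync()
  }
} finally {
  loopConsumer.close()
}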


Kafka Producer in Scala Programming Example

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --topic KafkaTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2181


build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
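

The KafkaProducer/KafkaConsumer examples in this post import org.apache.kafka.clients directly; if the Spark artifacts above do not pull the Kafka client in transitively, it may need to be declared explicitly (the version below is an assumption - match it to the installed broker):

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.0"   // version assumed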



import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object KafkaProducerExa {
  def main(args:Array[String]):Unit = {
    val props = new Properties()
    val topic = "KafkaTopic"
    props.put("bootstrap.servers","localhost:9092")
    props.put("client.id","ScalaProducer")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    val msg:String = "Welcome to Kafka : #"
    for (i <- 1 to 10){
      val data = new ProducerRecord[String,String](topic,msg+i)
      producer.send(data)
    }
    producer.close()
    println("------Successfully published messages to topic : " + topic + "----")
  }
}


Run the .scala file in IntelliJ IDEA.
------Successfully published messages to topic : KafkaTopic----


See the output in console:
--------------------------
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic KafkaTopic --bootstrap-server localhost:9092

Welcome to Kafka : #1
Welcome to Kafka : #2
Welcome to Kafka : #3
Welcome to Kafka : #4
Welcome to Kafka : #5
Welcome to Kafka : #6
Welcome to Kafka : #7
Welcome to Kafka : #8
Welcome to Kafka : #9
Welcome to Kafka : #10
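

Note that producer.send() is asynchronous: it returns a java.util.concurrent.Future[RecordMetadata], and the record may still be sitting in the client buffer when send() returns. A minimal sketch (assuming the same props and topic as KafkaProducerExa above; the record value is only illustrative) that blocks on the Future to see which partition and offset the broker assigned:

// Hedged sketch: synchronous send - .get() blocks until the broker acknowledges the record.
val syncProducer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String](topic, "Welcome to Kafka : #sync")
val metadata = syncProducer.send(record).get()
println("written to partition=" + metadata.partition + " offset=" + metadata.offset)
syncProducer.flush()   // flush() forces any buffered records out before close()
syncProducer.close()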



 

Monday, 11 March 2019

Kafka - Configuring Multiple Servers in Single Linux System

// Here I'm describing how to configure multiple Kafka server (broker) instances on a single Linux system.
// I will use 3 different server.properties files, with a different configuration for each broker.

start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



The default ZooKeeper port number is 2181.

Make three copies of server.properties and rename them:
server1.properties,
server2.properties,
server3.properties

Open each file and change the settings shown below (broker id, log directory and listener port must be unique per broker).


sudo gedit server1.properties
broker.id=1
zookeeper.connect=localhost:2181
log.dirs=/tmp/k1/kafka-logs
listeners=PLAINTEXT://:9093

sudo gedit server2.properties
broker.id=2
zookeeper.connect=localhost:2181
log.dirs=/tmp/k2/kafka-logs
listeners=PLAINTEXT://:9094

sudo gedit server3.properties
broker.id=3
zookeeper.connect=localhost:2181
log.dirs=/tmp/k3/kafka-logs
listeners=PLAINTEXT://:9095

Open 3 new terminals and run one of the following commands in each:
#1
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server1.properties
[2019-03-11 21:50:24,652] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
#2
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server2.properties
[2019-03-11 21:50:41,541] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
#3
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
[2019-03-11 21:50:55,067] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

// 3 different Kafka server instances are now running
hadoop@hadoop:/usr/local/kafka$ jps
5792 QuorumPeerMain
4501 NameNode
7653 Kafka  // #1
7303 Kafka  // #2
4695 DataNode
5417 NodeManager
5228 ResourceManager
8350 Jps
7999 Kafka  // #3
4927 SecondaryNameNode


// Only 3 Kafka broker instances are currently running, but here I specified a replication factor of 4, so topic creation fails
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 4 --zookeeper localhost:2181
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2019-03-11 21:54:58,039] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)


// Create a new topic named topic1 with 3 partitions and a replication factor of 3
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".

// see the topic description
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

// Isr : In-Sync Replica

// create a new topic named topic2 with 3 partitions and a replication factor of 2
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic topic2 --partitions 3 --replication-factor 2 --zookeeper localhost:2181
Created topic "topic2".

// Describe topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs: // 3 replicas
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

Topic:topic2 PartitionCount:3 ReplicationFactor:2 Configs: // 2 replicas
Topic: topic2 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: topic2 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic2 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2


// console producer to send data to broker
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>kafka1
>kafka2
>kafka3
>kafka4
>kafka5
>kafka6
>kafka7
>kafka8
>kafka9
>kafka10

// How did the data get distributed across the partitions?
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9094 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10
^CProcessed a total of 11 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9095 --from-beginning
kafka2
kafka5
kafka8

kafka3
kafka6
kafka9
kafka1
kafka4
kafka7
kafka10

bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
kafka2
kafka5
kafka8




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
kafka3
kafka6
kafka9


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
kafka2
kafka5
kafka8


hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
kafka1
kafka4
kafka7
kafka10
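

The same per-partition reads can also be done from Scala by assigning a specific partition instead of subscribing to the whole topic. A hedged sketch (partition 0 of topic1 chosen for illustration; `properties` is a Properties object like the one in the earlier consumer example):

import java.time.Duration
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hedged sketch: assign() pins the consumer to one partition; seekToBeginning() is the
// programmatic equivalent of --offset earliest in the console consumer above.
val partitionConsumer = new KafkaConsumer[String, String](properties)
val tp = new TopicPartition("topic1", 0)
partitionConsumer.assign(Collections.singletonList(tp))
partitionConsumer.seekToBeginning(Collections.singletonList(tp))
for (record <- partitionConsumer.poll(Duration.ofMillis(4000)).asScala)
  println("partition=" + record.partition + " offset=" + record.offset + " value=" + record.value)
partitionConsumer.close()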


Consumer Group:
---------------

If a set of consumers reads data from the same topic,
but from different partitions, these consumers must belong to the same consumer group.



sudo gedit consumer.properties
------------------------------
#consumer group id
group.id=group1
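

The same group.id can be set from Scala. If two consumer instances are started with the same group.id, Kafka assigns each of them a disjoint subset of the topic's partitions. A hedged sketch (object name GroupConsumer is hypothetical; broker address taken from the multi-broker setup above):

import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hedged sketch: run this object twice (two terminals); both instances join group1
// and each one receives only the partitions assigned to it.
object GroupConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9093")   // one of the three brokers above
    props.put("group.id", "group1")                    // same group id as in consumer.properties
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("topic1"))
    while (true) {
      for (record <- consumer.poll(Duration.ofMillis(1000)).asScala)
        println("partition=" + record.partition + " value=" + record.value)
    }
  }
}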


B - Broker
p - partition
m - message / records

Partition replicated across:
---------------------------
p1 -> B1,B2,B3
p2 -> B2,B3,B1
p3 -> B3,B1,B2

Broker -> Partition -> Messages (round robin)
B1 -> p1 ->> m1,m4,m7
B2 -> p2 ->> m2,m5,m8
B3 -> p3 ->> m3,m6,m9

Offset mapping:
offset (0) -> m1,m2,m3
offset (1) -> m4,m5,m6
offset (2) -> m7, m8, m9



hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --zookeeper localhost:2181
Created topic "topic1".
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: topic1 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

// producer is writing data into topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^Chadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya

// consumer is reading data from topics
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
2. i love pakistan
5. i love singapore
8. i love thailand
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 10 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 0 --offset earliest
2. i love pakistan
5. i love singapore
8. i love thailand
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 1 --offset earliest
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
^CProcessed a total of 4 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
^CProcessed a total of 3 messages
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --partition 2 --offset earliest
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya


// I forcefully stopped the 3rd server (started earlier with: bin/kafka-server-start.sh config/server3.properties)
^C
// The 3rd server is now down, but ZooKeeper lets the 1st or 2nd Kafka server become leader for the new inputs



hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>i love india
>2. i love pakistan
>3. i love south africa
>4. i love srilanka
>5. i love singapore
>6. i love malaysia
>7. i love indonesia
>8. i love thailand
>9. i love australia
>10. i love newzealand
>^Chadoop@hadoop:/usr/local/kafka$ bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9093
>amala
>paul
>vimal
>kalavani
>oviya // 3rd server failed here
>super  // new inputs after the 3rd server failed; the 1st or 2nd server becomes leader for the inputs below
>star
>rajinikanth
>i love india
>kamal
>kalai
>selvam
>awesome
>nator
>mai
>




// the consumer still displays the results even though one of the servers (server3) is down
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093 --from-beginning
i love india
4. i love srilanka
7. i love indonesia
10. i love newzealand
vimal
super
i love india
selvam
2. i love pakistan
5. i love singapore
8. i love thailand
amala
kalavani
star
kamal
3. i love south africa
6. i love malaysia
9. i love australia
paul
oviya
rajinikanth
kalai
awesome
nator


// now 3rd server is down
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2

// Expected replicas per partition: a) 1,2,3  b) 2,3,1  c) 3,1,2
// But the actual ISR lists are: a) 1,2  b) 2,1  c) 1,2
// because the 3rd server is down - we forcefully shut it down earlier


// Now I manually bring server3 back up:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server3.properties
// so the 3rd server is up and running again.

// I'm checking it again:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
// Now look at the replicas - everything is back to normal. The 3rd server caught up with the data from server1 and server2

Topic:topic1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3


Flume - Simple Demo

// create a folder in hdfs:
$ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates : Hadoop in real world <n>...