Sunday, 7 April 2019

Find 2nd Maximum Salary from employee dataframe in Spark with Scala

scala> dfEmp.orderBy(desc("salary")).show
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
|104| Rashee|     f|  3500|    11|
|109|  Vinay|     m|  3200|    10|
|102| Suresh|     m|  3000|    12|
|108| Arushi|     f|  2800|    12|
|111| Shilpa|     f|  2600|    12|
|110|  Kalai|     f|  2550|    11|
|107|  Aruvi|     f|  2500|    11|
|101|   Rani|     f|  2000|    11|
|103|  Rahul|     m|  1250|    10|
|100|   Ravi|     m|  1000|    10|
+---+-------+------+------+------+


scala> dfEmp.where($"salary" < dfEmp.agg(max("salary")).first().getInt(0)).orderBy(desc("salary")).show(1)
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row

// 2nd maximum salaried person
hive> select * from emp where salary not in (select max(salary) from emp ) order by salary desc limit 1;
105 Priya f 3600 12

hive> select * from (select * from emp sort by salary desc limit 2) result sort by salary limit 1;

105 Priya f 3600 12

scala> dfEmp.orderBy(desc("Salary")).limit(2).orderBy("salary").show(1);
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row



scala> dfEmp.orderBy(desc("Salary")).take(2)
res87: Array[org.apache.spark.sql.Row] = Array([106,Ayeesha,f,4000,10], [105,Priya,f,3600,12])

scala> dfEmp.orderBy(desc("Salary")).take(2)(1);
res91: org.apache.spark.sql.Row = [105,Priya,f,3600,12]

Friday, 5 April 2019

Hive Queries Vs Dataframe Queries - Part 1

hadoop@hadoop:~/Desktop/vow$ touch emp.txt
hadoop@hadoop:~/Desktop/vow$ atom emp.txt

101,Sathya,1000
102,Shanthi,2000
103,Mani,3000
104,Kalai,4000
105,Aruvi,5000
106,Nila,1500
107,Praveen,2500
108,Rashee,7500
109,Pinki,3500
110,Ravi,2500

pwd : /home/hadoop/Desktop/vow


hive> create database learning;
OK
Time taken: 0.901 seconds

hive> use learning;
OK
Time taken: 0.08 seconds



hive> create external table emp(id int, name varchar(50), salary int) row format delimited fields terminated by ',';

hive> load data local inpath '/home/hadoop/Desktop/vow/emp.txt' into table emp;


hive> select * from emp;
OK
101 Sathya 1000
102 Shanthi 2000
103 Mani 3000
104 Kalai 4000
105 Aruvi 5000
106 Nila 1500
107 Praveen 2500
108 Rashee 7500
109 Pinki 3500
110 Ravi 2500
Time taken: 0.305 seconds, Fetched: 10 row(s)


scala> val empSchema = StructType(StructField("id",IntegerType,true)::StructField("name",StringType,true)::StructField("salary",IntegerType,true)::Nil)
empSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(salary,IntegerType,true))

scala> val df = spark.read.format("csv").option("header","false").schema(empSchema).load("/home/hadoop/Desktop/vow/emp.txt");
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)


hive> select * from emp;
OK
101 Sathya 1000
102 Shanthi 2000
103 Mani 3000
104 Kalai 4000
105 Aruvi 5000
106 Nila 1500
107 Praveen 2500
108 Rashee 7500
109 Pinki 3500
110 Ravi 2500

scala> df.show
+---+-------+------+
| id|   name|salary|
+---+-------+------+
|101| Sathya|  1000|
|102|Shanthi|  2000|
|103|   Mani|  3000|
|104|  Kalai|  4000|
|105|  Aruvi|  5000|
|106|   Nila|  1500|
|107|Praveen|  2500|
|108| Rashee|  7500|
|109|  Pinki|  3500|
|110|   Ravi|  2500|
+---+-------+------+


hive> select max(salary) from emp;
7500

scala> df.select(max(df("salary")) as "Salary").show
or
scala> df.select(max($"salary") as "Salary").show

+------+
|Salary|
+------+
|  7500|
+------+

hive> select max(salary),min(salary) from emp;
OK
7500 1000

scala> df.select(max(df("salary")) as "MaxSal", min(df("salary")) as "MinSal").show
df.select(max($"salary") as "MaxSal",min($"salary") as "MinSal").show
+------+------+
|MaxSal|MinSal|
+------+------+
|  7500|  1000|
+------+------+


hive> select salary from emp order by salary;
OK
1000
1500
2000
2500
2500
3000
3500
4000
5000
7500

scala> df.select(df("salary")).orderBy("salary").show
df.select($"salary").orderBy($"salary").show
+------+
|salary|
+------+
|  1000|
|  1500|
|  2000|
|  2500|
|  2500|
|  3000|
|  3500|
|  4000|
|  5000|
|  7500|
+------+

hive> select salary from emp order by salary desc;
OK
7500
5000
4000
3500
3000
2500
2500
2000
1500
1000

import org.apache.spark.sql.functions._

scala> df.select(df("salary")).orderBy(desc("salary")).show
or
scala> df.select($"salary").orderBy($"salary".desc).show
+------+
|salary|
+------+
|  7500|
|  5000|
|  4000|
|  3500|
|  3000|
|  2500|
|  2500|
|  2000|
|  1500|
|  1000|
+------+



hive> select sum(salary) from emp;
OK
32500


scala> df.select(sum("salary") as "Sum").show
or
scala> df.select(sum($"salary") as "Sum").show

+-----+
|  Sum|
+-----+
|32500|
+-----+



hadoop@hadoop:~$ touch emp.txt
hadoop@hadoop:~$ atom emp.txt

id,name,gender,salary,deptid
100,Ravi,m,1000,10
101,Rani,f,2000,11
102,Suresh,m,3000,12
103,Rahul,m,1250,10
104,Rashee,f,3500,11
105,Priya,f,3600,12
106,Ayeesha,f,4000,10
107,Aruvi,f,2500,11
108,Arushi,f,2800,12
109,Vinay,m,3200,10
110,Kalai,f,2550,11
111,Shilpa,f,2600,12

hadoop@hadoop:~$ touch dept.txt
hadoop@hadoop:~$ atom dept.txt

deptid,deptname
10,Marketing
11,Sales
12,Production

//  tblproperties("skip.header.line.count"="1");  --> which skips the header line

hive> create external table emp(id int, name varchar(50),gender char(1), salary int, deptid int) row format delimited fields terminated by ',' tblproperties("skip.header.line.count"="1");


hive> load data local inpath "/home/hadoop/Desktop/vow/emp.txt" into table emp;





hive> create external table dept(deptid int, deptname varchar(50)) row format delimited fields terminated by ',' tblproperties("skip.header.line.count"="1");

hive> load data local inpath "/home/hadoop/Desktop/vow/dept.txt" into table dept;




scala> val empSchema = StructType(StructField("id",IntegerType,true)::StructField("name",StringType,true)::StructField("gender",StringType,true)::StructField("salary",IntegerType,true)::StructField("deptid",IntegerType,true)::Nil)
empSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(gender,StringType,true), StructField(salary,IntegerType,true), StructField(deptid,IntegerType,true))

scala> val deptSchema = StructType(StructField("deptid",IntegerType,true)::StructField("deptname",StringType,true)::Nil)
deptSchema: org.apache.spark.sql.types.StructType = StructType(StructField(deptid,IntegerType,true), StructField(deptname,StringType,true))


 val dfEmp  = spark.read.format("csv").option("header","true").schema(empSchema).load("/home/hadoop/Desktop/vow/emp.txt");

 val dfDept = spark.read.format("csv").option("header","true").schema(deptSchema).load("/home/hadoop/Desktop/vow/dept.txt");

 hive> select * from emp;
OK
100 Ravi m 1000 10
101 Rani f 2000 11
102 Suresh m 3000 12
103 Rahul m 1250 10
104 Rashee f 3500 11
105 Priya f 3600 12
106 Ayeesha f 4000 10
107 Aruvi f 2500 11
108 Arushi f 2800 12
109 Vinay m 3200 10
110 Kalai f 2550 11
111 Shilpa f 2600 12

 scala> dfEmp.show
 or
 scala> dfEmp.select("*").show

+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|100|   Ravi|     m|  1000|    10|
|101|   Rani|     f|  2000|    11|
|102| Suresh|     m|  3000|    12|
|103|  Rahul|     m|  1250|    10|
|104| Rashee|     f|  3500|    11|
|105|  Priya|     f|  3600|    12|
|106|Ayeesha|     f|  4000|    10|
|107|  Aruvi|     f|  2500|    11|
|108| Arushi|     f|  2800|    12|
|109|  Vinay|     m|  3200|    10|
|110|  Kalai|     f|  2550|    11|
|111| Shilpa|     f|  2600|    12|
+---+-------+------+------+------+

hive> select * from dept;
OK
10 Marketing
11 Sales
12 Production
Time taken: 0.238 seconds, Fetched: 3 row(s)

scala> dfDept.show
or
scala> dfDept.select("*").show

+------+----------+
|deptid|  deptname|
+------+----------+
|    10| Marketing|
|    11|     Sales|
|    12|Production|
+------+----------+


scala> dfEmp.select(max($"salary") as "MaxSal").show
+------+
|MaxSal|
+------+
|  4000|
+------+

hive> select max(salary) from emp;
4000


scala> dfEmp.select(min($"salary") as "MinSal").show
+------+
|MinSal|
+------+
|  1000|
+------+

hive> select min(salary) from emp;
1000


hive> select max(salary) as MaxSal, min(salary) as MinSal from emp;
4000 1000

scala> dfEmp.select(max("salary") as "MaxSal",min("salary") as "MinSal").show
+------+------+
|MaxSal|MinSal|
+------+------+
|  4000|  1000|
+------+------+


hive> select deptid,max(salary) from emp group by deptid order by deptid;
10 4000
11 3500
12 3600



scala> dfEmp.groupBy("deptid").agg(max("salary") as "maxSal").orderBy("deptid").show
+------+------+                                                               
|deptid|maxSal|
+------+------+
|    10|  4000|
|    11|  3500|
|    12|  3600|
+------+------+


hive> select deptid,count(name) from emp group by deptid order by deptid;
10 4
11 4
12 4

scala> dfEmp.groupBy("deptid").agg(count("name") as "nameCount").orderBy("deptid").show
+------+---------+                                                           
|deptid|nameCount|
+------+---------+
|    10|        4|
|    11|        4|
|    12|        4|
+------+---------+




 scala> dfEmp.select($"salary").orderBy("salary").show
 or
 scala> dfEmp.select(dfEmp("salary")).orderBy("salary").show

+------+
|salary|
+------+
|  1000|
|  1250|
|  2000|
|  2500|
|  2550|
|  2600|
|  2800|
|  3000|
|  3200|
|  3500|
|  3600|
|  4000|
+------+

select salary from emp order by salary
1000
1250
2000
2500
2550
2600
2800
3000
3200
3500
3600
4000


scala> dfEmp.select($"salary").orderBy(desc("salary")).show
or
dfEmp.select("salary").orderBy(desc("salary")).show
+------+
|salary|
+------+
|  4000|
|  3600|
|  3500|
|  3200|
|  3000|
|  2800|
|  2600|
|  2550|
|  2500|
|  2000|
|  1250|
|  1000|
+------+

hive> select salary from emp order by salary desc;
4000
3600
3500
3200
3000
2800
2600
2550
2500
2000
1250
1000



hive> select gender,max(salary) from emp group by gender order by gender;
f 4000
m 3200

scala> dfEmp.groupBy("gender").agg(max("salary") as "maxSal").orderBy("gender").show
+------+------+                                                               
|gender|maxSal|
+------+------+
|     f|  4000|
|     m|  3200|
+------+------+


hive> select gender,sum(salary) from emp group by gender order by gender;
f 23550
m 8450

scala> dfEmp.groupBy("gender").agg(sum("salary") as "GenderSumSal").orderBy("gender").show
+------+------------+                                                         
|gender|GenderSumSal|
+------+------------+
|     f|       23550|
|     m|        8450|
+------+------------+





hive> select * from emp order by salary desc;
OK
106 Ayeesha f 4000 10
105 Priya f 3600 12
104 Rashee f 3500 11
109 Vinay m 3200 10
102 Suresh m 3000 12
108 Arushi f 2800 12
111 Shilpa f 2600 12
110 Kalai f 2550 11
107 Aruvi f 2500 11
101 Rani f 2000 11
103 Rahul m 1250 10
100 Ravi m 1000 10


scala> dfEmp.orderBy(desc("salary")).show
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
|104| Rashee|     f|  3500|    11|
|109|  Vinay|     m|  3200|    10|
|102| Suresh|     m|  3000|    12|
|108| Arushi|     f|  2800|    12|
|111| Shilpa|     f|  2600|    12|
|110|  Kalai|     f|  2550|    11|
|107|  Aruvi|     f|  2500|    11|
|101|   Rani|     f|  2000|    11|
|103|  Rahul|     m|  1250|    10|
|100|   Ravi|     m|  1000|    10|
+---+-------+------+------+------+




hive> select * from emp order by salary desc limit 2;
OK
106 Ayeesha f 4000 10
105 Priya f 3600 12


scala> dfEmp.orderBy(desc("salary")).show(2);
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
+---+-------+------+------+------+
only showing top 2 rows



// top salaried person
hive> select * from emp order by salary desc limit 1;
106 Ayeesha f 4000 10


scala> dfEmp.orderBy(desc("salary")).show(1);
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
+---+-------+------+------+------+
only showing top 1 row






//extract single value (scalar) from dataframe
scala> val x:Int = dfEmp.agg(max("salary")).head().getInt(0)
x: Int = 4000



scala> dfEmp.orderBy(desc("salary")).show
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
|104| Rashee|     f|  3500|    11|
|109|  Vinay|     m|  3200|    10|
|102| Suresh|     m|  3000|    12|
|108| Arushi|     f|  2800|    12|
|111| Shilpa|     f|  2600|    12|
|110|  Kalai|     f|  2550|    11|
|107|  Aruvi|     f|  2500|    11|
|101|   Rani|     f|  2000|    11|
|103|  Rahul|     m|  1250|    10|
|100|   Ravi|     m|  1000|    10|
+---+-------+------+------+------+


scala> dfEmp.where($"salary" < dfEmp.agg(max("salary")).first().getInt(0)).orderBy(desc("salary")).show(1)
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row

// 2nd maximum salaried person
hive> select * from emp where salary not in (select max(salary) from emp ) order by salary desc limit 1;
105 Priya f 3600 12

hive> select * from (select * from emp sort by salary desc limit 2) result sort by salary limit 1;

105 Priya f 3600 12

scala> dfEmp.orderBy(desc("Salary")).limit(2).orderBy("salary").show(1);
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row



scala> dfEmp.orderBy(desc("Salary")).take(2)
res87: Array[org.apache.spark.sql.Row] = Array([106,Ayeesha,f,4000,10], [105,Priya,f,3600,12])

scala> dfEmp.orderBy(desc("Salary")).take(2)(1);
res91: org.apache.spark.sql.Row = [105,Priya,f,3600,12]

Thursday, 4 April 2019

Calculate the Square Root of Sum of Squares of Each numbers in a given file - using UDF which use Option..Some..None

Calculate the Square Root of Sum of Squares of Each numbers in a given file - using UDF which use Option..Some..None

// Exclude all non-numeric characters from each line and find the square root of the sum of the squares of the remaining numbers

$ cat charsAndNumbers.txt
1,a,b,c,2,3,4
2,3,4,x,y,z
s,t,u,5,2
m,n,8,10
5,2,1,a,x,y
7,a,x,2,6,h


scala> val r1 = sc.textFile("/home/hadoop/Desktop/vow/charsAndNumbers.txt")

// user defined function to extract only integers and exclude all characters
 def toInt(s:String):Option[Int] ={
   try{
Some(s.toInt)
   }
   catch {
case e: Exception => None
   }
       }
     

// For every input line: split on commas, turn each field into a number via
// toInt (fields that are not integers count as 0) and add up the squares.
val r2 = r1.map { line =>
  line.split(",")
    .map(field => toInt(field).getOrElse(0)) // calling UDF
    .foldLeft(0)((total, n) => total + (n * n))
}


scala> r2.collect
res1: Array[Int] = Array(30, 29, 29, 164, 30, 89)


scala> val result = r2.reduce(_+_)
result: Int = 371


scala> val finalResult = scala.math.sqrt(result)
finalResult: Double = 19.261360284258224

scala> scala.math.sqrt( (1*1) + (2*2) + (3*3) + (4*4)
     |  + (2*2) + (3*3) + (4*4)
     |  + (5*5) + (2*2)
     |  + (8*8) + (10*10)
     |  + (5*5) + (2*2) + (1*1)
     |  + (7*7) + (2*2) + (6*6))
res8: Double = 19.261360284258224



Find the square root of sum of squares of each numbers from a file using Spark with Scala

// Exclude all characters from each line and find the square root of sum of squares of each numbers


// given input file has character and numbers separated by comma
$ cat charsAndNumbers.txt
1,a,b,c,2,3,4
2,3,4,x,y,z
s,t,u,5,2
m,n,8,10
5,2,1,a,x,y
7,a,x,2,6,h


scala.math.sqrt( (1*1) + (2*2) + (3*3) + (4*4)
 + (2*2) + (3*3) + (4*4)
 + (5*5) + (2*2)
 + (8*8) + (10*10)
 + (5*5) + (2*2) + (1*1)
 + (7*7) + (2*2) + (6*6))

scala> val r1 = sc.textFile("/home/hadoop/Desktop/vow/charsAndNumbers.txt")

scala> r1.foreach(println)
1,a,b,c,2,3,4
2,3,4,x,y,z
s,t,u,5,2
m,n,8,10
5,2,1,a,x,y
7,a,x,2,6,h



// For each line, square every comma-separated field that is a valid integer and
// sum those squares; fields that are not integers are skipped.
val r2 = r1.map { line =>
  line.split(",").foldLeft(0) { (total, field) =>
    // Parse each field once, and ignore only the expected parse failure instead
    // of parsing twice and silently swallowing every Exception.
    try {
      val n = field.toInt
      total + (n * n)
    } catch {
      case _: NumberFormatException => total
    }
  }
}


scala> r2.collect
res1: Array[Int] = Array(30, 29, 29, 164, 30, 89)

scala> (1*1) + (2*2) + (3*3) + (4*4)
res2: Int = 30

scala> (2*2) + (3*3) + (4*4)
res3: Int = 29

scala> (5*5) + (2*2)
res4: Int = 29

scala> (8*8) + (10*10)
res5: Int = 164

scala> (5*5) + (2*2) + (1*1)
res6: Int = 30

scala> (7*7) + (2*2) + (6*6)
res7: Int = 89



scala> r2.foreach(println)
30
29
29
164
30
89


scala> val result = r2.reduce(_+_)
result: Int = 371


scala> val finalResult = scala.math.sqrt(result)
finalResult: Double = 19.261360284258224

scala> scala.math.sqrt( (1*1) + (2*2) + (3*3) + (4*4)
     |  + (2*2) + (3*3) + (4*4)
     |  + (5*5) + (2*2)
     |  + (8*8) + (10*10)
     |  + (5*5) + (2*2) + (1*1)
     |  + (7*7) + (2*2) + (6*6))
res8: Double = 19.261360284258224



Wednesday, 3 April 2019

Unix Shell Scripting Crash Course

$ touch hello.sh // create an empty file
$ atom hello.sh  // open atom editor
hello.sh:
--------
#!/usr/bin/env bash
# hello.sh - sample script that prints two fixed lines.
printf '%s\n' "Hello world!"
printf '%s\n' "Mars is red!"

hadoop@hadoop:~/Desktop/vow/shellscript$ chmod 755 hello.sh

$ ./hello.sh

$ sh hello.sh
Hello world!
Mars is red!



$ touch greeting.sh

$ atom greeting.sh
#!/usr/bin/env bash
# greeting.sh - stores a name and a color in variables and prints a greeting built from them.
FIRST_NAME="Bob"
FAVORITE_COLOR="blue"
echo "Hi ${FIRST_NAME}, your favorite color is ${FAVORITE_COLOR}"

$ ./greeting.sh
Hi Bob, your favorite color is blue


greeting.sh:
-------------
#!/usr/bin/env bash
# greeting.sh - same greeting, with a name that contains a space.
# The assignment must be quoted so both words end up in FIRST_NAME.
FIRST_NAME="Bob Roberts"
FAVORITE_COLOR="blue"
echo "Hi ${FIRST_NAME}, your favorite color is ${FAVORITE_COLOR}"


$ ./greeting.sh
Hi Bob Roberts, your favorite color is blue

//Parameters
$0 - name of the script, the path is included

$1, $2, $3, ${10},
${255} - the last parameter


$ touch params.sh
$ atom params.sh

params.sh
------------
#!/usr/bin/env bash
# params.sh - prints the script path ($0) and the first two positional parameters.
# The expansions are quoted so that an argument such as "*" or "Mary  Ann" is
# printed verbatim instead of being glob-expanded or having its spaces collapsed.
echo "File name is, $0"
echo "First Name : $1"
echo "Last Name : $2"


$ ./params.sh Kapil Dev
File name is, ./params.sh
First Name : Kapil
Last Name : Dev



#!/usr/bin/env bash
# Greets using the first two arguments, then prints the date and the working
# directory via command substitution.
FIRST_NAME=$1
LAST_NAME=$2
# Quoted so arguments containing spaces or wildcard characters are printed verbatim.
echo "Hello, $FIRST_NAME, $LAST_NAME"
# The escaped \$ prints a literal dollar sign in front of each command's output,
# as in the sample run; drop it to print only the command output.
echo \$$(date)
echo \$$(pwd)


$ bash params.sh sare ga
Hello, sare, ga
$Wed Apr 3 10:15:42 IST 2019
$/home/hadoop/Desktop/vow/shellscript



Challenge:
----------
Create a script named sport.sh
Make it executable
Accept 2 parameters : name and a favorite sport
Display any sentence to the console using those inputs


sport.sh:
----------
#!/usr/bin/env bash
# sport.sh - prints a sentence from two arguments.
#   $1  a person's name
#   $2  that person's favorite sport
NAME=$1
SPORT=$2
# Quoted so the arguments are printed exactly as given (no word splitting or globbing).
echo "$NAME likes to watch $SPORT."

Execute the script:
-------------------
$ sh sport.sh Vijay Soccer
Vijay likes to watch Soccer.


If example:
-----------
#!/usr/bin/env bash
# if.sh - demonstrates a string test and an integer test with `if`.
#   $1  a color name, compared against "blue"
#   $2  the user's numeric guess, compared against COMPUTER
# Operands inside [ ] are quoted so that a missing or multi-word argument is
# tested as one (possibly empty) string instead of breaking the test's syntax.
COLOR=$1
if [ "$COLOR" = "blue" ]
then
  echo "The color is blue"
fi

USER_GUESS=$2
COMPUTER=50

if [ "$USER_GUESS" -lt "$COMPUTER" ]
then
    echo "You are too Low"
fi


$ ./if.sh blue 40
The color is blue
You are too Low



#!/usr/bin/env bash
# if.sh - the same two tests, each with an `else` branch.
#   $1  a color name, compared against "blue"
#   $2  the user's numeric guess, compared against COMPUTER
# Operands inside [ ] are quoted so that a missing or multi-word argument is
# tested as one (possibly empty) string instead of breaking the test's syntax.
COLOR=$1
if [ "$COLOR" = "blue" ]
then
  echo "The color is blue"
else
  echo "The color is NOT blue"
fi

USER_GUESS=$2
COMPUTER=50

if [ "$USER_GUESS" -lt "$COMPUTER" ]
then
    echo "You are too Low"
else
    echo "You are equal or too high"
fi


$ ./if.sh Orange 55
The color is NOT blue
You are equal or too high



#!/usr/bin/env bash
# if.sh - if / elif / else: reports whether the guess in $1 is below, above or
# equal to COMPUTER.
# Operands inside [ ] are quoted so that a missing argument is tested as an
# empty string instead of breaking the test's syntax.
USER_GUESS=$1
COMPUTER=50
if [ "$USER_GUESS" -lt "$COMPUTER" ]
then
  echo "You are too low"
elif [ "$USER_GUESS" -gt "$COMPUTER" ]
then
  echo "You are too High"
else
  echo "You have guessed it"
fi


$ ./if.sh 50
You have guessed it

$ ./if.sh 44
You are too low

$ ./if.sh 66
You are too High




whileexa.sh:
------------
#!/usr/bin/env bash
# whileexa.sh - prints COUNT for the values 0 through 9, then a completion message.
COUNT=0
until [ "$COUNT" -ge 10 ]
do
  echo "COUNT = $COUNT"
  COUNT=$((COUNT + 1))
done

echo "While Loop Finished!".
exit 0


$ ./whileexa.sh
COUNT = 0
COUNT = 1
COUNT = 2
COUNT = 3
COUNT = 4
COUNT = 5
COUNT = 6
COUNT = 7
COUNT = 8
COUNT = 9
While Loop Finished!.


for.sh: (for each - multiple arguments)
-------
#!/usr/bin/env bash
# for.sh - greets every argument passed on the command line.
# Iterating over "$@" keeps each argument intact; copying $@ into a plain
# variable and expanding it unquoted would split "Mary Ann" into two names.
for NAME in "$@"
do
  echo "Hello $NAME"
done

echo "for loop terminated"



$ ./for.sh a b c d e f g h   // any number of arguments can be passed
Hello a
Hello b
Hello c
Hello d
Hello e
Hello f
Hello g
Hello h
for loop terminated


$ ./for.sh Awesome Outstanding Tremendous
Hello Awesome
Hello Outstanding
Hello Tremendous
for loop terminated



// break example
for.sh:
-------
#!/usr/bin/env bash
# for.sh - greets each argument in turn and stops at the first one equal to "Tracy".
# "$@" and "$NAME" are quoted so arguments containing spaces stay in one piece
# and cannot break the [ ] test.
for NAME in "$@"
do
  if [ "$NAME" = "Tracy" ]
  then
    # break leaves the loop immediately; the remaining arguments are not greeted.
    break
  fi
  echo "Hello $NAME"
done

echo "for loop terminated"



$ ./for.sh Stacy Tracy Lacy
Hello Stacy
for loop terminated



Challenge:
-----------
Write a script named counter.sh
It should count from 1 to the number entered by the user
Through the loop, display the current count value
Once the loop terminates, display "Loop finished"

#!/usr/bin/env bash
# counter.sh - counts from 1 up to the number given as $1, printing each value.
COUNT=1
END=$1

# Without a whole-number argument the -le test below fails with a shell error,
# so reject bad input up front with a usage message.
if [[ ! $END =~ ^[0-9]+$ ]]
then
  echo "Usage: $0 <number>" >&2
  exit 1
fi

while [ "$COUNT" -le "$END" ]
do
  echo "COUNT = $COUNT"
  ((COUNT++))
done

echo "Loop Finished."


$ ./counter.sh 5
COUNT = 1
COUNT = 2
COUNT = 3
COUNT = 4
COUNT = 5
Loop Finished.


$ ./counter.sh 7
COUNT = 1
COUNT = 2
COUNT = 3
COUNT = 4
COUNT = 5
COUNT = 6
COUNT = 7
Loop Finished.


//Environment variable example
vars.sh:
--------
#!/usr/bin/env bash
# vars.sh - prints three environment variables, reports when EDITOR is empty,
# then reassigns PATH and prints it again.
echo "The PATH is : $PATH"
echo "The terminal is : $TERM"
echo "The editor is : $EDITOR"

# Quoted so the test receives exactly one operand even when EDITOR is empty or
# contains spaces (e.g. "code --wait").
if [ -z "$EDITOR" ]
then
  echo "The EDITOR variable is not set"
fi

PATH="/bob"
echo "The PATH IS : $PATH"


output:
---------
$ ./vars.sh
The PATH is : /usr/local/hive/bin:/usr/local/hive/lib:/usr/local/spark/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/java/bin:/usr/local/hadoop/bin:/usr/local/hadoop/sbin:/usr/local/kafka/bin
The terminal is : xterm-256color
The editor is :
The EDITOR variable is not set
The PATH IS : /bob

HOME - user's home directory
PATH - directories which are searched for commands
HOSTNAME - hostname of the machine
SHELL - shell that's being used
USER - user of this session
TERM - type of command-line terminal that is being used


// display environmental variables
$ echo $HOME
/home/hadoop

$ echo $PATH
/usr/local/hive/bin:/usr/local/hive/lib:/usr/local/spark/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/java/bin:/usr/local/hadoop/bin:/usr/local/hadoop/sbin:/usr/local/kafka/bin

$ echo $HOSTNAME
hadoop

$ echo $SHELL
/bin/bash

$ echo $TERM
xterm-256color

$ echo $USER
hadoop

Challenge:
----------

Create a Script named : env.sh
Display a sentence
Include the computer name, user's name, home directory

env.sh:
--------
#!/usr/bin/env bash
# env.sh - prints one sentence built from the HOSTNAME, USER and HOME variables.
printf '%s\n' "The computer's name is $HOSTNAME, the user's name is $USER, AND THE home directory is $HOME"
exit 0


$ ./env.sh
The computer's name is hadoop, the user's name is hadoop, AND THE home directory is /home/hadoop


Function Example:
-------------------
#!/usr/bin/env bash
# func.sh - shows the two ways of declaring a shell function and how to call them.
# A function has to be defined before the point where it is called.

# Declared without the `function` keyword.
Goodbye(){
  printf '%s\n' "Goodbye!"
}

# Declared with the `function` keyword.
function Hello(){
  printf '%s\n' "Hello!"
}

printf '%s\n' "Calling the Hello function"
Hello

printf '%s\n' "Calling the Goodbye function"
# A call is just the function name - do not write Goodbye()
Goodbye
exit 0


$ ./func.sh
Calling the Hello function
Hello!
Calling the Goodbye function
Goodbye!


// Function with Parameters example
#!/usr/bin/env bash
# func.sh - functions that take a parameter; inside a function $1 is the first
# argument given in the call.

# Prints "Goodbye!" followed by its first argument, read directly from $1.
Goodbye(){
  printf '%s\n' "Goodbye! $1"
}

# Prints "Hello!" followed by its first argument, copied into a local variable first.
function Hello(){
  local LNAME
  LNAME=$1
  printf '%s\n' "Hello! $LNAME"
}

printf '%s\n' "Calling the Hello function"
Hello Steve

printf '%s\n' "Calling the Goodbye function"
Goodbye David

exit 0



$ ./func.sh
Calling the Hello function
Hello! Steve
Calling the Goodbye function
Goodbye! David


//pipe operation example:
#!/usr/bin/env bash
# pipe.sh - takes the first three entries of the reverse-sorted directory
# listing (ls | sort -r | head -3) and prints them numbered.

FILES=$(ls -1 | sort -r | head -3)
COUNT=1

for FILE in $FILES
do
  echo "FILE #$COUNT : $FILE"
  # Variable names are case-sensitive: the counter is COUNT, so incrementing
  # "count" would leave every line numbered 1.
  ((COUNT++))
done

exit 0



output:
----------
$ ./pipe.sh
FILE #COUNT : whileexa.sh
FILE #COUNT : vars.sh
FILE #COUNT : sport.sh


Challenge:
----------

Create a script named pfunc.sh
Create two functions in the script:
a) GetFiles,
b) ShowFiles

GetFiles returns the first 10 files in the directory

pfunch.sh:
-----------
#!/usr/bin/env bash
# pfunch.sh - prints the first ten entries of the sorted directory listing, numbered.

# Fills the global array FILES with the first ten sorted entries, one element
# per listing line, so a name containing spaces stays in one piece.
function GetFiles(){
  FILES=()
  local ENTRY
  while IFS= read -r ENTRY
  do
    FILES+=("$ENTRY")
  done < <(ls -1 | sort | head -10)
}

# Prints every argument as "FILE #<n> : <name>", numbering from 1.
function ShowFiles(){
  local COUNT=1
  local FILE
  for FILE in "$@"
  do
    echo "FILE #$COUNT : $FILE"
    ((COUNT++))
  done
}

GetFiles
ShowFiles "${FILES[@]}"

exit 0

output:
-------
$ ./pfunch.sh
FILE #1 : counter.sh
FILE #2 : env.sh
FILE #3 : for.sh
FILE #4 : func.sh
FILE #5 : greeting.sh
FILE #6 : hello.sh
FILE #7 : if.sh
FILE #8 : params.sh
FILE #9 : pfunch.sh
FILE #10 : pipe.sh


File Example:
--------------

names.txt:
----------
Ravi
Kumar
Aradhana
Siva
Karthikeyan
Dhanush
Viswanathan
SelvaKumar
Ulagappan

fileReader.sh:
---------------
#!/usr/bin/env bash
# fileReader.sh - prints every line of the file named by $1, prefixed with its line number.
COUNT=1

# Fail with a usage message when no readable file was given.
if [ ! -r "$1" ]
then
  echo "Usage: $0 <readable file>" >&2
  exit 1
fi

# IFS="" keeps leading/trailing whitespace and -r keeps backslashes literal.
# read returns non-zero on a final line that has no trailing newline but still
# fills LINE, so the extra -n test keeps that last line from being dropped.
while IFS="" read -r LINE || [ -n "$LINE" ]
do
  echo "LINE $COUNT : $LINE"
  ((COUNT++))
done < "$1"

exit 0


//IFS means Internal Field Separator \t \n ,
// execute the script and pass names.txt as input argument / parameter
$ ./fileReader.sh names.txt
LINE 1 : Ravi
LINE 2 : Kumar
LINE 3 : Aradhana
LINE 4 : Siva
LINE 5 : Karthikeyan
LINE 6 : Dhanush
LINE 7 : Viswanathan
LINE 8 : SelvaKumar
LINE 9 : Ulagappan


// create a file using redirection operators

 $ ./fileReader.sh names.txt > output1.txt  // overwrite / create
 $ ./fileReader.sh names.txt >> output1.txt // append
 $ cat output1.txt
LINE 1 : Ravi
LINE 2 : Kumar
LINE 3 : Aradhana
LINE 4 : Siva
LINE 5 : Karthikeyan
LINE 6 : Dhanush
LINE 7 : Viswanathan
LINE 8 : SelvaKumar
LINE 9 : Ulagappan
LINE 1 : Ravi
LINE 2 : Kumar
LINE 3 : Aradhana
LINE 4 : Siva
LINE 5 : Karthikeyan
LINE 6 : Dhanush
LINE 7 : Viswanathan
LINE 8 : SelvaKumar
LINE 9 : Ulagappan



// cksum of a file
//whether the file is tampered or not

hadoop@hadoop:~/Desktop/vow/shellscript$ cat  names.txt
Ravi
Kumar
Aradhana
Siva
Karthikeyan
Dhanush
Viswanathan
SelvaKumar
Ulagappan

hadoop@hadoop:~/Desktop/vow/shellscript$ cksum names.txt  // original
4016456064 78 names.txt  // 3 parameters checksum value, filesize,filename

hadoop@hadoop:~/Desktop/vow/shellscript$ atom names.txt
Ravi
Kumar
Aradhana
Siva
Karthikeyan
Dhanush
Viswanaathan // added one extra 'a'
SelvaKumar
Ulagappan

hadoop@hadoop:~/Desktop/vow/shellscript$ cksum names.txt
783674190 79 names.txt  // because of file tampered, the cksum value changed


hadoop@hadoop:~/Desktop/vow/shellscript$ atom names.txt
Ravi
Kumar
Aradhana
Siva
Karthikeyan
Dhanush
Viswanathan // removed extra 'a'
SelvaKumar
Ulagappan

hadoop@hadoop:~/Desktop/vow/shellscript$ cksum names.txt
4016456064 78 names.txt  // file is not tampered - i mean transported without any corruption, or modification

4016456064 -- this value should be sent separately (e.g. via e-mail); the checksum must match before and after the transfer


Challenge:
----------
Create a script named read3.sh
Have it read a file name passed as a parameter
It should only display the first 3 lines, with count

read3.sh:
--------
#!/usr/bin/env bash
# read3.sh - prints at most the first three lines of the file named by $1, numbered.
COUNT=1

# Fail with a usage message when no readable file was given.
if [ ! -r "$1" ]
then
  echo "Usage: $0 <readable file>" >&2
  exit 1
fi

# The extra -n test keeps a final line without a trailing newline from being
# dropped (read returns non-zero for it but still fills LINE).
while IFS="" read -r LINE || [ -n "$LINE" ]
do
  echo "LINE $COUNT : $LINE"
  # Stop once the third line has been printed.
  if [ "$COUNT" -ge 3 ]
  then
    break
  fi
  ((COUNT++))
done < "$1"

exit 0

output:
--------
$ ./read3.sh names.txt
LINE 1 : Ravi
LINE 2 : Kumar
LINE 3 : Aradhana


delayexa.sh:
-------------
#!/usr/bin/env bash
# delayexa.sh - sleeps for the number of seconds given as $1.
# Exits with status 1 when no delay is supplied.
DELAY=$1
# Quoted so the test and sleep each receive exactly one operand.
if [ -z "$DELAY" ]
then
  echo "You must supply a delay"
  exit 1
fi

echo "Going to sleep for $DELAY seconds"
sleep "$DELAY"
echo "We are awake now."
exit 0



// put & at the end.
$ ./delayexa.sh 5 &
[1] 5833 // process id of the script running in background
hadoop@hadoop:~/Desktop/vow/shellscript$ Going to sleep for 5 seconds
We are awake now.

[1]+  Done                    ./delayexa.sh 5


output:
-------
$ ./delayexa.sh 5
Going to sleep for 5 seconds
We are awake now.




proc.sh:
---------

#!/usr/bin/env bash
# proc.sh - waits until the process whose PID is given as $1 no longer exists.
# Exits with status 1 when no PID is supplied.
STATUS=0

if [ -z "$1" ]
then
  echo "Please supply a PID"
  exit 1
fi

echo "Watching PID = $1"

# ps exits non-zero once the PID can no longer be found; its listing is discarded.
while [ "$STATUS" -eq 0 ]
do
  ps -p "$1" > /dev/null
  STATUS=$?
  # Pause between polls so the loop does not keep a CPU core busy while waiting.
  if [ "$STATUS" -eq 0 ]
  then
    sleep 1
  fi
done
echo "Process $1 has terminated"
exit 0


// open a new terminal window, and start nano editor
$ nano a


$ ps -a
  PID TTY          TIME CMD
 5971 pts/1    00:00:00 nano // see the process id for nano is 5971
 5973 pts/0    00:00:00 ps


// start the shell script
$ ./proc.sh 5971
Watching PID = 5971

// stop nano
Process 5971 has terminated



// run the delayexa.sh script
$ ./delayexa.sh 20 &
[1] 8468
hadoop@hadoop:~/Desktop/vow/shellscript$ Going to sleep for 20 seconds
We are awake now.

// pass the process id of delay program
$ ./proc.sh 8468
Watching PID = 8468
Process 8468 has terminated



//Get input from user
prompt.sh:
----------
#!/usr/bin/env bash
# prompt.sh - asks for a first name on the terminal and echoes it back.
# -p prints the prompt text; -r keeps any backslashes in the answer literal.
read -r -p "What is your first name? " NAME
echo "Your name is : $NAME"
exit 0


output:
-------
$ ./prompt.sh
What is your first name? sarega
Your name is : sarega




//validating user input in shell scripting:
user.sh:
---------
#!/usr/bin/env bash
# user.sh - prompts repeatedly until a valid name and age are entered, then prints them.
#   name: letters only (A-Z, a-z)    age: digits only
VALID=0

while [ "$VALID" -eq 0 ]
do
  # -r keeps backslashes in the input literal. When read hits end of input
  # (Ctrl-D, closed pipe) with nothing read, stop: otherwise the loop would
  # repeat forever on empty values.
  if ! read -r -p "Please enter your name and age : " NAME  AGE && [ -z "$NAME$AGE" ]
  then
    echo
    echo "No more input" >&2
    exit 1
  fi

  if [[ -z $NAME || -z $AGE ]]
  then
    echo "Not enough parameters passed"
  elif [[ ! $NAME =~ ^[A-Za-z]+$ ]]
  then
    echo "Non Alpha characters detected [ $NAME ]"
  elif [[ ! $AGE =~ ^[0-9]+$ ]]
  then
    echo "Non digit characters detected [ $AGE ]"
  else
    # Every check passed: leave the loop.
    VALID=1
  fi
done
echo "Name = $NAME and Age = $AGE"
exit 0



output:
---------
$ ./user.sh
Please enter your name and age : SARA 40
Name = SARA and Age = 40
hadoop@hadoop:~/Desktop/vow/shellscript$ ./user.sh
Please enter your name and age :
Not enough parameters passed
Please enter your name and age : SA
Not enough parameters passed
Please enter your name and age : SA 3
Name = SA and Age = 3


//Challenge
Create a script named guess.sh
Set a global variable named COMPUTER to a number between 1 and 50
Take input from the user
if the user's input matches COMPUTER, they won



guess.sh:
---------
#!/usr/bin/env bash
# guess.sh - number guessing game: keeps asking until the guess equals COMPUTER.
COMPUTER=50
PLAYING=0
while [ "$PLAYING" -eq 0 ]
do
  # Stop at end of input (Ctrl-D, closed pipe) instead of looping forever on
  # empty guesses; -r keeps backslashes in the input literal.
  if ! read -r -p "What's your guess : " INPUT && [ -z "$INPUT" ]
  then
    echo
    exit 1
  fi

  # -eq / -lt only accept integers, so reject anything else before comparing.
  if [[ ! $INPUT =~ ^[0-9]+$ ]]
  then
    echo "Please enter a whole number"
  elif [ "$INPUT" -eq "$COMPUTER" ]
  then
    echo "You've won, the number was $COMPUTER"
    exit 0
  elif [ "$INPUT" -lt "$COMPUTER" ]
  then
    echo "You're too low"
  else
    # Give feedback for a guess above the target as well.
    echo "You're too high"
  fi
done

exit 0




$ ./guess.sh
What's your guess : 3
You're too low
What's your guess : 5
You're too low
What's your guess : 50
You've won, the number was 50

Friday, 29 March 2019

Space into Comma Transformation for Dataframe data using Spark with Scala

// Transforming Space into Comma in Dataframe data
scala> val obj = List( (1001, "7 2234 2342 2522"), (1002, "2222 2223 2224 2225"), (1003, "2000 2001 2002 2003 2004"), (1004, "2005 2006 2000 7 2001 2010"))
obj: List[(Int, String)] = List((1001,7 2234 2342 2522), (1002,2222 2223 2224 2225), (1003,2000 2001 2002 2003 2004), (1004,2005 2006 2000 7 2001 2010))


scala> val r1 = sc.parallelize(obj);
r1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala>  r1.foreach(println)
(1001,7 2234 2342 2522)
(1002,2222 2223 2224 2225)
(1003,2000 2001 2002 2003 2004)
(1004,2005 2006 2000 7 2001 2010)


 case class UserStory(userid:String, storyid:String)

 val r2 = r1.map (x => {
       val userid = x._1.toString
       val storyid = x._2.toString
       UserStory(userid,storyid)
       })
 
scala> r2.foreach(println)
UserStory(1001,7 2234 2342 2522)
UserStory(1002,2222 2223 2224 2225)
UserStory(1003,2000 2001 2002 2003 2004)
UserStory(1004,2005 2006 2000 7 2001 2010)


scala> val df = r2.toDF
df: org.apache.spark.sql.DataFrame = [userid: string, storyid: string]

scala> df.printSchema
root
 |-- userid: string (nullable = true)
 |-- storyid: string (nullable = true)


scala> df.show
+------+--------------------+
|userid|             storyid|
+------+--------------------+
|  1001|    7 2234 2342 2522|
|  1002| 2222 2223 2224 2225|
|  1003|2000 2001 2002 20...|
|  1004|2005 2006 2000 7 ...|
+------+--------------------+

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df.select(col("userid") as "user id",regexp_replace(col("storyid")," ",",") as "story id").show
+-------+--------------------+
|user id|            story id|
+-------+--------------------+
|   1001|    7,2234,2342,2522|
|   1002| 2222,2223,2224,2225|
|   1003|2000,2001,2002,20...|
|   1004|2005,2006,2000,7,...|
+-------+--------------------+


Wednesday, 27 March 2019

Compare 2 different strings and finding matching letters and count of matched

// Compare two strings: take the distinct characters of the first and count how often each occurs in the second

 // Script: collect the distinct characters of firstString, then count how many
 // times each of them occurs in secondString and print the result as a Map.
 val firstString = "uuai aao ioaau eieoiou"
 val secondString = "i love india and singapore"
 // Fixed-size scratch buffer. firstString has 22 characters, so the remaining
 // slots keep the default '\u0000' value.
 // NOTE(review): an input longer than 50 characters would overflow this array.
 val m = new Array[Char](50)
 var i = 0
 var leftString  = ""


 // Copy firstString into the buffer one character at a time.
 for (c <- firstString) {
     m(i) = c
     i = i  + 1
     }

  // Build leftString from the distinct, sorted buffer contents.
  for (c <- m.distinct.sorted(Ordering.Char)){
                // NOTE(review): c is a Char, which cannot be null, so this check does not
                // appear to filter anything; the '\u0000' padding and the space character
                // both end up in leftString.
                if (c != null) {
      leftString = leftString + c.toString
      }
      }

  // Bare expression: its value is only visible when run in the REPL.
  m.distinct.sorted(Ordering.Char)

  // output maps each character of leftString (as a String) to its occurrence
  // count in secondString (also stored as a String).
  var output = Map[String,String]()
  var count = 0
  for (lc <- leftString) {
     count =   secondString.count(_ == lc)
     output = output + (lc.toString ->count.toString)
     }

 println(output)

 scala>  println(output)
Map(e -> 2,  -> 0, u -> 0, a -> 3, i -> 4,   -> 4, o -> 2)

Monday, 25 March 2019

Scala Crash Course

scala> val myIntArray:Array[Int] = new Array(3)
myIntArray: Array[Int] = Array(0, 0, 0)

scala> myIntArray
res0: Array[Int] = Array(0, 0, 0)

scala> myIntArray.foreach(println)
0
0
0

scala> myIntArray(0)=10

scala> myIntArray(1)=20

scala> myIntArray(2)=30

scala> myIntArray.foreach(println)
10
20
30


scala> def addOne(x:Int): Int = {
     | x+1
     | }
addOne: (x: Int)Int

scala> addOne(5)
res6: Int = 6

scala> addOne(50)
res7: Int = 51


 scala> def addTwo(x:Int):Int = {
     | return x + 2;
     | }
addTwo: (x: Int)Int

scala> addTwo(5);
res8: Int = 7

scala> (1 to 10).foreach(x => println(addTwo(x)))
3
4
5
6
7
8
9
10
11
12


scala> def max(x:Int, y:Int): Int = {
     | if (x > y) x else y
     | }
max: (x: Int, y: Int)Int

scala> max(5,6)
res22: Int = 6


scala> val myArray = Array("Zara","Lara","Sara")
myArray: Array[String] = Array(Zara, Lara, Sara)

scala> var i = 0
i: Int = 0

scala> while (i < myArray.length) {
     | println(myArray(i))
     | i += 1
     | }
Zara
Lara
Sara

scala> myArray.foreach(arg => println(arg))
Zara
Lara
Sara

scala> myArray.foreach(println(_))
Zara
Lara
Sara

scala> myArray.foreach(println)
Zara
Lara
Sara



scala> for(arg <- myArray)
     | println(arg)
Zara
Lara
Sara


$ cat hello.sc
/** Greets the name passed as the first argument. */
object hello {
  /** Prints "Hello : <name> !" for the first element of `args`. */
  def main(args: Array[String]): Unit = {
    val name = args(0)
    println(s"Hello : $name !")
  }
}

scala> :load /home/hadoop/Desktop/ScalaTraining/hello.sc
Loading /home/hadoop/Desktop/ScalaTraining/hello.sc...
defined object hello

scala> hello.main(Array("Sara"))
Hello : Sara !







scala> val word = "India!"
word: String = India!

scala> var n = 5
n: Int = 5

scala> var i = 0
i: Int = 0

scala> while (i < n){
     | println(word)
     | i += 1
     | }
India!
India!
India!
India!
India!

scala> (1 to n).foreach(x => println(word + " #" + x))
India! #1
India! #2
India! #3
India! #4
India! #5





$ cat Hello.scala
// Minimal program: by extending App, the object body below runs when Hello.main is invoked.
object Hello extends App{
println("Hello, World!")
}

scala> :load /home/hadoop/Desktop/ScalaTraining/Hello.scala
Loading /home/hadoop/Desktop/ScalaTraining/Hello.scala...
defined object Hello

scala> Hello.main(null)
Hello, World!



scala> 1 to n
res41: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)

scala> (1).to(5)
res42: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)



scala> val greetStrings : Array[String] = new Array[String](3)
greetStrings: Array[String] = Array(null, null, null)

scala> greetStrings(0) = "Hello"

scala> greetStrings(1) = ", "

scala> greetStrings(2) = " world! \n"

scala> greetStrings.apply(0)
res47: String = Hello

scala> def printArgs(args:Array[String]):Unit = {
     | var i = 0
     | while (i < args.length) {
     | println(args(i))
     | i += 1
     | }
     | }
printArgs: (args: Array[String])Unit

scala> printArgs(greetStrings)
Hello
,
 world!


scala> printArgs(Array("I","Love","India"))
I
Love
India


scala> def printArgs(args:Array[String]): Unit = {
     | for (arg < args)
<console>:2: error: '<-' expected but ')' found.
for (arg < args)
               ^

scala> def printArgs(args:Array[String]): Unit = {
     | for (arg <- args)
     | println(arg)
     | }
printArgs: (args: Array[String])Unit

scala> printArgs(greetStrings)
Hello
,
 world!


scala> printArgs(Array("I","Love","India"))
I
Love
India


scala> Array("Siva "," Lara", " Mala").mkString("\t")
res56: String = Siva Lara Mala

scala> println(Array("Siva "," Lara", " Mala").mkString("\t"))
Siva Lara Mala

scala> println(Array(" Siva "," Lara", " Mala").mkString("\n"))
 Siva
 Lara
 Mala

scala> println(Array(" Siva "," Lara", " Mala").mkString(":"))
 Siva : Lara: Mala


scala> def formatArgs(args:Array[String]):String = return args.mkString("\n")
formatArgs: (args: Array[String])String


scala> formatArgs(Array("sare","gare"))
res63: String =
sare
gare

scala> formatArgs(Array("sare","gare")).foreach(println)
s
a
r
e


g
a
r
e


scala> myArray
res65: Array[String] = Array(Zara, Lara, Sara)

scala> formatArgs(myArray)
res66: String =
Zara
Lara
Sara


scala> def formatArgs(args:Array[String]):String = return args.mkString(":")
formatArgs: (args: Array[String])String

scala> formatArgs(myArray)
res67: String = Zara:Lara:Sara


scala> val res = formatArgs(Array("zero","one","two"))
res: String = zero:one:two

scala> assert(res == "zero:one:two")

scala> val numNames = Array("zero","one","two")
numNames: Array[String] = Array(zero, one, two)

scala> val numNames = Array.apply("zero","one","two")
numNames: Array[String] = Array(zero, one, two)

scala> numNames.exists(s => s.contains("z"))
res72: Boolean = true



scala> val oneTwo  = List(1,2,3)
oneTwo: List[Int] = List(1, 2, 3)

scala> val threeFour = List(3,4)
threeFour: List[Int] = List(3, 4)

scala> val oneTwoThreeFour  = oneTwo ::: threeFour
oneTwoThreeFour: List[Int] = List(1, 2, 3, 3, 4)


scala> val twoThree = List(2,3)
twoThree: List[Int] = List(2, 3)


scala> val oneTwoThree = 1 :: twoThree
oneTwoThree: List[Int] = List(1, 2, 3)



scala> val thrill = "will " :: "fill " :: "until " :: Nil
thrill: List[String] = List("will ", "fill ", "until ")

scala> thrill.head
res75: String = "will "

scala> thrill.length
res76: Int = 3

scala> thrill.last
res77: String = "until "


scala> val len = thrill.length
len: Int = 3

scala> thrill(len-1)
res78: String = "until "


scala> thrill
res85: List[String] = List("will ", "fill ", "until ")

scala> thrill.filter(x => x.contains("fill"))
res86: List[String] = List("fill ")

scala> thrill.filterNot(x => x.contains("fill"))
res87: List[String] = List("will ", "until ")

Partially applied functions:
-----------------------------
scala> def origFunc(a:Int, b:Int) = a+b
origFunc: (a: Int, b: Int)Int

scala> def modFunc = origFunc(10,_:Int)
modFunc: Int => Int

scala> modFunc(10)
res89: Int = 20

scala> modFunc(103)
res90: Int = 113


named parameters:
-----------------
scala> def speed(distance:Float, time:Float)  = distance / time
speed: (distance: Float, time: Float)Float

scala> speed(time=4.5F, distance=10F)
res91: Float = 2.2222223

scala> speed(distance=100F,time=5.5F)
res92: Float = 18.181818


scala> thrill
res93: List[String] = List("will ", "fill ", "until ")

scala> thrill.sorted
res94: List[String] = List("fill ", "until ", "will ")

scala> thrill.map(s => s + "y")
res95: List[String] = List(will y, fill y, until y)

scala> thrill.map(s => s.trim() + "y")
res96: List[String] = List(willy, filly, untily)

scala> thrill
res99: List[String] = List("will ", "fill ", "until ")

scala> val thrill = List("I","Love","India")
thrill: List[String] = List(I, Love, India)

scala> val thrill  = List("Will","Fill","Until")
thrill: List[String] = List(Will, Fill, Until)

scala> thrill.map(s => s + "y")
res100: List[String] = List(Willy, Filly, Untily)

scala> val thrill = "will" :: "will" :: "until" :: Nil
thrill: List[String] = List(will, will, until)

scala> thrill.map(s => s+ "y")
res0: List[String] = List(willy, willy, untily)

scala> thrill.mkString(",")
res1: String = will,will,until

scala> thrill.mkString("$")
res2: String = will$will$until

scala> thrill.mkString(":")
res3: String = will:will:until

scala> thrill.sortBy(s => s.charAt(0).toLower)
res7: List[String] = List(until, will, will)

scala> val pair = (99,"LuftBallons")
pair: (Int, String) = (99,LuftBallons)

scala> println(pair._1)
99

scala> println(pair._2)
LuftBallons


scala> val largeTuple = ('u','r',"the",1,4,"me")
largeTuple: (Char, Char, String, Int, Int, String) = (u,r,the,1,4,me)



// mutable Set example
scala> import scala.collection.mutable.Set
import scala.collection.mutable.Set

scala> val movieSet = Set("Hitch","Poltergeist")
movieSet: scala.collection.mutable.Set[String] = Set(Poltergeist, Hitch)

scala> movieSet += "Shrek"
res12: movieSet.type = Set(Poltergeist, Shrek, Hitch)

scala> println(movieSet)
Set(Poltergeist, Shrek, Hitch)



//Immutable Set Example
scala> import scala.collection.immutable.Set
import scala.collection.immutable.Set

scala> val movieSet = Set("Hitch","Poltergeist")
movieSet: scala.collection.immutable.Set[String] = Set(Hitch, Poltergeist)

scala> movieSet += "Shrek"
<console>:29: error: value += is not a member of scala.collection.immutable.Set[String]
  Expression does not convert to assignment because receiver is not assignable.
       movieSet += "Shrek"
                ^

scala> println(movieSet)
Set(Hitch, Poltergeist)



scala> import scala.collection.mutable.Map
import scala.collection.mutable.Map

scala> val treasureMap = Map[Int,String]()
treasureMap: scala.collection.mutable.Map[Int,String] = Map()

scala> treasureMap += (1 -> "Go to Island")
res16: treasureMap.type = Map(1 -> Go to Island)

scala> treasureMap += (2 -> "Find Big X on Ground")
res17: treasureMap.type = Map(2 -> Find Big X on Ground, 1 -> Go to Island)

scala> treasureMap += (3 -> "Dig.")
res18: treasureMap.type = Map(2 -> Find Big X on Ground, 1 -> Go to Island, 3 -> Dig.)

scala> println(treasureMap)
Map(2 -> Find Big X on Ground, 1 -> Go to Island, 3 -> Dig.)

scala> treasureMap.foreach(println)
(2,Find Big X on Ground)
(1,Go to Island)
(3,Dig.)


//Immutable Map but val
scala> import scala.collection.immutable.Map
import scala.collection.immutable.Map

scala> val romanNumeral = Map(1 -> "I", 2 -> "II")
romanNumeral: scala.collection.immutable.Map[Int,String] = Map(1 -> I, 2 -> II)

scala> romanNumeral += (3 -> "III")
<console>:32: error: value += is not a member of scala.collection.immutable.Map[Int,String]
  Expression does not convert to assignment because receiver is not assignable.
       romanNumeral += (3 -> "III")
                    ^

//Immutable Map but var
scala> import scala.collection.immutable.Map
import scala.collection.immutable.Map

scala> var romanNumeral = Map(1->"I",2 -> "II")
romanNumeral: scala.collection.immutable.Map[Int,String] = Map(1 -> I, 2 -> II)

scala> romanNumeral += (3 -> "III")

scala> println(romanNumeral)
Map(1 -> I, 2 -> II, 3 -> III)




// Exception handling: without a try/catch, a missing file throws FileNotFoundException
scala> def getContent(filePath:String) = {
     | for (line <- Source.fromFile(filePath).getLines())
     | println(line.length + " " + line)
     | }
getContent: (filePath: String)Unit

scala> getContent("")
java.io.FileNotFoundException:  (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at scala.io.Source$.fromFile(Source.scala:91)
  at scala.io.Source$.fromFile(Source.scala:76)
  at scala.io.Source$.fromFile(Source.scala:54)
  at getContent(<console>:37)
  ... 54 elided


scala> getContent("/home/hadoop/Desktop/emp_data.csv")
52 empno,ename,designation,manager,hire_date,sal,deptno
39 7369,SMITH,CLERK,7902,12/17/1980,800,20
42 7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
41 7521,WARD,SALESMAN,7698,2/22/1981,1250,30
41 7566,TURNER,MANAGER,7839,4/2/1981,2975,20
43 7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
41 7698,MILLER,MANAGER,7839,5/1/1981,2850,30


//Exception Handling
scala> def getContent(filePath:String) = {
     |    try{
     |        for (line <- Source.fromFile(filePath).getLines())
     |        println(line.length + " " + line)
     |        }
     |     catch{
     |           case e: java.io.FileNotFoundException => println("No file found")
     |     }
     |   }
getContent: (filePath: String)Unit

scala> getContent("")
No file found

scala> getContent("/home/hadoop/Desktop/emp_Nosuchfile.csv")
No file found



// Partially applied functions (not to be confused with PartialFunction)
scala> def SalaryCalculator(base:Int, inc:Int, varComp:Int):Int  = base + inc + varComp
SalaryCalculator: (base: Int, inc: Int, varComp: Int)Int

scala> def jrSalCalc = SalaryCalculator(10,_:Int,_:Int)
jrSalCalc: (Int, Int) => Int

scala> def MidSalCalc = SalaryCalculator(20,_:Int, _:Int)
MidSalCalc: (Int, Int) => Int

scala> def SeniorCalc = SalaryCalculator(30, _:Int, _:Int)
SeniorCalc: (Int, Int) => Int

scala> SalaryCalculator(35,10,12)
res0: Int = 57

scala> jrSalCalc(10,12)
res1: Int = 32

scala> MidSalCalc(10,12)
res2: Int = 42

scala> SeniorCalc(10,12)
res3: Int = 52





// named arguments
scala> def speed(distance:Float, time:Float)  = distance / time
speed: (distance: Float, time: Float)Float

scala> def speed(distance:Float=10F, time:Float=2F) = distance / time
speed: (distance: Float, time: Float)Float

scala> speed(time=4F)
res5: Float = 2.5

scala> speed(10,4.5F)
res6: Float = 2.2222223

scala> speed(time=4.5F,distance=10F)
res7: Float = 2.2222223

scala> speed()
res8: Float = 5.0

scala> speed(distance=10F)
res9: Float = 5.0

scala> speed(distance=10F,time=4.5F)
res10: Float = 2.2222223




// Pattern matching example
scala> val myStr = "Hello"
myStr: String = Hello

scala> val result = myStr match {
       case "Hello" => println("Greeting!")
       case "Hi" => println("Short Greeting!")
       case _ => println("Unknown")
       }
Greeting!
result: Unit = ()


scala> val myStr = "Hello"
myStr: String = Hello

scala> val result = myStr match {
     | case "Hello" => println("Greeting!")
     | case "Hi" => println("Short Greeting!")
     | case _ => println("Unknown")
     | }
Greeting!
result: Unit = ()

scala> def UsingMatchPatternExa(myStr:String) = {
     |  val result = myStr match {
     |        case "Hello" => println("Greeting!")
     |        case "Hi" => println("Short Greeting!")
     |        case _ => println("Unknown")
     |        }
     | }
UsingMatchPatternExa: (myStr: String)Unit

scala> UsingMatchPatternExa("Hello")
Greeting!

scala> UsingMatchPatternExa("Hi")
Short Greeting!

scala> UsingMatchPatternExa("Hey")
Unknown




scala> var a = 0
a: Int = 0

scala> var b = 0
b: Int = 0

scala> for (a <- 1 to 3; b <- 1 until 3) {
     | println("Value of a : " + a + ", b: " + b)
     | }
Value of a : 1, b: 1
Value of a : 1, b: 2
Value of a : 2, b: 1
Value of a : 2, b: 2
Value of a : 3, b: 1
Value of a : 3, b: 2




scala> val numList = List(1 to 10)
numList: List[scala.collection.immutable.Range.Inclusive] = List(Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

scala> for (a <- numList) {
     | println("Value of a : " + a)
     | }
Value of a : Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)




scala> val numList = List(1 to 10)
numList: List[scala.collection.immutable.Range.Inclusive] = List(Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

scala> for (a <- numList) {
     | println("Value of a : " + a)
     | }
Value of a : Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> val numList = List(1,2,3,4,5,6)
numList: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val retVal = numList.filter( x => x != 3 && x < 6)
retVal: List[Int] = List(1, 2, 4, 5)

scala> val retVal = for(a <- numList if a != 3; if a < 6) yield a
retVal: List[Int] = List(1, 2, 4, 5)




/// variable arguments
scala> def printStrings(args: String*) = {
     | var i:Int = 0
     | for (arg <- args) {
     | println("Arg Value[" + i + "] = " + arg)
     | i = i + 1;
     | }
     | }
printStrings: (args: String*)Unit

scala> printStrings("Hadoop","Scala","Spark","Oozie","Hive","Hbase")
Arg Value[0] = Hadoop
Arg Value[1] = Scala
Arg Value[2] = Spark
Arg Value[3] = Oozie
Arg Value[4] = Hive
Arg Value[5] = Hbase

scala> printStrings("Arun","Prasad")
Arg Value[0] = Arun
Arg Value[1] = Prasad


// Anonymous functions
scala> var inc = (x: Int) => x + 1
inc: Int => Int = <function1>

scala>

scala> inc(7)
res3: Int = 8

scala> inc(7) - 10
res4: Int = -2

scala> var mul = (x:Int, y:Int) => x * y
mul: (Int, Int) => Int = <function2>

scala> mul(5,7)
res5: Int = 35

scala> var userDir = () => { System.getProperty("user.dir")}
userDir: () => String = <function0>

scala> println(userDir())
/home/hadoop




// Higher-order functions
// In (x: A), x has type A, where A is a type parameter.
// A stands for whatever type the caller supplies (it is not the type Any).

scala> def apply(f:Int => String, v:Int) = f(v)
apply: (f: Int => String, v: Int)String

scala> def CurlyIt[A] (x: A) = "{" + x.toString() + "}"
CurlyIt: [A](x: A)String

scala> def SquareIt[A](x: A) = "[" + x.toString() + "]"
SquareIt: [A](x: A)String

scala> SquareIt(10)
res9: String = [10]

scala> CurlyIt(20)
res10: String = {20}

scala> println(apply(SquareIt,10))
[10]

scala> println(apply(CurlyIt,20))
{20}

scala> SquareIt("sare")
res14: String = [sare]

scala> CurlyIt("sare")
res15: String = {sare}






//Higher Order function
scala> def apply(f:String => String, v:String) = f(v)
apply: (f: String => String, v: String)String

scala> def CurlyIt[A] (x: A) = "{" + x.toString() + "}"
CurlyIt: [A](x: A)String

scala> def SquareIt[A](x: A) = "[" + x.toString() + "]"
SquareIt: [A](x: A)String

scala> println(apply(SquareIt,"--Sare--"))
[--Sare--]

scala> println(apply(CurlyIt,"--Sare--"))
{--Sare--}




// Partially applied function

scala> def adder (m:Int, n:Int, p:Int) = m + n + p
adder: (m: Int, n: Int, p: Int)Int

scala> val add2 = adder(2, _:Int, _:Int)
add2: (Int, Int) => Int = <function2>

scala> add2(10,2)
res19: Int = 14

scala> add2(1,1)
res20: Int = 4

scala> def adder(m:Int)(n:Int)(p:Int) = m + n + p
adder: (m: Int)(n: Int)(p: Int)Int

scala> adder(2)(3)(4)
res21: Int = 9

/*
Scala collections can be mutable and immutable
Mutable collections can be updated or extended in place.
Immutable collections never change: Additions, Removals, Updates operators return a new collection and leave the old collection unchanged
*/

Arrays :
Fixed size sequential collection of elements of the same type
Lists :
Sequential collection of elements of the same type
Immutable
Lists represents a Linked List
Sets
Maps
Tuples
Fixed number of items of different types together
Immutable
Option



scala> val myList = List(1,2,3)
myList: List[Int] = List(1, 2, 3)

scala> val ourList = 1 :: 2 :: 3 :: Nil
ourList: List[Int] = List(1, 2, 3)

// Add Leader -- Adding an element to the head of a list
scala> val AddLeaderList = 0 :: myList
AddLeaderList: List[Int] = List(0, 1, 2, 3)

// Add Follower -- appending an element to the end of a list
scala> val AddFollowerList = ourList :+ 4
AddFollowerList: List[Int] = List(1, 2, 3, 4)

// List concatenation
scala> val t3 = myList ::: ourList
t3: List[Int] = List(1, 2, 3, 1, 2, 3)

// remove duplicates
scala> t3.distinct
res23: List[Int] = List(1, 2, 3)



scala> val Left = List("Arun","Banu","Chitra")
Left: List[String] = List(Arun, Banu, Chitra)

scala> val Right = List("Sara","Tanu","Umesh")
Right: List[String] = List(Sara, Tanu, Umesh)

scala> Left ::: Right
res27: List[String] = List(Arun, Banu, Chitra, Sara, Tanu, Umesh)

//List concatenation
scala> Right ::: Left
res28: List[String] = List(Sara, Tanu, Umesh, Arun, Banu, Chitra)


scala> Left.union(Right)
res30: List[String] = List(Arun, Banu, Chitra, Sara, Tanu, Umesh)

scala> Right.union(Left)
res31: List[String] = List(Sara, Tanu, Umesh, Arun, Banu, Chitra)


scala> val t = (10,"Twenty",30,"Fourty",true,3.5F)
t: (Int, String, Int, String, Boolean, Float) = (10,Twenty,30,Fourty,true,3.5)

scala> t._1
res35: Int = 10




//Sets
scala> val numberSet = Set(0,1,2,3,4,5,6,7,8, 9,10)
numberSet: scala.collection.immutable.Set[Int] = Set(0, 5, 10, 1, 6, 9, 2, 7, 3, 8, 4)


scala> numberSet.filter ( _ % 2 == 0)
res39: scala.collection.immutable.Set[Int] = Set(0, 10, 6, 2, 8, 4)

scala> numberSet.filter ( _ % 2 != 0)
res40: scala.collection.immutable.Set[Int] = Set(5, 1, 9, 7, 3)

// Set doesn't keep duplicates. It keeps unique only
scala> val noDuplicates = Set(1,2,3,2,3,1,2,3,4,3,2,1,2,34,5,4,3,2,1)
noDuplicates: scala.collection.immutable.Set[Int] = Set(5, 1, 2, 34, 3, 4)

scala> noDuplicates.foreach(println)
5
1
2
34
3
4







scala> def toInt(in : String) : Option[Int] = {
     | try{
     | Some(Integer.parseInt(in.trim))
     | } catch {
     | case e:NumberFormatException => None
     | }
     | }
toInt: (in: String)Option[Int]

scala> val someString = "123"
someString: String = 123

scala> toInt(someString) match {
     | case Some(i) => println(i)
     | case None => println(" Failed ")
     | }
123

scala> val someString = "sare"
someString: String = sare

scala> toInt(someString) match {
     | case Some(i) => println(i)
     | case None => println(" Failed ")
     | }
 Failed

scala> toInt(someString).getOrElse(-1)
res44: Int = -1

scala> toInt("10101").getOrElse(-1)
res45: Int = 10101

scala> toInt("Aries").getOrElse("Format Error")
res47: Any = Format Error





scala> val countryCapitals = Map ("India"->"Delhi","Afhanistan"->"Kabul","Egypt"->"Cairo")
countryCapitals: scala.collection.immutable.Map[String,String] = Map(India -> Delhi, Afhanistan -> Kabul, Egypt -> Cairo)

scala> countryCapitals.get("Egypt")
res48: Option[String] = Some(Cairo)

scala> countryCapitals.get("Egypt").isDefined
res49: Boolean = true

scala> countryCapitals.get("China").getOrElse("Not Defined in our list")
res51: String = Not Defined in our list





// zip example
scala> val numbers = List(1,2,3,4,5)
numbers: List[Int] = List(1, 2, 3, 4, 5)

scala> val chars  = List("a","b","c","d","e")
chars: List[String] = List(a, b, c, d, e)

scala> numbers.zip(chars)
res52: List[(Int, String)] = List((1,a), (2,b), (3,c), (4,d), (5,e))

scala> chars.zip(numbers)
res54: List[(String, Int)] = List((a,1), (b,2), (c,3), (d,4), (e,5))



// zipWithIndex example
scala> val myList = List("Arun","Prasad","Kalai","Harini","Nila","Silva")
myList: List[String] = List(Arun, Prasad, Kalai, Harini, Nila, Silva)

scala> myList.zipWithIndex
res55: List[(String, Int)] = List((Arun,0), (Prasad,1), (Kalai,2), (Harini,3), (Nila,4), (Silva,5))

scala> myList.zipWithIndex.foreach(println)
(Arun,0)
(Prasad,1)
(Kalai,2)
(Harini,3)
(Nila,4)
(Silva,5)


scala> myList.zip(List(1,2,3,4,5,6))
res57: List[(String, Int)] = List((Arun,1), (Prasad,2), (Kalai,3), (Harini,4), (Nila,5), (Silva,6))

scala> List(1,2,3,4,5,6).zip(myList)
res58: List[(Int, String)] = List((1,Arun), (2,Prasad), (3,Kalai), (4,Harini), (5,Nila), (6,Silva))



//fold left
scala> val numbers = List(1,2,3,4,5,6,7,8,9,10)
numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// with Seed value as 0
scala> numbers.foldLeft(0) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 0 n: 1
m : 1 n: 2
m : 3 n: 3
m : 6 n: 4
m : 10 n: 5
m : 15 n: 6
m : 21 n: 7
m : 28 n: 8
m : 36 n: 9
m : 45 n: 10
res59: Int = 55

// with Seed value as 5
scala> numbers.foldLeft(5) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 5 n: 1
m : 6 n: 2
m : 8 n: 3
m : 11 n: 4
m : 15 n: 5
m : 20 n: 6
m : 26 n: 7
m : 33 n: 8
m : 41 n: 9
m : 50 n: 10
res60: Int = 60



// fold Right

scala> val numbers = List(1,2,3,4,5,6,7,8,9,10)
numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> numbers.foldRight(0) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 10 n: 0
m : 9 n: 10
m : 8 n: 19
m : 7 n: 27
m : 6 n: 34
m : 5 n: 40
m : 4 n: 45
m : 3 n: 49
m : 2 n: 52
m : 1 n: 54
res61: Int = 55


//with seed value as 5
scala> numbers.foldRight(5) { (m,n) => println("m : " + m + " n: " + n ); m + n}
m : 10 n: 5
m : 9 n: 15
m : 8 n: 24
m : 7 n: 32
m : 6 n: 39
m : 5 n: 45
m : 4 n: 50
m : 3 n: 54
m : 2 n: 57
m : 1 n: 59
res62: Int = 60


//flatten
//It collapses one level of nested structure

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten
res63: List[Int] = List(1, 2, 3, 3, 4, 5, 5, 6, 7)

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten.toSet
res64: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 4)

scala> List(List(1,2,3),List(3,4,5),List(5,6,7)).flatten.distinct
res65: List[Int] = List(1, 2, 3, 4, 5, 6, 7)

/** Simple calculator whose display colour is derived from its brand.
  *
  * @param brand manufacturer name; "TI" and "HP" have dedicated colours,
  *              any other value falls back to "white"
  */
class Calculator(brand: String) {
  /** Colour associated with the brand, computed once at construction. */
  val color: String = brand match {
    case "TI" => "blue"
    case "HP" => "black"
    case _    => "white"
  }

  /** Returns the sum of `m` and `n`. */
  def add(m: Int, n: Int): Int = m + n

  /** Prints the brand and its colour on a single line. */
  def displayBrandAndColor(): Unit =
    println(s"Brand : $brand, Color : $color")
}
// REPL output: defined class Calculator
scala>

scala> val calc = new Calculator("HP")
calc: Calculator = Calculator@51d6e3ae

scala> println(calc.color)
black





scala> class Calculator(brand:String){
     |   val color: String = if (brand =="TI"){
     |                 "blue"
     |                 } else if (brand =="HP"){
     |                 "black"
     |                } else {
     |                 "white"
     |            }
     |   def add(m:Int, n:Int) : Int = m + n
     |  def displayBrandAndColor()={
     |   println("Brand : " + brand + ", Color : " + color)
     |  }
     | }
defined class Calculator

scala> val c = new Calculator("HP")
c: Calculator = Calculator@67885e1f

scala> c.displayBrandAndColor
Brand : HP, Color : black

Saturday, 16 March 2019

JSON Input to Kafka Broker to Spark Streaming to MySQL using KafkaProducer and KafkaUtils.createStream

//Json To Kafka Broker to Spark Streaming to MySQL
// Here we read a JSON input file from the file system and publish it to the Kafka broker.
// Spark Streaming fetches the messages from the Kafka broker and writes them into a MySQL table.

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182

//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic



// create a database and table in MySQL:

 $ mysql -uroot -pcloudera

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed

// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)





// The producer reads the JSON file and publishes each line to the Kafka topic
Producer Programming in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

/** Publishes every non-blank line of a JSON-lines file to the Kafka topic `jsonTopic`.
  *
  * The input path may be given as the first argument; when no argument is
  * supplied (or `args` is null) the original hard-coded path is used.
  */
object JsonProducer {
  private val DefaultInputPath = "/home/cloudera/Desktop/vow/nameList.json"

  /** Entry point.
    *
    * @param args optional; `args(0)` overrides the input file path
    */
  def main(args: Array[String]): Unit = {
    val props = new Properties()

    props.put("bootstrap.servers","localhost:9092")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    // Option(args) tolerates a null argument array.
    val inputPath = Option(args).flatMap(_.headOption).getOrElse(DefaultInputPath)

    val producer = new KafkaProducer[String,String](props)
    try {
      val file = scala.io.Source.fromFile(inputPath)
      try {
        // Blank lines carry no JSON record, so they are not published.
        for (line <- file.getLines() if line.trim.nonEmpty) {
          producer.send(new ProducerRecord[String,String](topic,line))
        }
      } finally {
        // Release the file handle even when sending fails.
        file.close()
      }
    } finally {
      // Always close the producer, including on the failure path.
      producer.close()
    }
  }
}


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},



Kafka2MySQLStreaming.scala:
-----------------------------
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.util.Properties
import org.apache.spark.sql.{SQLContext,SaveMode}
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}

/** Streams JSON messages from the Kafka topic `jsonTopic` and appends them to
  * the MySQL table `KafkaDB.jsonTable`, one micro-batch every 20 seconds.
  */
object Kafka2MySQLStreaming {
  /** Entry point; `args` is not used. Blocks until the streaming context terminates. */
  def main(args: Array[String]): Unit = {

    // Silence Spark/Akka logging so the console stays readable.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)
    val batchInterval = 20
    val zk = "localhost:2182"
    val consumerGroupID = "jsonGroup"
    val topic = "jsonTopic"
    val partition = 1
    // NOTE(review): for KafkaUtils.createStream the map value is presumably the number
    // of consumer threads for the topic rather than a Kafka partition id; confirm.
    val perTopicPartitions = Map(topic -> partition)
    val ssc = new StreamingContext(sc,Seconds(batchInterval))

    // JDBC connection settings are the same for every batch, so build them once.
    val props = new Properties()
    props.put("driver","com.mysql.jdbc.Driver")
    props.put("user","root")
    props.put("password","cloudera")

    val KafkaData = KafkaUtils.createStream(ssc,zk,consumerGroupID,perTopicPartitions)
    // Each element is a (key, message) pair; only the message payload is needed.
    val ks = KafkaData.map(_._2)

    ks.foreachRDD { rdd =>
      // Skip batches without messages: no JSON parsing or JDBC work is done for them.
      if (!rdd.isEmpty()) {
        val df = sqlc.read.json(rdd)
        df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB","jsonTable",props)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)



// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id   | name       | city                     | country               | Skills                        |
+------+------------+--------------------------+-----------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden                | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde            | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan            | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil                | Army                          |
|    5 | Modesta    | Paltamo                  | Finland               | Avaya Technologies            |

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...