Thursday, 28 February 2019

Spark with MongoDB - Read / Write Operations

$ mongo // start CLI
MongoDB shell version v3.6.3
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.3

// Adding 2 records in person

> db.person.insert([{
   "id":100,
           "name":"Sankara",
           "salary":3000,
"city":"Pallathur"
         
        },
        {
           "id":101,
           "name":"Rasee",
           "salary":3100,
"city":"Kanadukathan"
        }])
BulkWriteResult({
"writeErrors" : [ ],
"writeConcernErrors" : [ ],
"nInserted" : 2,
"nUpserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"upserted" : [ ]
})

// select
> db.person.find()
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d44"), "id" : 101, "name" : "Rasee", "salary" : 3100, "city" : "Kanadukathan" }

// beautify
> db.person.find().pretty()
{
"_id" : ObjectId("5c780db0d44e3fc2ebd26d43"),
"id" : 100,
"name" : "Sankara",
"salary" : 3000,
"city" : "Pallathur"
}
{
"_id" : ObjectId("5c780db0d44e3fc2ebd26d44"),
"id" : 101,
"name" : "Rasee",
"salary" : 3100,
"city" : "Kanadukathan"
}

//search with condition
> db.person.find({"name":"Sankara"})
{ "_id" : ObjectId("5c780db0d44e3fc2ebd26d43"), "id" : 100, "name" : "Sankara", "salary" : 3000, "city" : "Pallathur" }


// Spark starts here
spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
 
import com.mongodb.spark._
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
 import org.bson.Document
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("MongoPerson").master("local[*]").getOrCreate()
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "person"))
val df = spark.read.mongo(readConfig)  // test.person content populated here


scala> df.show
+--------------------+------------+-----+-------+------+
|                 _id|        city|   id|   name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...|   Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0|  Rasee|3100.0|
+--------------------+------------+-----+-------+------+


scala> df.select("city","id","name","salary").show
+------------+-----+-------+------+
|        city|   id|   name|salary|
+------------+-----+-------+------+
|   Pallathur|100.0|Sankara|3000.0|
|Kanadukathan|101.0|  Rasee|3100.0|
+------------+-----+-------+------+

scala> df.createOrReplaceTempView("tblPerson") // SparkSQL

scala> spark.sql("select * from tblPerson").show
+--------------------+------------+-----+-------+------+
|                 _id|        city|   id|   name|salary|
+--------------------+------------+-----+-------+------+
|[5c780db0d44e3fc2...|   Pallathur|100.0|Sankara|3000.0|
|[5c780db0d44e3fc2...|Kanadukathan|101.0|  Rasee|3100.0|
+--------------------+------------+-----+-------+------+

scala> spark.sql("select id,name,city,salary from tblPerson").filter("salary==3000").show
+-----+-------+---------+------+
|   id|   name|     city|salary|
+-----+-------+---------+------+
|100.0|Sankara|Pallathur|3000.0|
+-----+-------+---------+------+


// Making In-memory collection in Spark
scala> case class Person(id:Int, name:String, city:String, salary:Int)
defined class Person

scala> val ob1 = new Person(500,"Sathya","Bangalore",3000)
ob1: Person = Person(500,Sathya,Bangalore,3000)

scala> val ob2 = new Person(501,"Lakshmi","Chennai",3100)
ob2: Person = Person(501,Lakshmi,Chennai,3100)

scala> val ob3 = new Person(502,"Kalai","Perai",3200)
ob3: Person = Person(502,Kalai,Perai,3200)

scala> val r1 = sc.parallelize(List(ob1,ob2,ob3))
r1: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[40] at parallelize at <console>:40

scala> r1.collect.foreach(println)
Person(500,Sathya,Bangalore,3000)
Person(501,Lakshmi,Chennai,3100)
Person(502,Kalai,Perai,3200)

// Converting inmemory collection into Dataframe
scala> val dfCollection = r1.toDF
dfCollection: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> dfCollection.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = false)


scala> dfCollection.show
+---+-------+---------+------+
| id|   name|     city|salary|
+---+-------+---------+------+
|500| Sathya|Bangalore|  3000|
|501|Lakshmi|  Chennai|  3100|
|502|  Kalai|    Perai|  3200|
+---+-------+---------+------+



// Going to write in-memory collection into Mongo
scala> import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.config.{ReadConfig, WriteConfig}

// Writer Configuration : test.person
scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/test.person"))
writeConfig: com.mongodb.spark.config.WriteConfig.Self = WriteConfig(test,person,Some(mongodb://127.0.0.1/test.person),true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)

scala> MongoSpark.save(dfCollection.write.mode("append"), writeConfig)


// Back to Mongo
db.person.find().pretty()
{
"_id" : ObjectId("5c7815d1d44e3fc2ebd26d47"),
"id" : 100,
"name" : "Sankara",
"salary" : 3000,
"city" : "Pallathur"
}
{
"_id" : ObjectId("5c7815d1d44e3fc2ebd26d48"),
"id" : 101,
"name" : "Rasee",
"salary" : 3100,
"city" : "Kanadukathan"
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207d"),
"id" : 500,
"name" : "Sathya",
"city" : "Bangalore",
"salary" : 3000
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207e"),
"id" : 501,
"name" : "Lakshmi",
"city" : "Chennai",
"salary" : 3100
}
{
"_id" : ObjectId("5c7816f165e80e7efcae207f"),
"id" : 502,
"name" : "Kalai",
"city" : "Perai",
"salary" : 3200
}

// Back to Spark
scala> df.show
+--------------------+------------+---+-------+------+
|                 _id|        city| id|   name|salary|
+--------------------+------------+---+-------+------+
|[5c7815d1d44e3fc2...|   Pallathur|100|Sankara|  3000|
|[5c7815d1d44e3fc2...|Kanadukathan|101|  Rasee|  3100|
|[5c7816f165e80e7e...|   Bangalore|500| Sathya|  3000|
|[5c7816f165e80e7e...|     Chennai|501|Lakshmi|  3100|
|[5c7816f165e80e7e...|       Perai|502|  Kalai|  3200|
+--------------------+------------+---+-------+------+

// Dropping in Mongo
>db.person.drop()
true

// Dataframe is empty
scala> df.show
+---+----+---+----+------+
|_id|city| id|name|salary|
+---+----+---+----+------+
+---+----+---+----+------+

Spark with MongoDB integration


<!-- https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector -->
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.4.0</version>
</dependency>



spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0


scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.config.ReadConfig

scala> import com.mongodb.spark.sql._
import com.mongodb.spark.sql._

scala> import org.bson.Document
import org.bson.Document

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.functions.{max, min}

scala> val spark = SparkSession.builder().appName("MongoPlayers").master("local[*]").getOrCreate()

scala> val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "players"))

scala>  val df = spark.read.mongo(readConfig)

scala>  df.printSchema()
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: double (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- birthplace: string (nullable = true)
 |-- height: string (nullable = true)
 |-- id: double (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number: double (nullable = true)
 |-- position: string (nullable = true)
 |-- twitterHandle: string (nullable = true)
 |-- twitterURL: string (nullable = true)
 |-- weight: double (nullable = true)


scala> df.show()

Playing with MongoDB

$ mongo // start CLI
MongoDB shell version v3.6.3
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.3

// create a json object
 post = { "position":"Right Wing", "id": 84, "weight":200, "height":"6' 0\"", "imageurl" : "url", "birthplace" : "Seria,BRN", "age":37, "name":"Craig Adams", "birthdate":"April 26,1977", "number":27 }

// insert single
db.players.insert(post); 

//insert multiple
db.players.insert([{ 
         "position":"Right Wing",
         "id":8465166,
         "weight":200,
         "height":"6' 0\"",
         "imageUrl":"http://1.cdn.nhle.com/photos/mugs/8465166.jpg",
         "birthplace":"Seria, BRN",
         "age":37,
         "name":"Craig Adams",
         "birthdate":"April 26, 1977",
         "number":27
      },
      { 
         "position":"Right Wing",
         "id":8475761,
         "weight":195,
         "height":"6' 2\"",
         "imageUrl":"http://1.cdn.nhle.com/photos/mugs/8475761.jpg",
         "birthplace":"Gardena, CA, USA",
         "age":23,
         "name":"Beau Bennett",
         "birthdate":"November 27, 1991",
         "number":19
      }])

BulkWriteResult({
"writeErrors" : [ ],
"writeConcernErrors" : [ ],
"nInserted" : 2,
"nUpserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"upserted" : [ ]
})

// select all
db.players.find()
{ "_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2a"), "position" : "Right Wing", "id" : 8465166, "weight" : 200, "height" : "6' 0\"", "imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8465166.jpg", "birthplace" : "Seria, BRN", "age" : 37, "name" : "Craig Adams", "birthdate" : "April 26, 1977", "number" : 27 }
{ "_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2b"), "position" : "Right Wing", "id" : 8475761, "weight" : 195, "height" : "6' 2\"", "imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8475761.jpg", "birthplace" : "Gardena, CA, USA", "age" : 23, "name" : "Beau Bennett", "birthdate" : "November 27, 1991", "number" : 19 }
{ "_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2c"), "position" : "Left Wing", "id" : 8471260, "weight" : 202, "height" : "6' 1\"", "imageUrl" : "http://3.cdn.nhle.com/photos/mugs/8471260.jpg", "birthplace" : "Meadow Lake, SK, CAN", "age" : 29, "name" : "Blake Comeau", "birthdate" : "February 18, 1986", "number" : 17 }

// select with condition
>  db.players.find(
... {"position":"Goalie"}
... )
{ "_id" : ObjectId("5c77b1dbd44e3fc2ebd26d41"), "position" : "Goalie", "id" : 8470594, "weight" : 180, "height" : "6' 2\"", "imageUrl" : "http://3.cdn.nhle.com/photos/mugs/8470594.jpg", "birthplace" : "Sorel, QC, CAN", "age" : 30, "name" : "Marc-Andre Fleury", "birthdate" : "November 28, 1984", "number" : 29 }
{ "_id" : ObjectId("5c77b1dbd44e3fc2ebd26d42"), "position" : "Goalie", "id" : 8471306, "weight" : 220, "height" : "6' 1\"", "imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8471306.jpg", "birthplace" : "Fussen, DEU", "age" : 29, "name" : "Thomas Greiss", "birthdate" : "January 29, 1986", "number" : 1 }


// display the db
> db
test

// display it in pretty way
>  db.players.find().pretty()
{
"_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2a"),
"position" : "Right Wing",
"id" : 8465166,
"weight" : 200,
"height" : "6' 0\"",
"imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8465166.jpg",
"birthplace" : "Seria, BRN",
"age" : 37,
"name" : "Craig Adams",
"birthdate" : "April 26, 1977",
"number" : 27
}
{
"_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2b"),
"position" : "Right Wing",
"id" : 8475761,
"weight" : 195,
"height" : "6' 2\"",
"imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8475761.jpg",
"birthplace" : "Gardena, CA, USA",
"age" : 23,
"name" : "Beau Bennett",
"birthdate" : "November 27, 1991",
"number" : 19
}

// show all tables
>  show collections
players



>  db.players.findOne();
{
"_id" : ObjectId("5c77b1dbd44e3fc2ebd26d2a"),
"position" : "Right Wing",
"id" : 8465166,
"weight" : 200,
"height" : "6' 0\"",
"imageUrl" : "http://1.cdn.nhle.com/photos/mugs/8465166.jpg",
"birthplace" : "Seria, BRN",
"age" : 37,
"name" : "Craig Adams",
"birthdate" : "April 26, 1977",
"number" : 27
}

// remove a record
> db.players.remove({"id":8465166 })
WriteResult({ "nRemoved" : 1 })

// drop a document
> db.players.drop()
true

MongoDB Installation steps in Ubuntu 18.10

MongoDB Installation:
-----------------------
$ sudo apt update

$ sudo apt install mongodb

$ sudo systemctl status mongodb

$ sudo systemctl start / stop / restart / disable / enable mongodb

$ sudo systemctl start mongodb // start it

$ mongo  // to start with CLI
MongoDB shell version v3.6.3
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.3



Wednesday, 27 February 2019

MySQL to RDD or Dataframe to Cassandra Integration

// MySQL Table creation with sample data insertion
hadoop@hadoop:~$ sudo mysql;

mysql> create database store;
Query OK, 1 row affected (0.00 sec)

mysql> use store;
Database changed

mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)

mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)

mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)

mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)

mysql> select * from customer;
+-----+-------+-----------+
| id  | name  | city      |
+-----+-------+-----------+
| 100 | Sara  | Karaikudi |
| 101 | Lara  | Manachai  |
| 102 | Kalai | Vadagudi  |
+-----+-------+-----------+
3 rows in set (0.00 sec)


Create KeySpace and Table in Cassandra : Don't insert any rows there
--------------------------------------------------------------------
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};

cqlsh> use store;

cqlsh:store>  CREATE TABLE customer (id int,name varchar, city varchar, primary key (id));


         

// Spark Shell ->

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost


val dfcustomer = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","store.customer").
      option("user","hadoop").
      option("password","hadoop").
      load()
 
dfcustomer.show()

+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+

dfcustomer.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "store")).save()


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara

// delete all records
cqlsh:store> delete from customer where id in (100,101,102);


cqlsh:store> select * from customer;

 id | city | name
----+------+------


// Here we make RDD from Dataframe and save RDD + Case Class Schema => Cassandra Table
scala> case class customer(id:Int, name:String, city:String);
defined class customer

scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28

scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]

scala> val r = r1.map (x => {
     |        val id = x(0).toString.toInt
     |        val name = x(1).toString
     |        val city = x(2).toString
     |        customer(id,name,city)
     |        })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30

scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)

// here we write RDD into Cassandra Table
scala> r.saveToCassandra("store","customer");


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara


----+------+------

Sunday, 24 February 2019

Integrating Cassandra with Spark - Import / Export data between Spark and Cassandra

Cassandra's default port number : 9042

// Start Cassandra server
$ sudo service cassandra start

// Verify Cassandra is up
$ netstat -ln | grep 9042
tcp        0      0 127.0.0.1:9042          0.0.0.0:* 

// to start Cassandra Query Language
hadoop@hadoop:~$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.


Cassandra - Columnar Storage NoSQL
  - hbase is also a columnar
Cassandra and Hbase are same family members

Hadoop :
Master, Slave Architecture
    Name Node, Data Node
HBase:
purely based on Hadoop
Master : HMaster
Slave : HRegion server

Cassandra : Peer To Peer Architecture
Nodes are logically connected as Circle

Every can interact with every other nodes
There is no Master, Slave things
(cassandra server daemon runs on each nodes)

Hive, MySQL - data stored in the form of Tables
Hive, MySQL, RDBMS: Database (Schema)-> Tables -> Rows -> Columns

KeySpace : Schema
KeySpace -> Tables

pure sql language - cql - cassandra query language

schema is known as Keyspace in Cassandra.

// show all schemas (databases)
cqlsh> describe keyspaces;

people  system_schema  system_auth  system  system_distributed  system_traces

//SimpleStrategy means (Single DataCentre and Single Rack)
cqlsh> create schema test1 with replication = {'class':'SimpleStrategy','replication_factor':1};

// Must Include DataCentre here
//NetworkTopologyStrategy with DataCentre : Multiple DataCentre and Multiple Racks
cqlsh> create keyspace if not exists test2 with replication = {'class':'NetworkTopologyStrategy','datacentre':1};

cqlsh> describe keyspaces;

test1   system_schema  system              system_traces
people  system_auth    system_distributed

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

test1  people         system_auth  system_distributed
test2  system_schema  system       system_traces 


// while creating cassandra table, primary key must be included - primary key is mandatory

 cqlsh> create table test1.employee(id int primary key, name text, salary int, dept text);
cqlsh> describe test1.employee;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> insert into test1.employee(id,name,salary,dept) values (101,'siva',3000,'bigdata');
cqlsh> insert into test1.employee(id,name,salary,dept) values (102,'sakthi',3500,'spark');
cqlsh> insert into test1.employee(id,name,salary,dept) values (103,'prakash',3600,'Java');

cqlsh> select * from test1.employee;

 id  | dept    | name    | salary
-----+---------+---------+--------
 102 |   spark |  sakthi |   3500
 101 | bigdata |    siva |   3000
 103 |    Java | prakash |   3600


cqlsh> create table test1.student(id int primary key, name text, course text, age int);
cqlsh> insert into test1.student(id,name,course,age) values (200,'Sanmugh','Spark',25);
cqlsh> insert into test1.student(id,name,age,course) values (201,'David',22,'Cassandra');
cqlsh> insert into test1.student(name,id,age,course) values ('stella',203,33,'Kafka');
cqlsh> insert into test1.student(name,id,age) values ('John',204,22);

cqlsh> describe test1;

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE test1.student (
    id int PRIMARY KEY,
    age int,
    course text,
    name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> select * from test1.student;

 id  | age | course    | name
-----+-----+-----------+---------
 201 |  22 | Cassandra |   David
 204 |  22 |      null |    John
 203 |  33 |     Kafka |  stella
 200 |  25 |     Spark | Sanmugh


cqlsh> insert into test1.student(id) values (202);
cqlsh> select * from test1.student;

 id  | age  | course    | name
-----+------+-----------+---------
 201 |   22 | Cassandra |   David
 204 |   22 |      null |    John
 203 |   33 |     Kafka |  stella
 200 |   25 |     Spark | Sanmugh
 202 | null |      null |    null

Start spark in one more termina:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._


scala> val r1 = sc.cassandraTable("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19

scala> r1.collect.foreach(println)
CassandraRow{id: 102, dept: spark, name: sakthi, salary: 3500}               
CassandraRow{id: 101, dept: bigdata, name: siva, salary: 3000}
CassandraRow{id: 103, dept: Java, name: prakash, salary: 3600}

scala> val r2 = sc.cassandraTable("test1","student")
r2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:19

scala> r2.collect.foreach(println)
CassandraRow{id: 202, age: null, course: null, name: null}                   
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

//Without using Case Class:
// Adding schema to the RDD  just mention the data types only.
scala> val r1 = sc.cassandraTable[(Int,String,String,Int)]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, String, String, Int)] = CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:19

// Now it is Tuple Here
scala> r1.collect.foreach(println)
(102,spark,sakthi,3500)                                                       
(101,bigdata,siva,3000)
(103,Java,prakash,3600)

// Converting RDD into Dataframe
scala> val df1 = r1.toDF("id","dept","name","salary");
df1: org.apache.spark.sql.DataFrame = [id: int, dept: string ... 2 more fields]

scala> df1.show
2019-02-25 12:28:56 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+-------+-------+------+
| id|   dept|   name|salary|
+---+-------+-------+------+
|102|  spark| sakthi| 3500 |
|101|bigdata|   siva| 3000 |
|103|   Java|prakash| 3600 |
+---+-------+-------+------+


//With Using Case Class
scala> case class Emp(id:Int, Dept:String, Name:String, Salary:Int)
defined class Emp

scala> val r1 = sc.cassandraTable[Emp]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Emp] = CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19

// show the records as tuple
scala> r1.collect.foreach(println)
Emp(102,spark,sakthi,3500)                                                   
Emp(101,bigdata,siva,3000)
Emp(103,Java,prakash,3600)

//Making Dataframe from RDD
scala> val df = r1.toDF();
df: org.apache.spark.sql.DataFrame = [id: int, Dept: string ... 2 more fields]

scala> df.show
+---+-------+-------+------+                                                 
| id|   Dept|   Name|Salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+

// Before applying schema
scala> val r = sc.cassandraTable("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[13] at RDD at CassandraRDD.scala:19

scala> r.collect.foreach(println)
2019-02-25 12:38:03 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

// Applying Schema here
scala> val r = sc.cassandraTable[(Int,Int,String,String)]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int, String, String)] = CassandraTableScanRDD[14] at RDD at CassandraRDD.scala:19

// We have null values in our data, so we will get exception here
scala> r.collect.foreach(println)
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column age of test1.student to Int: null


//If Non null value is there in table it will work. If Null value is there, it wont work
//Int cannot bring data from null
//We have a record which has except id all the columns are null

//Here we applied Option[DataType] for necessary column to avoid exception
scala> val r = sc.cassandraTable[(Option[Int],Option[Int],Option[String],Option[String])]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Option[Int], Option[Int], Option[String], Option[String])] = CassandraTableScanRDD[15] at RDD at CassandraRDD.scala:19

// null value will be displayed as None
scala> r.collect.foreach(println)
2019-02-25 12:42:40 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
(Some(202),None,None,None)                                                   
(Some(203),Some(33),Some(Kafka),Some(stella))
(Some(200),Some(25),Some(Spark),Some(Sanmugh))
(Some(201),Some(22),Some(Cassandra),Some(David))
(Some(204),Some(22),None,Some(John))

// converting RDD to Dataframe with column headers
scala> val df = r.toDF("id","age","course","name")
df: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]

//Null values are displayed here
scala> df.show
+---+----+----------+-------+
| id| age|course    |   name|
+---+----+----------+-------+
|202|null|      null|   null|
|203|  33|     Kafka| stella|
|200|  25|     Spark|Sanmugh|
|201|  22| Cassandra|  David|
|204|  22|      null|   John|
+---+----+----------+-------+

// Replacing nulls in course column with Bigdata
scala> val df1 = df.na.fill("Bigdata",Array("course"))
df1: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]


scala> df1.show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|   null|
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+


//We didnt mention column names, so what ever string fields has null will be replaced with 'Bigdata'
scala> df1.na.fill("Bigdata").show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|Bigdata|  // Here Name is Bigdata -- wrong approach
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+

// We didnt specify column names, so whatever numeric fields which has null values will be replaced with 100
scala> df1.na.fill(100).show
2019-02-25 12:52:11 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+---+---------+-------+                                                   
| id|age|   course|   name|
+---+---+---------+-------+
|202|100|  Bigdata|   null| // Here age is 100 - wrong data
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
|204| 22|  Bigdata|   John|
+---+---+---------+-------+


// it will drop which ever rows has null in whatever columns
scala> df.na.drop().show

+---+---+---------+-------+
| id|age|   course|   name|
+---+---+---------+-------+
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
+---+---+---------+-------+


// Here we are going to Export inmemory RDD content into Cassandra


//Here we create some in memory collection objects

Start spark in one more terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;
-----+-----------+---------+--------
 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600

Export Spark RDD into Cassandra Table

// Here we are going to Export inmemory RDD content into Cassandra


//Here we create some in memory collection objects

Start spark in one more terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;
-----+-----------+---------+--------
 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600

Saturday, 23 February 2019

Cassandra and Spark Integration

//start cassandra server
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh>  describe keyspaces;

system_traces  system_schema  system_auth  system  system_distributed


cqlsh> CREATE KEYSPACE people with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use people;
cqlsh:people> describe people;

CREATE TABLE users (
          ... id varchar,
          ... first_name varchar,
          ... last_name varchar,
          ... city varchar,
          ... emails varchar,
          ... PRIMARY KEY (id));


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('101','Sankara','narayanan','PLTR','sa@sa.com');


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('102','Harish','Kalyan','CHN','ha@ka.in');


cqlsh:people> select * from users;

 id  | city | emails    | first_name | last_name
-----+------+-----------+------------+-----------
 102 |  CHN |  ha@ka.in |     Harish |    Kalyan
 101 | PLTR | sa@sa.com |    Sankara | narayanan

cqlsh:people> describe users;

CREATE TABLE people.users (
    id text PRIMARY KEY,
    city text,
    emails text,
    first_name text,
    last_name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


//Get Spark Cassandra Connector from Maven repository:

<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

Make this:
com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
----------------------------------
It is download connector jar from : https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.4.0/spark-cassandra-connector_2.11-2.4.0.jar
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"
----------------------------------



// Run Spark with the above package:
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0



scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._

scala> val conf = new SparkConf().setMaster("local").setAppName("sample cassandra app").set("spark.cassandra.connection.host","localhost").set("spark.driver.allowMultipleContexts","true")

scala> val sc = new SparkContext(conf)
scala> val personRDD = sc.cassandraTable("people","users") // KeySpace, Table name

scala> personRDD.take(2).foreach(println)
CassandraRow{id: 101a, city: PLTR, emails: sa@sa.com, first_name: Sankara, last_name: narayanan}
CassandraRow{id: 102, city: CHN, emails: ha@ka.in, first_name: Harish, last_name: Kalyan}

scala> personRDD.count
res1: Long = 2                                                                 

scala> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).load

scala> df.show
+---+----+---------+----------+---------+                                     
| id|city|   emails|first_name|last_name|
+---+----+---------+----------+---------+
|101|PLTR|sa@sa.com|   Sankara|narayanan|
|102| CHN| ha@ka.in|    Harish|   Kalyan|
+---+----+---------+----------+---------+

Cassandra Installation on Ubuntu Steps

$ sudo apt-get upgrade

$ sudo apt autoremove

$ sudo apt-key adv --keyserver pool.sks-keyservers.net --recv-key A278B781FE4B2BDA

$ sudo apt-get update

// verify Python version
$ python -V
Python 2.7.15+



Installing Casandra:
-----------------------
$ echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
deb http://www.apache.org/dist/cassandra/debian 311x main

$ curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

$ sudo apt-get update

$ sudo apt-get install cassandra

//start cassandra server
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults

hadoop@hadoop:/usr/local$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  103.67 KiB  256          100.0%            a8c39288-3d56-4768-ba56-9910e9ce02e2  rack1

//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.

Friday, 22 February 2019

Spark Streaming with Flume Integration Example

Start IntelliJ in Windows:
------------------------

build.sbt:
----------
name := "SparkSampleProgram"

version := "0.1"

scalaVersion := "2.11.12"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume-assembly" % "2.3.2"


build.properties:
-----------------
sbt.version = 1.0.3


Add scala file : FlumeSpark.sc
--------------------------------
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level
object FlumeSpark {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark=flumeintegeration(pure based approach").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val BatchInterval = 20
    val host = "192.168.0.100"
    val portno = 7777
    val ssc = new StreamingContext(sc, Seconds(BatchInterval))

    //Pulling the data from flume application
    val flumedata = FlumeUtils.createStream(ssc, host, portno)
    val res = flumedata.map { x =>
      val event = x.event
      val messageBody = new String(event.getBody.array())
      messageBody
    }
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}



In Ubuntu Linux:
----------------
// We are going to run flume in Ubuntu
// In windows we are going to run Spark
// Find the windows local Machine's IPv4 Address
Ethernet adapter Ethernet:

   Connection-specific DNS Suffix  . :
   Link-local IPv6 Address . . . . . : fe80::90c1:53dd:fb3d:51d%3
   IPv4 Address. . . . . . . . . . . : 192.168.0.100
   // we are going to mention it in  agent1.sinks.flumesink.hostname


//Ping to verify it within VM where we are going to run flume 
hadoop@hadoop:~/Desktop/vow$ ping 192.168.0.100
PING 192.168.0.100 (192.168.0.100) 56(84) bytes of data.
64 bytes from 192.168.0.100: icmp_seq=1 ttl=128 time=0.371 ms

hadoop@hadoop:~/Desktop/vow$ gedit flumespark1.conf

flumespark1.conf:
-------------------
//creation of components
agent1.sources = flumesource
agent1.channels = flumechannel
agent1.sinks = flumesink

//Source Configuration
agent1.sources.flumesource.type = netcat
agent1.sources.flumesource.bind = localhost
agent1.sources.flumesource.port = 1234
agent1.sources.flumesource.channels = flumechannel

//Channel Configuration
agent1.channels.flumechannel.type = memory
agent1.channels.flumechannel.capacity=1000
agent1.channels.flumechannel.transactionCapacity=100

//Sink Configuration
agent1.sinks.flumesink.type = avro
agent1.sinks.flumesink.hostname = 192.168.0.100
agent1.sinks.flumesink.port = 7777
agent1.sinks.flumesink.channel = flumechannel





Run the flume in Ubuntu:
--------------------------------
$ flume-ng agent --name agent1 --conf /home/hadoop/Desktop/vow --conf-file /home/hadoop/Desktop/vow/flumespark1.conf -Dflume.root.logger=DEBUG,console



// Give sample text on Ubuntu console to feed Flume
curl telnet://localhost:1234
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:1234
i love india
OK
super cool
OK






Run the spark program within IntelliJ IDEA in Windows:
------------------------------------------------------------

Output in IntelliJ console
-------------------------------------------
Time: 1550906600000 ms
-------------------------------------------
i love india // values are coming from flume

-------------------------------------------
Time: 1550906620000 ms
-------------------------------------------
super cool







Flume with Telnet input as source

Before starting Flume with Telnet, make sure you installed telnet.
Or do follow the steps below to install Telnet in Ubuntu.

sudo apt-get -y install telnet
sudo apt-get install xinetd telnetd
sudo /etc/init.d/xinetd restart

sudo apt install net-tools
sudo apt install curl


flume.conf
----------------
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
                           
// Make sure Hadoop daemons are running currently. Else do start-all.sh
//Run Flume
flume-ng agent -n a1 -c /home/hadoop/Desktop/vow -f /home/hadoop/Desktop/vow/flume.conf

2019-02-23 07:21:12,088 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
2019-02-23 07:22:54,114 INFO sink.LoggerSink: Event: { headers:{} body: 69 20 6C 6F 76 65 20 69 6E 64 69 61             i love india }
2019-02-23 07:22:56,003 INFO sink.LoggerSink: Event: { headers:{} body: 77 68 6F 20 69 73 20 62 65 61 75 74 79 3F       who is beauty? }

//Run Telnet
curl telnet://localhost:44444
hadoop@hadoop:~/Desktop/vow$ curl telnet://localhost:44444
i love india
OK
who is beauty?
OK

Wednesday, 20 February 2019

Scala : Try, Success, Failure Examples

// Try..Success..Failure Examples

scala> import scala.util.{Try,Success,Failure}
import scala.util.{Try, Success, Failure}

scala> import scala.io.Source
import scala.io.Source

scala>  def readFile(path:String) : Try[List[String]] = {
     |  Try(Source.fromFile(path).getLines.toList)
     |  }
readFile: (path: String)scala.util.Try[List[String]]

scala> val  validFilePath = "E:\\DataSets\\guns.csv"
invalidFilePath: String = E:\DataSets\gunsnotfound.csv

scala> readFile(validFilePath)
res0: scala.util.Try[List[String]] = Success(List("","year","month","intent","police","sex","age","race","hispanic","place","education", "1",2012,"01"
,"Suicide",0,"M",34,"Asian/Pacific Islander",100,"Home",4, "2",2012,"01","Suicide",0,"F",21,"White",100,"Street",3, "3",2012,"01","Suicide",0,"M",60,"
White",100,"Other specified",4, "4",2012,"02","Suicide",0,"M",64,"White",100,"Home",4, "5",2012,"02","Suicide",0,"M",31,"White",100,"Other specified",
2, "6",2012,"02","Suicide",0,"M",17,"Native American/Native Alaskan",100,"Home",1, "7",2012,"02","Undetermined",0,"M",48,"White",100,"Home",2, "8",201
2,"03","Suicide",0,"M",41,"Native American/Native Alaskan",100,"Home",2, "9",2012,"02","Accidental",0,"M",50,"White",100,"Other specified",3, "10",201
2,"02","Suicide",0,"M",NA,"Black",998,"Home",5,...
scala>

scala> val invalidFilePath = "E:\\DataSets\\gunsnotfound.csv"
invalidFilePath: String = E:\DataSets\gunsnotfound.csv

scala> readFile(invalidFilePath)
res1: scala.util.Try[List[String]] = Failure(java.io.FileNotFoundException: E:\DataSets\gunsnotfound.csv (The system cannot find the file specified))


scala>  readFile(filePath) match {
     | case Success(a) => a.foreach(println)
     | case Failure(b) => println(b.getMessage)
     | }
{"athelete":"Michael Phelps","age":19,"country":"United States","year":"2004","closing":"08-29-04","sport":"Swimming","gold":6,"silver":0,"bronze":2,"
total":8}
{"athelete":"Michael Phelps","age":23,"country":"United States","year":"2008","closing":"08-24-08","sport":"Swimming","gold":8,"silver":0,"bronze":0,"
total":8}



 scala> def parseInt(a:String) : Try[Int] = Try(a.toInt)
parseInt: (a: String)scala.util.Try[Int]

scala> parseInt("2")
res5: scala.util.Try[Int] = Success(2)

scala> parseInt("I Love India")
res6: scala.util.Try[Int] = Failure(java.lang.NumberFormatException: For input string: "I Love India")


scala>  parseInt("2") match {
     |  case Success(a) => println(a)
     |  case Failure(b) => println(b.getMessage)
     |  }
2

scala> parseInt("333").toOption
res10: Option[Int] = Some(333)

scala> parseInt("IloveIndia").toOption
res11: Option[Int] = None

Scala : Either, Left, Right Example

// Either... Left.. Right
scala>  def menu(day:String): Either[Int,String] = {
     |  if (day == "Sunday") Right("Fish Curry")
     |  else if (day == "Monday") Right("Veg Fry")
     |  else Left(0)
     |  }
menu: (day: String)Either[Int,String]

scala> println(menu("Sunday"))
Right(Fish Curry)

scala> println(menu("Monday"))
Right(Veg Fry)

scala> println(menu("Tuesday"))
Left(0)


  menu(input) match {
 case Left(a) => println("Left Answer is : " + a)
 case Right(b) => println("Right Answer is : " + b)
 }
 Right Answer is : Fish Curry

 scala> val c = menu("Monday")
c: Either[Int,String] = Right(Veg Fry)

scala> c.isRight
res3: Boolean = true

scala> c.isLeft
res4: Boolean = false


scala> val c = menu("Monday")
c: Either[Int,String] = Right(Veg Fry)

scala> c.isLeft
res5: Boolean = false
scala>
scala> c.isRight
res6: Boolean = true

scala> val input = "Sunday"
input: String = Sunday

scala> if (menu(input).isLeft) menu(input).left.map( a => println("Answer is : " + a))
res7: Any = ()


scala>  if (menu(input).isRight) menu(input).right.map( b => println("Answer is : " + b))
Answer is : Fish Curry
res1: Any = Right(())



scala> val c = menu("Monday")
c: Either[Int,String] = Right(Veg Fry)

scala> c
res3: Either[Int,String] = Right(Veg Fry)


scala> c.left.toOption
res8: Option[Int] = None

scala> c.right.toOption
res9: Option[String] = Some(Veg Fry)



def divideXByY(x: Int, y: Int): Either[String, Int] = {
      if (y == 0) Left("Dude, can't divide by 0")
      else Right(x / y)
  }
 
  // a few different ways to use Either, Left, and Right
  println(divideXByY(1, 0))
  println(divideXByY(1, 1))
  divideXByY(1, 0) match {
      case Left(s) => println("Answer: " + s)
      case Right(i) => println("Answer: " + i)
  }

}

Scala : Option Some None Example

Examples for Option..Some..None
-----------------------------------

// Simple Example
scala> def menu(day:String): Option[String] = {
     | day match {
     | case "Sunday" => Some("Fish Curry")
     | case "Monday" => Some("Veg Curry")
     | case none => None
     | }
     | }
menu: (day: String)Option[String]

scala> val foodItem = menu("Monday")
foodItem: Option[String] = Some(Veg Curry)


scala> val foodItem = menu("Sunday").getOrElse("General Food")
foodItem: String = Fish Curry

scala> val foodItem = menu("Monday").getOrElse("General Food")
foodItem: String = Veg Curry

scala> val foodItem = menu("Tuesday").getOrElse("General Food")
foodItem: String = General Food





// Regex Pattern Matching Example
scala> import scala.util.matching.Regex
import scala.util.matching.Regex

scala> val np = new Regex("[0-9]+")
np: scala.util.matching.Regex = [0-9]+

scala> val address = "1024, Vana IT Services"
address: String = 1024, Vana IT Services

scala> val match1 = np.findFirstIn(address)
match1: Option[String] = Some(1024)

scala> val address2 = "Pradham Gardenia"
address2: String = Pradham Gardenia

scala> val match2 = np.findFirstIn(address2)
match2: Option[String] = None

scala> val match1 = np.findFirstIn(address).getOrElse("No House Number found")
match1: String = 1024

scala> val match1 = np.findFirstIn(address2).getOrElse("No House Number found")
match1: String = No House Number found





// Extract numbers from a List and calcualte the sum
scala> def toInt(s:String):Option[Int] ={
     | try{
     | Some(s.toInt)
     | }
     | catch {
     | case e: Exception => None
     | }
     | }
toInt: (s: String)Option[Int]

scala> println(toInt("1").getOrElse(0))
1

scala> println(toInt("a").getOrElse(0))
0

scala> val l1 = List("1","2","Lion","4","Tiger")
l1: List[String] = List(1, 2, Lion, 4, Tiger)

scala> val sum = l1.flatMap(toInt).sum
sum: Int = 7

// class example #1
package com.spark.learning

class Person(firstName:String, lastName:String, email:Option[String]){
  var myemail = email
  def displayOutput = {
    println("First Name : " +  firstName)
    println("Last Name : " + lastName)
    println("Email : " + myemail.getOrElse("No Email"))
  }
}

object OptionSomeNoneExa extends App {
  val p = new Person("Super","Star",None)
  p.displayOutput

  p.myemail = Some("test@test.com")
  p.displayOutput
}


Result:
-------
First Name : Super
Last Name : Star
Email : No Email

First Name : Super
Last Name : Star
Email : test@test.com




// class example #2
package com.spark.learning
case class Address(city:String, state:String, country:String)
class UserInfo(emailID:String, phone:String) {
  var firstName = None: Option[String]
  var lastName = None: Option[String]
  var address = None: Option[Address]
}

object Test extends App {
  var u = new UserInfo("test@test.com","9886177375")
  u.firstName = Some("Sankara ")
  u.lastName = Some("Narayanan")
  u.address = Some(Address("Pallathur","TamilNadu","India"))

  println(s"First Name is : ${u.firstName.getOrElse("First Name Not found")}")
  println(s"Last Name is : ${u.lastName.getOrElse("Last Name Not found")}")

  u.address.foreach { x =>
    println("City is : " +  x.city)
    println("State is : " + x.state)
    println("Country is : " + x.country)
  }


    u = new UserInfo("try@test.com","9886177374")

  u.address = Some(Address("Pallathur","","India"))

  println(s" First Name is : ${u.firstName.getOrElse("First Name Not found")}")
  println(s" Last Name is : ${u.lastName.getOrElse("Last Name Not found")}")

  u.address.foreach { x =>
    println("City is : " +  x.city)
    println("State is : " + x.state)
    println("Country is : " + x.country)
  }
}

Result:
--------

First Name is : Sankara
Last Name is : Narayanan
City is : Pallathur
State is : TamilNadu
Country is : India


First Name is : First Name Not found
Last Name is : Last Name Not found
City is : Pallathur
State is :
Country is : India

Tuesday, 19 February 2019

Multi line JSON parser using Spark Dataframe

Input file
tags_sample.json:
-----------------------

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}


scala> import spark.sqlContext.implicits._
import spark.sqlContext.implicits._

scala> val tagsDF = spark.read.option("multiLine", true).option("inferSchema", true).format("json").load("E:\\DataSets\\tags_sample.json")
tagsDF: org.apache.spark.sql.DataFrame = [stackoverflow: array<struct<tag:struct<author:string,frameworks:array<struct<id:bigint,name:string>>,id:bigi
nt,name:string>>>]


scala>

scala> tagsDF.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)

scala> tagsDF.show  // It is not correct
+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+


scala> val df = tagsDF.select(explode($"stackoverflow") as "stags")
df: org.apache.spark.sql.DataFrame = [stags: struct<tag: struct<author: string, frameworks:

scala> df.printSchema
root
 |-- stags: struct (nullable = true)
 |    |-- tag: struct (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- frameworks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)




scala> val df1 = df.select(
     |               $"stags.tag.id" as "id",
     |               $"stags.tag.author" as "author",
     |               $"stags.tag.name" as "tag_name",
     |               $"stags.tag.frameworks.id" as "frameworks_id",
     |               $"stags.tag.frameworks.name" as "frameworks_name"
     |             )
df1: org.apache.spark.sql.DataFrame = [id: bigint, author: string ... 3 more fields]

scala> df1.show
+---+--------------+--------+-------------+--------------------+
| id|        author|tag_name|frameworks_id|     frameworks_name|
+---+--------------+--------+-------------+--------------------+
|  1|Martin Odersky|   scala|       [1, 2]|[Play Framework, ...|
|  2| James Gosling|    java|       [1, 2]|[Apache Tomcat, S...|
+---+--------------+--------+-------------+--------------------+


Sunday, 17 February 2019

Producer and Consumer Programming in Spark Streaming

Kafka_Producer.sc:
------------------


package Kafka_Stream

import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}

object Kafka_Producer {
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"

    //change the topic name if required
    val topic = "kafka-sparkstream-test"
    val messagesPerSec = 1
    val wordsPerMessage = 15
    val random = new Random()
    val names = Seq("Hi", "Hello", "How", "are", "you", "test", "yes", "no", "fine", "great", "ok", "not", "that")

    //ZooKeeper connection properties
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    //send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => random.shuffle(names).head).mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}

build.properties:
-----------------
sbt.version = 1.2.8

build.sbt dependency packages:
--------------------------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"





Kafka_Stream.sc:
---------------
package Kafka_Stream
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.log4j.Logger
import org.apache.log4j.Level

object Kafka_Stream{
  def main(args:Array[String]):Unit={
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf()
    conf.set("spark.master","local[3]")
    conf.set("spark.app.name","KafkaReceiver")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(20))

    val KafkaStream = KafkaUtils.createStream(ssc,"localhost:2181","spark-streaming-consumer-group",Map("kafka-sparkstream-test" -> 5))

    //need to change the topic name and port number accordingly
    val words = KafkaStream.flatMap(x => x._2.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)

   // KafkaStream.print() // prints the Stream of data received
    wordCounts.print() // prints the wordcount result of the streaming


    ssc.start()
    ssc.awaitTermination()
  }
}

Step 1:

//Make sure zookeeper is running:

hadoop@hadoop:/usr/local/kafka$ sh bin/zookeeper-server-start.sh config/zookeer.properties


Step 2:

//Start the kafka server:

hadoop@hadoop:/usr/local/kafka$  sh bin/kafka-server-start.sh config/server.properties



Step 3:
// create kafka topic

hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --create --topic kafka-sparkstream-test --zookeeper localhost:2181 --replication-factor 1 --partitions 3
Created topic "kafka-sparkstream-test".

// list the kafka topics to verify
hadoop@hadoop:/usr/local/kafka/bin$ sh kafka-topics.sh --zookeeper localhost:2181 --list
kafka-sparkstream-test






hadoop@hadoop:/tmp/kafka-logs/kafka-sparkstream-test-0$ ls -l
total 20488
-rw-r--r-- 1 hadoop hadoop 10485760 Feb 17 20:52 00000000000000000000.index
-rw-r--r-- 1 hadoop hadoop     2109 Feb 17 20:56 00000000000000000000.log
-rw-r--r-- 1 hadoop hadoop 10485756 Feb 17 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 hadoop hadoop        8 Feb 17 20:52 leader-epoch-checkpoint


[IJ]sbt:TwitInt> compile


sbt>> package

[IJ]sbt:TwitInt> show discoveredMainClasses
[info] * IntTwit
[info] * Kafka_Stream.Kafka_Producer
[info] * Kafka_Stream.Kafka_Stream
[success] Total time: 2 s, completed 17 Feb, 2019 9:29:23 PM


<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
org.apache.spark:spark-streaming-kafka_2.11:1.6.3


//start an  instance of spark shell
$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Producer.scala

scala> Kafka_Producer.main(null)
ok you test are How you not you that ok great you are How How
How fine How not test yes fine not Hi great great great that fine great
that Hello ok fine yes you yes you not fine that are yes Hello that
not great test you fine are are no that ok not no Hello that test
test ok that no fine that that How not great ok ok ok not Hello


//create new instance of spark-shell
$ spark-shell --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3

scala> :load Kafka_Stream.scala
Loading Kafka_Stream.scala...
Kafka_Stream.scala:1: error: illegal start of definition
package Kafka_Stream
^
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.Logger
import org.apache.log4j.Level
defined object Kafka_Stream

scala> Kafka_Stream.main(null)

// result of wordcount will be displayed here.

spark-submit --class Kafka_Stream.Kafka_Producer /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar

spark-submit --class Kafka_Stream.Kafka_Stream /home/hadoop/Desktop/vow/TwitInt/target/scala-2.11/twitint_2.11-0.1.jar

Friday, 15 February 2019

Kafka Console Producer and Consumer Step by Step

Kafka
-------
ZooKeeper
Broker
Topic
Partition
Replica

Producer

Consumer

[cloudera@quickstart bin]$ su
Password: cloudera

[root@quickstart bin]# pwd
/home/cloudera/kafka_2.11-1.0.0/bin

[root@quickstart kafka_2.11-1.0.0]# cd config

[root@quickstart config]# ls
connect-console-sink.properties    consumer.properties
connect-console-source.properties  log4j.properties
connect-distributed.properties     producer.properties
connect-file-sink.properties       server.properties
connect-file-source.properties     tools-log4j.properties
connect-log4j.properties           zookeeper.properties
connect-standalone.properties


All broker should refer the same port number mentioned in ZooKeeper


Spark Streaming and Kafka Integration steps:
1. start ZooKeeper server
2. Start Kafka Brokers (one or more)
3. Create Topic
4. Start Console Producer (To write Message into Broker's topic)
5. start console consumer (To test)
6. create spark streaming context, which streams from kafka topic
7. perform transfromations or aggregations
8. output operation


[root@quickstart kafka_2.11-1.0.0]# gedit config/zookeeper.properties
change port number : 2181 to 2182
save it and close it

pwd
/home/cloudera/kafka_2.11-1.0.0
[root@quickstart kafka_2.11-1.0.0]#

start zookeeper server:
Step #1: sh bin/zookeeper-server-start.sh config/zookeeper.properties
(at the end : [2018-10-20 01:55:47,072] INFO binding to port 0.0.0.0/0.0.0.0:2182 (org.apache.zookeeper.server.NIOServerCnxnFactory)


[root@quickstart kafka_2.11-1.0.0]# gedit config/server.properties
change zookeeper.connect=localhost:2181 ==> zookeeper.connect=localhost:2182

start Kafka Server:
step #2: # sh bin/kafka-server-start.sh config/server.properties
(at the end : INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

create a kafka topic:
step #3: sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic testtopic
sh bin/kafka-topics.sh --create  --zookeeper quickstart.cloudera:2182 --replication-factor 1 --partitions 3 --topic mytopic
Created topic "mytopic".


to list available topic in kafka broker:
step #4: sh bin/kafka-topics.sh --list --zookeeper localhost:2182
mytopic
testtopic

to write messages into kafka topic: (with the help of kafka console producer)
step #5: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>hello
>how are you
>bye

step #6: sh bin/kafka-console-consumer.sh --zookeeper quickstart.cloudera:2182 --topic mytopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
bye
hello
how are you


Twitter Streaming access via Spark Streaming

import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

object IntTwit{
  def main(args:Array[String]):Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    //create a configuration object
    val conf = new SparkConf()
    conf.set("spark.master","local[2]")
    conf.set("spark.app.name","streamingApp1")

    //creation of spark context object
    val sc = new SparkContext(conf)


         //AWS Credentials
//sc.hadoopConfiguration.set("fs.s3a.access.key","AKIAJ7....")
//sc.hadoopConfiguration.set("fs.s3a.secret.key","Btqy7XO...")

// collect these keys from https://developer.twitter.com
    System.setProperty("twitter4j.oauth.consumerKey","5GtJQ213......")
    System.setProperty("twitter4j.oauth.consumerSecret","1k1phi12f....")
    System.setProperty("twitter4j.oauth.accessToken","19807726-7pxmUfe...")
    System.setProperty("twitter4j.oauth.accessTokenSecret","AmJ0K5gQ4pIxK....")

    val ssc = new StreamingContext(sc,Seconds(10))

    //Receiving tweets from twitter
    val ds1 = TwitterUtils.createStream(ssc,None)
     ds1.print()
    val ds2 = ds1.map (x => x.getText).filter(x => x.contains("RIPBraveHearts"))

    ds2.print()

         //Write gathered tweets into Amazon bucket
//ds2.saveAsTextFiles("s3a://sparksamplebucket/trumptweets")

      //Write gathered tweents into hadoop file system
ds2.saveAsTextFiles("hdfs://localhost:9000/trumptweets")

    ssc.start()
    ssc.awaitTermination()
  }
}



build.sbt:
-------------
name := "TwitInt"

version := "0.1"

scalaVersion := "2.11.12"


// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"




build.properties:
sbt.version = 1.2.8


Download spark-core_2.11-1.5.2.logging.jar from https://raw.githubusercontent.com/swordsmanliu/SparkStreamingHbase/master/lib/spark-core_2.11-1.5.2.logging.jar

Add that .jar file in IntelliJ

Click File from the toolbar
Project Structure
Select Modules at the left panel
Dependencies tab
'+' → JARs or directories
Select spark-core_2.11-1.5.2.logging.jar

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...