
Wednesday, 27 February 2019

MySQL to Cassandra Integration using Spark (DataFrame and RDD)

// Create a MySQL table and insert sample data
hadoop@hadoop:~$ sudo mysql

mysql> create database store;
Query OK, 1 row affected (0.00 sec)

mysql> use store;
Database changed

mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)

mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)

mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)

mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)

mysql> select * from customer;
+-----+-------+-----------+
| id  | name  | city      |
+-----+-------+-----------+
| 100 | Sara  | Karaikudi |
| 101 | Lara  | Manachai  |
| 102 | Kalai | Vadagudi  |
+-----+-------+-----------+
3 rows in set (0.00 sec)


Create a keyspace and table in Cassandra (don't insert any rows yet)
--------------------------------------------------------------------
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};

cqlsh> use store;

cqlsh:store>  CREATE TABLE customer (id int,name varchar, city varchar, primary key (id));


         

// Start the Spark shell with the Cassandra connector package:

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost


val dfcustomer = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","store.customer").
      option("user","hadoop").
      option("password","hadoop").
      load()
 
dfcustomer.show()

+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+
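
// Optional: instead of loading the whole table, a filter can be pushed down to MySQL
// by passing a subquery as "dbtable". This is only a sketch using the same connection
// settings as above; the alias "t" and the city filter are just illustrative.
val dfKaraikudi = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","(select id, name, city from store.customer where city = 'Karaikudi') t").
      option("user","hadoop").
      option("password","hadoop").
      load()

dfKaraikudi.show()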

dfcustomer.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "store")).save()
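
// Quick check from the Spark side (a sketch): read the same Cassandra table back
// through the DataFrame API and display it, mirroring the cqlsh verification below.
val dfCheck = spark.read.format("org.apache.spark.sql.cassandra").
      options(Map("table" -> "customer", "keyspace" -> "store")).
      load()
dfCheck.show()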


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara

// delete all records
cqlsh:store> delete from customer where id in (100,101,102);


cqlsh:store> select * from customer;

 id | city | name
----+------+------


// Build an RDD from the DataFrame, apply a case class schema, and save it to the Cassandra table
scala> case class customer(id:Int, name:String, city:String);
defined class customer

scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28

scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]

scala> val r = r1.map (x => {
     |        val id = x(0).toString.toInt
     |        val name = x(1).toString
     |        val city = x(2).toString
     |        customer(id,name,city)
     |        })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30

scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)
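
// An alternative sketch: skip the manual Row parsing by converting the DataFrame
// straight to a typed Dataset of the case class and taking its RDD
// (spark.implicits._ is already in scope in spark-shell).
import spark.implicits._
val rTyped = dfcustomer.as[customer].rdd
rTyped.collect.foreach(println)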

// here we write the RDD into the Cassandra table (the connector import is required for saveToCassandra)
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> r.saveToCassandra("store","customer");


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara



Sunday, 24 February 2019

Integrating Cassandra with Spark - Import / Export data between Spark and Cassandra

Cassandra's default port number : 9042

// Start Cassandra server
$ sudo service cassandra start

// Verify Cassandra is up
$ netstat -ln | grep 9042
tcp        0      0 127.0.0.1:9042          0.0.0.0:* 

// start the Cassandra Query Language shell (cqlsh)
hadoop@hadoop:~$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.


Cassandra - columnar-storage NoSQL
  - HBase is also columnar
Cassandra and HBase belong to the same family

Hadoop:
master-slave architecture
    NameNode, DataNode
HBase:
built directly on Hadoop
Master: HMaster
Slave: HRegionServer

Cassandra: peer-to-peer architecture
Nodes are logically connected as a ring

Every node can communicate with every other node
There is no master/slave distinction
(the Cassandra server daemon runs on each node)

Hive, MySQL - data is stored in the form of tables
Hive, MySQL, RDBMS: Database (schema) -> Tables -> Rows -> Columns

In Cassandra a schema is called a keyspace:
Keyspace -> Tables

CQL (Cassandra Query Language) is an SQL-like query language.

// show all schemas (databases)
cqlsh> describe keyspaces;

people  system_schema  system_auth  system  system_distributed  system_traces

// SimpleStrategy: single data center and single rack
cqlsh> create schema test1 with replication = {'class':'SimpleStrategy','replication_factor':1};

// NetworkTopologyStrategy: multiple data centers and multiple racks.
// The replication map must name an existing data center (e.g. 'datacenter1' as reported
// by nodetool status); the 'datacentre' key used below does not exist, which is what
// produces the token-map warnings seen later in the Spark output.
cqlsh> create keyspace if not exists test2 with replication = {'class':'NetworkTopologyStrategy','datacentre':1};

cqlsh> describe keyspaces;

test1   system_schema  system              system_traces
people  system_auth    system_distributed

// the test1 keyspace definition:
CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

// keyspaces after test2 has been created:
test1  people         system_auth  system_distributed
test2  system_schema  system       system_traces


// A primary key is mandatory when creating a Cassandra table

 cqlsh> create table test1.employee(id int primary key, name text, salary int, dept text);
cqlsh> describe test1.employee;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> insert into test1.employee(id,name,salary,dept) values (101,'siva',3000,'bigdata');
cqlsh> insert into test1.employee(id,name,salary,dept) values (102,'sakthi',3500,'spark');
cqlsh> insert into test1.employee(id,name,salary,dept) values (103,'prakash',3600,'Java');

cqlsh> select * from test1.employee;

 id  | dept    | name    | salary
-----+---------+---------+--------
 102 |   spark |  sakthi |   3500
 101 | bigdata |    siva |   3000
 103 |    Java | prakash |   3600


cqlsh> create table test1.student(id int primary key, name text, course text, age int);
cqlsh> insert into test1.student(id,name,course,age) values (200,'Sanmugh','Spark',25);
cqlsh> insert into test1.student(id,name,age,course) values (201,'David',22,'Cassandra');
cqlsh> insert into test1.student(name,id,age,course) values ('stella',203,33,'Kafka');
cqlsh> insert into test1.student(name,id,age) values ('John',204,22);

cqlsh> describe test1;

CREATE KEYSPACE test1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE test1.employee (
    id int PRIMARY KEY,
    dept text,
    name text,
    salary int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE test1.student (
    id int PRIMARY KEY,
    age int,
    course text,
    name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh> select * from test1.student;

 id  | age | course    | name
-----+-----+-----------+---------
 201 |  22 | Cassandra |   David
 204 |  22 |      null |    John
 203 |  33 |     Kafka |  stella
 200 |  25 |     Spark | Sanmugh


cqlsh> insert into test1.student(id) values (202);
cqlsh> select * from test1.student;

 id  | age  | course    | name
-----+------+-----------+---------
 201 |   22 | Cassandra |   David
 204 |   22 |      null |    John
 203 |   33 |     Kafka |  stella
 200 |   25 |     Spark | Sanmugh
 202 | null |      null |    null

Start Spark in another terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._


scala> val r1 = sc.cassandraTable("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19

scala> r1.collect.foreach(println)
CassandraRow{id: 102, dept: spark, name: sakthi, salary: 3500}               
CassandraRow{id: 101, dept: bigdata, name: siva, salary: 3000}
CassandraRow{id: 103, dept: Java, name: prakash, salary: 3600}
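
// Individual fields can also be pulled out of a CassandraRow by name; a small sketch
// building (name, salary) pairs with the connector's getString/getInt accessors.
val pairs = r1.map(row => (row.getString("name"), row.getInt("salary")))
pairs.collect.foreach(println)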

scala> val r2 = sc.cassandraTable("test1","student")
r2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:19

scala> r2.collect.foreach(println)
CassandraRow{id: 202, age: null, course: null, name: null}                   
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

// Without using a case class:
// add a schema to the RDD by specifying just the column data types.
scala> val r1 = sc.cassandraTable[(Int,String,String,Int)]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, String, String, Int)] = CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:19

// Now each record is a tuple
scala> r1.collect.foreach(println)
(102,spark,sakthi,3500)                                                       
(101,bigdata,siva,3000)
(103,Java,prakash,3600)

// Converting RDD into Dataframe
scala> val df1 = r1.toDF("id","dept","name","salary");
df1: org.apache.spark.sql.DataFrame = [id: int, dept: string ... 2 more fields]

scala> df1.show
2019-02-25 12:28:56 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+-------+-------+------+
| id|   dept|   name|salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+


// Using a case class
scala> case class Emp(id:Int, Dept:String, Name:String, Salary:Int)
defined class Emp

scala> val r1 = sc.cassandraTable[Emp]("test1","employee")
r1: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Emp] = CassandraTableScanRDD[8] at RDD at CassandraRDD.scala:19

// show the records as case class instances
scala> r1.collect.foreach(println)
Emp(102,spark,sakthi,3500)                                                   
Emp(101,bigdata,siva,3000)
Emp(103,Java,prakash,3600)

//Making Dataframe from RDD
scala> val df = r1.toDF();
df: org.apache.spark.sql.DataFrame = [id: int, Dept: string ... 2 more fields]

scala> df.show
+---+-------+-------+------+                                                 
| id|   Dept|   Name|Salary|
+---+-------+-------+------+
|102|  spark| sakthi|  3500|
|101|bigdata|   siva|  3000|
|103|   Java|prakash|  3600|
+---+-------+-------+------+
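
// Once the data is a DataFrame it can also be queried with Spark SQL; a sketch
// (the view name "emp" is made up for this example).
df.createOrReplaceTempView("emp")
spark.sql("select Dept, avg(Salary) as avg_salary from emp group by Dept").show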

// Before applying schema
scala> val r = sc.cassandraTable("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[13] at RDD at CassandraRDD.scala:19

scala> r.collect.foreach(println)
2019-02-25 12:38:03 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
CassandraRow{id: 202, age: null, course: null, name: null}
CassandraRow{id: 203, age: 33, course: Kafka, name: stella}
CassandraRow{id: 200, age: 25, course: Spark, name: Sanmugh}
CassandraRow{id: 201, age: 22, course: Cassandra, name: David}
CassandraRow{id: 204, age: 22, course: null, name: John}

// Applying Schema here
scala> val r = sc.cassandraTable[(Int,Int,String,String)]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int, String, String)] = CassandraTableScanRDD[14] at RDD at CassandraRDD.scala:19

// We have null values in our data, so we get an exception here
scala> r.collect.foreach(println)
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column age of test1.student to Int: null


// If every value is non-null this works, but a null cannot be converted to Int.
// We have a record (id 202) in which every column except id is null.

// Here we use Option[DataType] for the nullable columns to avoid the exception
scala> val r = sc.cassandraTable[(Option[Int],Option[Int],Option[String],Option[String])]("test1","student")
r: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Option[Int], Option[Int], Option[String], Option[String])] = CassandraTableScanRDD[15] at RDD at CassandraRDD.scala:19

// null value will be displayed as None
scala> r.collect.foreach(println)
2019-02-25 12:42:40 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
(Some(202),None,None,None)                                                   
(Some(203),Some(33),Some(Kafka),Some(stella))
(Some(200),Some(25),Some(Spark),Some(Sanmugh))
(Some(201),Some(22),Some(Cassandra),Some(David))
(Some(204),Some(22),None,Some(John))
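
// The same idea works with a case class instead of a tuple; a sketch in which the
// nullable columns are declared as Option fields (the class name Student is illustrative).
case class Student(id: Int, age: Option[Int], course: Option[String], name: Option[String])
val rs = sc.cassandraTable[Student]("test1","student")
rs.collect.foreach(println)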

// converting RDD to Dataframe with column headers
scala> val df = r.toDF("id","age","course","name")
df: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]

//Null values are displayed here
scala> df.show
+---+----+----------+-------+
| id| age|    course|   name|
+---+----+----------+-------+
|202|null|      null|   null|
|203|  33|     Kafka| stella|
|200|  25|     Spark|Sanmugh|
|201|  22| Cassandra|  David|
|204|  22|      null|   John|
+---+----+----------+-------+

// Replacing nulls in course column with Bigdata
scala> val df1 = df.na.fill("Bigdata",Array("course"))
df1: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]


scala> df1.show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|   null|
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+


// We didn't specify column names, so every string column that has nulls is filled with 'Bigdata'
scala> df1.na.fill("Bigdata").show
+---+----+---------+-------+
| id| age|   course|   name|
+---+----+---------+-------+
|202|null|  Bigdata|Bigdata|  // Here Name is Bigdata -- wrong approach
|203|  33|    Kafka| stella|
|200|  25|    Spark|Sanmugh|
|201|  22|Cassandra|  David|
|204|  22|  Bigdata|   John|
+---+----+---------+-------+

// We didn't specify column names, so every numeric column that has nulls is filled with 100
scala> df1.na.fill(100).show
2019-02-25 12:52:11 WARN  ReplicationStrategy$NetworkTopologyStrategy:200 - Error while computing token map for keyspace test2 with datacenter datacentre: could not achieve replication factor 1 (found 0 replicas only), check your keyspace replication settings.
+---+---+---------+-------+                                                   
| id|age|   course|   name|
+---+---+---------+-------+
|202|100|  Bigdata|   null| // Here age is 100 - wrong data
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
|204| 22|  Bigdata|   John|
+---+---+---------+-------+
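
// A safer sketch: fill several columns at once with per-column defaults via a Map,
// which avoids touching every string or numeric column (the defaults here are made up).
val dfFilled = df.na.fill(Map("course" -> "Bigdata", "name" -> "Unknown", "age" -> 0))
dfFilled.show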


// drop every row that has a null in any column
scala> df.na.drop().show

+---+---+---------+-------+
| id|age|   course|   name|
+---+---+---------+-------+
|203| 33|    Kafka| stella|
|200| 25|    Spark|Sanmugh|
|201| 22|Cassandra|  David|
+---+---+---------+-------+
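
// The drop can also be limited to particular columns; a sketch that removes only
// the rows whose course is null.
df.na.drop(Seq("course")).show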


// Here we export in-memory RDD content into Cassandra


// First we create some in-memory objects

Start Spark in another terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;

 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600
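
// saveToCassandra also accepts an explicit column list via SomeColumns; a sketch
// (the names must match the Cassandra table, and writes are upserts, so re-running
// simply overwrites the same rows).
r1.saveToCassandra("test1","employee",SomeColumns("id","dept","name","salary"))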


Saturday, 23 February 2019

Cassandra and Spark Integration

//start cassandra server
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh>  describe keyspaces;

system_traces  system_schema  system_auth  system  system_distributed


cqlsh> CREATE KEYSPACE people with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use people;
cqlsh:people> describe people;

cqlsh:people> CREATE TABLE users (
          ... id varchar,
          ... first_name varchar,
          ... last_name varchar,
          ... city varchar,
          ... emails varchar,
          ... PRIMARY KEY (id));


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('101','Sankara','narayanan','PLTR','sa@sa.com');


cqlsh:people> insert into users(id,first_name,last_name,city,emails) values ('102','Harish','Kalyan','CHN','ha@ka.in');


cqlsh:people> select * from users;

 id  | city | emails    | first_name | last_name
-----+------+-----------+------------+-----------
 102 |  CHN |  ha@ka.in |     Harish |    Kalyan
 101 | PLTR | sa@sa.com |    Sankara | narayanan

cqlsh:people> describe users;

CREATE TABLE people.users (
    id text PRIMARY KEY,
    city text,
    emails text,
    first_name text,
    last_name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


//Get Spark Cassandra Connector from Maven repository:

<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

Use this Maven coordinate with spark-shell --packages:
com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
----------------------------------
Spark will download the connector jar from: https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.11/2.4.0/spark-cassandra-connector_2.11-2.4.0.jar
// For an sbt build (see https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector):
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"
----------------------------------



// Run Spark with the above package:
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0



scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._

scala> val conf = new SparkConf().setMaster("local").setAppName("sample cassandra app").set("spark.cassandra.connection.host","localhost").set("spark.driver.allowMultipleContexts","true")

scala> val sc = new SparkContext(conf)
scala> val personRDD = sc.cassandraTable("people","users") // KeySpace, Table name

scala> personRDD.take(2).foreach(println)
CassandraRow{id: 101, city: PLTR, emails: sa@sa.com, first_name: Sankara, last_name: narayanan}
CassandraRow{id: 102, city: CHN, emails: ha@ka.in, first_name: Harish, last_name: Kalyan}

scala> personRDD.count
res1: Long = 2                                                                 
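
// The connector can push a CQL predicate down to Cassandra with where(); a sketch
// restricting the scan to a single partition key value.
val one = personRDD.where("id = ?", "101")
one.collect.foreach(println)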

scala> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "people", "table" -> "users")).load

scala> df.show
+---+----+---------+----------+---------+                                     
| id|city|   emails|first_name|last_name|
+---+----+---------+----------+---------+
|101|PLTR|sa@sa.com|   Sankara|narayanan|
|102| CHN| ha@ka.in|    Harish|   Kalyan|
+---+----+---------+----------+---------+
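
// A small DataFrame sketch on the same data: filter and project columns in Spark.
df.filter(df("city") === "CHN").select("id","first_name","emails").show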

Cassandra Installation on Ubuntu Steps

$ sudo apt-get upgrade

$ sudo apt autoremove

$ sudo apt-key adv --keyserver pool.sks-keyservers.net --recv-key A278B781FE4B2BDA

$ sudo apt-get update

// verify Python version
$ python -V
Python 2.7.15+



Installing Cassandra:
-----------------------
$ echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
deb http://www.apache.org/dist/cassandra/debian 311x main

$ curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

$ sudo apt-get update

$ sudo apt-get install cassandra

//start cassandra server
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults

hadoop@hadoop:/usr/local$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  103.67 KiB  256          100.0%            a8c39288-3d56-4768-ba56-9910e9ce02e2  rack1

//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
