// MySQL: create a table and insert sample data
hadoop@hadoop:~$ sudo mysql
mysql> create database store;
Query OK, 1 row affected (0.00 sec)
mysql> use store;
Database changed
mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)
mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)
mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)
mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)
mysql> select * from customer;
+-----+-------+-----------+
| id  | name  | city      |
+-----+-------+-----------+
| 100 | Sara  | Karaikudi |
| 101 | Lara  | Manachai  |
| 102 | Kalai | Vadagudi  |
+-----+-------+-----------+
3 rows in set (0.00 sec)
Create a keyspace and table in Cassandra (don't insert any rows yet)
--------------------------------------------------------------------
// start Cassandra and enable it at boot
$ sudo service cassandra start
$ sudo update-rc.d cassandra defaults
// start cqlsh, the Cassandra shell
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> describe cluster;
Cluster: Test Cluster
Partitioner: Murmur3Partitioner
cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use store;
cqlsh:store> CREATE TABLE customer (id int,name varchar, city varchar, primary key (id));
// Spark shell: the Cassandra connector is pulled via --packages; the MySQL JDBC driver must also be on the classpath, added here via its Maven coordinate
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0,mysql:mysql-connector-java:5.1.47 --conf spark.cassandra.connection.host=localhost
// read the MySQL table over JDBC; dbtable is schema-qualified, so the URL needs no database name
val dfcustomer = spark.read.format("jdbc").
  option("driver","com.mysql.jdbc.Driver").
  option("url","jdbc:mysql://localhost:3306").
  option("dbtable","store.customer").
  option("user","hadoop").
  option("password","hadoop").
  load()
dfcustomer.show()
+---+-----+---------+
| id| name| city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+
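// Optional sanity check (not part of the original session): the JDBC-inferred schema should line up with the CQL column types
dfcustomer.printSchema()   // expect id: integer, name: string, city: string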
dfcustomer.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "store")).save()
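The write above succeeds because the target table is empty; rerunning it against a table that already holds rows can fail under the default SaveMode (ErrorIfExists). A rerun-safe sketch, assuming the same session:
dfcustomer.write.format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "customer", "keyspace" -> "store")).
  mode("append").   // append instead of the default ErrorIfExists
  save()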
// Here we verify the Cassandra data:
cqlsh:store> select * from customer;
 id  | city      | name
-----+-----------+-------
 100 | Karaikudi | Sara
 102 | Vadagudi  | Kalai
 101 | Manachai  | Lara
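// Optional: the same check from the Spark side instead of cqlsh, via the connector's read path (dfback is just an illustrative name)
val dfback = spark.read.format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "customer", "keyspace" -> "store")).
  load()
dfback.show()   // should list the same three rows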
// delete all rows (a Cassandra DELETE must name the primary key, hence the IN list of ids)
cqlsh:store> delete from customer where id in (100,101,102);
cqlsh:store> select * from customer;
id | city | name
----+------+------
// Build an RDD from the DataFrame, map each Row onto a case class, and save the typed RDD to the Cassandra table
scala> case class customer(id:Int, name:String, city:String);
defined class customer
scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28
scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]
scala> val r = r1.map (x => {
| val id = x(0).toString.toInt
| val name = x(1).toString
| val city = x(2).toString
| customer(id,name,city)
| })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30
scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)
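// An alternative to indexing into each Row by position is Spark's typed Dataset API; a sketch assuming the same session, with the session implicits in scope
import spark.implicits._
val ds = dfcustomer.as[customer]   // Dataset[customer]; columns are matched to fields by name
ds.collect.foreach(println)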
// Write the RDD into the Cassandra table; saveToCassandra is an implicit added by the connector import
scala> import com.datastax.spark.connector._
scala> r.saveToCassandra("store","customer")
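// The connector can also read the table straight back into the case class; a sketch reusing the import above
val fromCassandra = sc.cassandraTable[customer]("store","customer")
fromCassandra.collect.foreach(println)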
// Here we verify the Cassandra data:
cqlsh:store> select * from customer;
 id  | city      | name
-----+-----------+-------
 100 | Karaikudi | Sara
 102 | Vadagudi  | Kalai
 101 | Manachai  | Lara