Wednesday, 27 February 2019

MySQL to Cassandra Integration via Spark DataFrame or RDD

// MySQL Table creation with sample data insertion
hadoop@hadoop:~$ sudo mysql

mysql> create database store;
Query OK, 1 row affected (0.00 sec)

mysql> use store;
Database changed

mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)

mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)

mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)

mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)

mysql> select * from customer;
+-----+-------+-----------+
| id  | name  | city      |
+-----+-------+-----------+
| 100 | Sara  | Karaikudi |
| 101 | Lara  | Manachai  |
| 102 | Kalai | Vadagudi  |
+-----+-------+-----------+
3 rows in set (0.00 sec)
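
// Note: the Spark JDBC read further down connects as MySQL user 'hadoop' with
// password 'hadoop'. If that account doesn't exist yet, a minimal sketch of
// creating it (the names simply mirror the values used later in this post):
mysql> create user 'hadoop'@'localhost' identified by 'hadoop';
mysql> grant all privileges on store.* to 'hadoop'@'localhost';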


Create Keyspace and Table in Cassandra (don't insert any rows yet)
-------------------------------------------------------------------
// start the Cassandra service and register it to start at boot
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


// start cqlsh, the CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};

cqlsh> use store;

cqlsh:store> CREATE TABLE customer (id int, name varchar, city varchar, primary key (id));
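
// To confirm the schema, describe the table (output abbreviated; note that
// Cassandra reports varchar columns as text):
cqlsh:store> describe table customer;

CREATE TABLE store.customer (
    id int PRIMARY KEY,
    city text,
    name text
) ...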


         

// Start the Spark shell with the Cassandra connector and the MySQL JDBC driver
// (the JDBC read below needs the MySQL connector on the classpath)

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0,mysql:mysql-connector-java:5.1.47 --conf spark.cassandra.connection.host=localhost
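
// If the MySQL connector jar is already on disk, passing it with --jars works
// just as well (the path below is only a placeholder):
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --jars /path/to/mysql-connector-java.jar --conf spark.cassandra.connection.host=localhost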


val dfcustomer = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","store.customer").
      option("user","hadoop").
      option("password","hadoop").
      load()
 
dfcustomer.show()

+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+

// append mode is needed here because the target table already exists in Cassandra
dfcustomer.write.format("org.apache.spark.sql.cassandra").mode("append").options(Map("table" -> "customer", "keyspace" -> "store")).save()
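
// Before switching to cqlsh, we can also verify the write from the Spark shell
// by reading the table back through the same connector (a quick sanity check;
// the output should match dfcustomer.show() above):
val dfcheck = spark.read.format("org.apache.spark.sql.cassandra").
      options(Map("table" -> "customer", "keyspace" -> "store")).
      load()

dfcheck.show()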


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara

// delete all records
cqlsh:store> delete from customer where id in (100,101,102);
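
// Listing every key works for three rows; to empty the table regardless of
// its contents, truncate does the same job in one statement:
cqlsh:store> truncate customer;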


cqlsh:store> select * from customer;

 id | city | name
----+------+------


// Here we turn the DataFrame into an RDD of case class objects and save it (RDD + case class schema => Cassandra table)
scala> case class customer(id:Int, name:String, city:String);
defined class customer

scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28

scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]

scala> val r = r1.map (x => {
     |        val id = x(0).toString.toInt
     |        val name = x(1).toString
     |        val city = x(2).toString
     |        customer(id,name,city)
     |        })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30

scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)
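
// A side note on the map above: round-tripping every field through toString
// works, but Row also has typed getters, so the same conversion can be written
// as (assuming the column order id, name, city from the JDBC read):
scala> val r2 = r1.map(x => customer(x.getInt(0), x.getString(1), x.getString(2)))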

// here we write the RDD into the Cassandra table;
// saveToCassandra comes from the connector's implicits, so import them first
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> r.saveToCassandra("store","customer")
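
// The same connector import also adds cassandraTable to the SparkContext, so
// the table can be read back as an RDD for a quick check (row order in
// Cassandra is not insertion order):
scala> val fromCassandra = sc.cassandraTable("store","customer")

scala> fromCassandra.collect.foreach(println)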


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara


