Sunday, 24 February 2019

Export Spark RDD into Cassandra Table

// Here we are going to Export inmemory RDD content into Cassandra


//Here we create some in memory collection objects

Start spark in one more terminal:
----------------------------------
$  spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Emp(id:Int, Dept:String,Name:String, Salary:Int)
defined class Emp

scala> val ob1 = new Emp(121,"accounts","Hari",5000);
ob1: Emp = Emp(121,accounts,Hari,5000)

scala> val ob2 = new Emp(122,"HR","Rani",6000);
ob2: Emp = Emp(122,HR,Rani,6000)

scala> val ob3 = new Emp(123,"Marketing","Suresh",6500);
ob3: Emp = Emp(123,Marketing,Suresh,6500)

scala> val r1 = sc.makeRDD(Seq(ob1,ob2,ob3));
r1: org.apache.spark.rdd.RDD[Emp] = ParallelCollectionRDD[40] at makeRDD at <console>:33

scala> r1.collect.foreach(println);
Emp(121,accounts,Hari,5000)
Emp(122,HR,Rani,6000)
Emp(123,Marketing,Suresh,6500)

// Export RDD content into Cassandra
scala> r1.saveToCassandra("test1","employee");

// verify the newly inserted rows in Cassandra
cqlsh> select * from test1.employee;
-----+-----------+---------+--------
 id  | dept      | name    | salary
-----+-----------+---------+--------
 123 | Marketing |  Suresh |   6500 // newly inserted from spark to Cassandra
 122 |        HR |    Rani |   6000 // newly inserted from spark to Cassandra
 121 |  accounts |    Hari |   5000 // newly inserted from spark to Cassandra
 102 |     spark |  sakthi |   3500
 101 |   bigdata |    siva |   3000
 103 |      Java | prakash |   3600

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...