Saturday, 8 August 2020
SQOOP Export with Staging table Example - HDFS to MySQL data export using SQOOP with staging-table
mysql> use retail_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories |
| customers |
| departments |
| order_items |
| orders |
| products |
+---------------------+
6 rows in set (0.00 sec)
mysql> describe orders;
+-------------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+----------------+
| order_id | int(11) | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| order_customer_id | int(11) | NO | | NULL | |
| order_status | varchar(45) | NO | | NULL | |
+-------------------+-------------+------+-----+---------+----------------+
// create a staging table with the same structure as the orders table
mysql> create table retail_db.orders_staging as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.10 sec)
Records: 0 Duplicates: 0 Warnings: 0
// create a new table, orders_exported, which has the same structure as the retail_db.orders table
mysql> create table retail_db.orders_exported as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0
// verify the structure of the newly created orders_staging table
mysql> describe orders_staging;
+-------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+-------+
| order_id | int(11) | NO | | 0 | |
| order_date | datetime | NO | | NULL | |
| order_customer_id | int(11) | NO | | NULL | |
| order_status | varchar(45) | NO | | NULL | |
+-------------------+-------------+------+-----+---------+-------+
4 rows in set (0.00 sec)
// I already have the orders data in HDFS
$ hdfs dfs -ls /user/cloudera/orders
Found 5 items
-rw-r--r-- 1 cloudera cloudera 0 2020-08-06 05:52 /user/cloudera/orders/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 741614 2020-08-06 05:52 /user/cloudera/orders/part-m-00000
-rw-r--r-- 1 cloudera cloudera 753022 2020-08-06 05:52 /user/cloudera/orders/part-m-00001
-rw-r--r-- 1 cloudera cloudera 752368 2020-08-06 05:52 /user/cloudera/orders/part-m-00002
-rw-r--r-- 1 cloudera cloudera 752940 2020-08-06 05:52 /user/cloudera/orders/part-m-00003
// Export HDFS data into the MySQL table using the staging-table approach
sqoop export \
--connect "jdbc:mysql://localhost/retail_db" \
--username root \
--password cloudera \
--table orders_exported \
--export-dir hdfs://localhost:8020/user/cloudera/orders/ \
--staging-table orders_staging \
--clear-staging-table \
--input-fields-terminated-by ','
20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Exported 68883 records.
20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Starting to migrate data from staging table to destination.
20/08/08 00:24:45 INFO manager.SqlManager: Migrated 68883 records from `orders_staging` to `orders_exported`
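The migrate step shown in the log is what makes the staging-table approach safer than a direct export: the MapReduce job writes only into orders_staging, and only after every mapper has finished does Sqoop copy the staged rows into the real target in one bulk step, so orders_exported never holds a partial export. Conceptually the migration is roughly equivalent to the following (an illustrative sketch, not the exact statements Sqoop issues):

INSERT INTO retail_db.orders_exported SELECT * FROM retail_db.orders_staging;
DELETE FROM retail_db.orders_staging;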
mysql> select * from retail_db.orders_exported limit 5;
+----------+---------------------+-------------------+-----------------+
| order_id | order_date | order_customer_id | order_status |
+----------+---------------------+-------------------+-----------------+
| 1 | 2013-07-25 00:00:00 | 11599 | CLOSED |
| 2 | 2013-07-25 00:00:00 | 256 | PENDING_PAYMENT |
| 3 | 2013-07-25 00:00:00 | 12111 | COMPLETE |
| 4 | 2013-07-25 00:00:00 | 8827 | CLOSED |
| 5 | 2013-07-25 00:00:00 | 11318 | COMPLETE |
+----------+---------------------+-------------------+-----------------+
5 rows in set (0.00 sec)
mysql> select count(1) from retail_db.orders_exported limit 5;
+----------+
| count(1) |
+----------+
| 68883 |
+----------+
1 row in set (0.01 sec)
Line count in HDFS:
$ hdfs dfs -cat hdfs://localhost:8020/user/cloudera/orders/* | wc -l
68883
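To compare the two counts in a single step, something like the following works; a minimal sketch, assuming the mysql client is on the same host and the root/cloudera credentials used above (the shell variable names are just for illustration):

$ HDFS_COUNT=$(hdfs dfs -cat /user/cloudera/orders/part-m-* | wc -l)
$ MYSQL_COUNT=$(mysql -uroot -pcloudera -N -B -e "select count(*) from retail_db.orders_exported")
$ echo "HDFS: $HDFS_COUNT  MySQL: $MYSQL_COUNT"

Both values should come back as 68883, matching the counts shown above.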
Friday, 7 August 2020
Create, Execute, Delete SQOOP Jobs
// I have an ohm.person table in MySQL with the following records
mysql> select * from ohm.person;
+----+-----------------+---------------------+
| id | name | last_mod_dt |
+----+-----------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul | 2020-08-07 01:41:20 |
| 13 | Ravindran | 2020-08-07 01:41:35 |
+----+-----------------+---------------------+
// Create a new Sqoop Job:
$ sqoop job \
--create person_inc_job \
-- import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental append \
--last-value '01:17:17' \
--target-dir /user/cloudera/person_test \
-m 1
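Once created, the stored job definition can be inspected with --show; this is also where Sqoop keeps the incremental state, and the saved last value is updated after every successful --exec:

$ sqoop job --show person_inc_job

Among the printed properties you should see the connect string, the check column and an incremental.last.value entry (the exact property names can differ slightly between Sqoop versions).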
// Display all job names already created
$ sqoop job --list
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
20/08/07 02:05:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Available jobs:
person_inc_job
// delete the target folder if it exists
$ hdfs dfs -rm -r /user/cloudera/person_test
rm: `/user/cloudera/person_test': No such file or directory
// run the sqoop job for the first time
$ sqoop job --exec person_inc_job
Retrieved 13 records.
$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0
// Now add a few more new records in MySQL
mysql> insert into ohm.person(name) values ('Anbu');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Sudha');
Query OK, 1 row affected (0.01 sec)
mysql> insert into ohm.person(name) values ('Aish');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Vijay');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Balaji');
Query OK, 1 row affected (0.01 sec)
mysql> select * from ohm.person; -- MySQL console
+----+-----------------+---------------------+
| id | name | last_mod_dt |
+----+-----------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul | 2020-08-07 01:41:20 |
| 13 | Ravindran | 2020-08-07 01:41:35 | -- last value
| 14 | Anbu | 2020-08-07 02:23:25 | -- newly added records
| 15 | Sudha | 2020-08-07 02:23:29 |
| 16 | Aish | 2020-08-07 02:23:36 |
| 17 | Vijay | 2020-08-07 02:23:44 |
| 18 | Balaji | 2020-08-07 02:23:47 |
+----+-----------------+---------------------+
18 rows in set (0.00 sec)
// run the sqoop job again to fetch incremental records
$ sqoop job --exec person_inc_job
Retrieved 5 records.
// Display the imported data available in HDFS
$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0 -- loaded by the first (full) run
14,Anbu,2020-08-07 02:23:25.0 -- incremental records
15,Sudha,2020-08-07 02:23:29.0
16,Aish,2020-08-07 02:23:36.0
17,Vijay,2020-08-07 02:23:44.0
18,Balaji,2020-08-07 02:23:47.0
// To delete the sqoop job:
$ sqoop job --delete person_inc_job
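A practical note on passwords: by default the Sqoop metastore does not record the password, so each --exec will prompt for it. One alternative, sketched here with an assumed HDFS path (the file must contain only the password and should be readable only by you), is to reference a password file when creating the job:

$ sqoop job \
--create person_inc_job2 \
-- import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password-file /user/cloudera/.mysql-password \
--check-column last_mod_dt \
--incremental append \
--last-value '01:17:17' \
--target-dir /user/cloudera/person_test \
-m 1

person_inc_job2 is just a hypothetical name to avoid clashing with the job created earlier.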
Monday, 3 August 2020
SQOOP Export Example : Hive to MySQL using SQOOP Export
Incremental append : MySQL to HDFS
List Databases, Tables and Display the Structure of a table using SQOOP
Import data from MySQL to Hive with the help of SQOOP.
Friday, 31 July 2020
Import a table from MySQL to Hbase using SQOOP Import
Import a table from MySQL to Hive using SQOOP Import
Sunday, 24 May 2020
MySQL, Derby Metastore configuration for Hive
Tuesday, 19 May 2020
StructField, StructType Example in PySpark. Create Dataframe using in memory object in PySpark
Read Json and Write into MySQL table using PySpark
Saturday, 16 May 2020
PySpark with MySQL Connection - Example
+----------+---------+--------+---------+
|rollnumber|firstname|lastname|     city|
+----------+---------+--------+---------+
|         3|   Rajini|   Kanth|Bangalore|
+----------+---------+--------+---------+
In Ubuntu, start PySpark with the --jars option:
$ pyspark --jars /home/hadoop/apache-hive-3.1.2-bin/lib/mysql-connector-java-8.0.20.jar

from pyspark.sql import SparkSession

sparkdriver = SparkSession.builder.master("local").appName("demoApp").\
    config("spark.jars.packages","mysql:mysql-connector-java-8.0.20").\
    getOrCreate()

df = sparkdriver.read.format("jdbc").\
    option("url","jdbc:mysql://localhost:3306").\
    option("driver","com.mysql.jdbc.Driver").\
    option("user","hadoop").\
    option("password","p@ssw0rd").\
    option("query","select * from school.student").\
    load()

df.show()
+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Raja|Pallathur|
|101|Silva|Singapore|
+---+-----+---------+
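The same JDBC options work in the write direction too; a minimal hedged sketch of appending the DataFrame to another MySQL table (school.student_copy is a hypothetical target table name):

df.write.format("jdbc").\
    option("url","jdbc:mysql://localhost:3306").\
    option("driver","com.mysql.jdbc.Driver").\
    option("user","hadoop").\
    option("password","p@ssw0rd").\
    option("dbtable","school.student_copy").\
    mode("append").\
    save()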
Monday, 11 May 2020
Python with MySQL database programs
Saturday, 16 March 2019
Json Input to Kafka Broker to Spark Streaming to MySQL using KafkaProducer, KafkaUtils.createStream
// Here we read a json input file from the file system and publish it to a Kafka broker
// Spark Streaming then fetches the messages from the Kafka broker and writes them into a MySQL table
input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}
start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties
// create a topic named jsonTopic
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182
//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic
// create a database and table in MySQL:
$ mysql -uroot -pcloudera
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)
Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)
mysql> use KafkaDB;
Database changed
// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)
// The producer reads the json file and publishes each line to the Kafka topic
Producer Programming in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}
object JsonProducer {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("client.id", "ProducerApp")
    props.put("retries", "4")
    props.put("batch.size", "32768")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "jsonTopic"
    val producer = new KafkaProducer[String, String](props)
    // read the json file and publish each line as a separate message
    val file = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")
    for (line <- file.getLines()) {
      val msg = new ProducerRecord[String, String](topic, line)
      producer.send(msg)
    }
    file.close()
    producer.close()
  }
}
[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)
// verify the published messages with a console consumer
hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning
{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},
Kafka2MySQLStreaming.scala:
-----------------------------
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object Kafka2MySQLStreaming {
  def main(args: Array[String]): Unit = {
    // keep the console output readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)
    val batchInterval = 20                        // batch interval in seconds
    val zk = "localhost:2182"                     // zookeeper quorum
    val consumerGroupID = "jsonGroup"
    val topic = "jsonTopic"
    val partition = 1                             // consumer threads for the topic
    val perTopicPartitions = Map(topic -> partition)
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    // receiver-based stream of (key, message) pairs; keep only the message value
    val kafkaData = KafkaUtils.createStream(ssc, zk, consumerGroupID, perTopicPartitions)
    val ks = kafkaData.map(x => x._2)
    ks.foreachRDD { x =>
      if (!x.isEmpty()) {                         // skip empty micro-batches
        val df = sqlc.read.json(x)
        val props = new Properties()
        props.put("driver", "com.mysql.jdbc.Driver")
        props.put("user", "root")
        props.put("password", "cloudera")
        df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB", "jsonTable", props)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
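Running this from spark-shell assumes that org.apache.spark.streaming.kafka.KafkaUtils and the MySQL JDBC driver are already on the classpath (which is evidently the case in the session below, since the shell is started without extra flags). On a plain Spark install you would normally pull them in explicitly when starting the shell; a hedged sketch (the artifact version and jar path are assumptions, match them to your Spark and Scala versions):

$ spark-shell \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
  --jars /usr/share/java/mysql-connector-java.jar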
[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)
// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id | name | city | country | Skills |
+------+------------+--------------------------+-----------------------+-------------------------------+
| 1 | Sharline | Uppsala | Sweden | Eyelash Extensions |
| 2 | Marris | São Domingos | Cape Verde | VMI Programs |
| 3 | Gregg | Qaxbaş | Azerbaijan | Historical Research |
| 4 | Christye | Guarapari | Brazil | Army |
| 5 | Modesta | Paltamo | Finland | Avaya Technologies |
Wednesday, 27 February 2019
MySQL to RDD or Dataframe to Cassandra Integration
hadoop@hadoop:~$ sudo mysql
mysql> create database store;
Query OK, 1 row affected (0.00 sec)
mysql> use store;
Database changed
mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)
mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)
mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)
mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)
mysql> select * from customer;
+-----+-------+-----------+
| id | name | city |
+-----+-------+-----------+
| 100 | Sara | Karaikudi |
| 101 | Lara | Manachai |
| 102 | Kalai | Vadagudi |
+-----+-------+-----------+
3 rows in set (0.00 sec)
Create a keyspace and table in Cassandra (don't insert any rows yet)
---------------------------------------------------------------------
$ sudo service cassandra start
$ sudo update-rc.d cassandra defaults
//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> describe cluster;
Cluster: Test Cluster
Partitioner: Murmur3Partitioner
cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};
cqlsh> use store;
cqlsh:store> CREATE TABLE customer (id int,name varchar, city varchar, primary key (id));
// Start the Spark shell with the Cassandra connector package:
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost
val dfcustomer = spark.read.format("jdbc").
option("driver","com.mysql.jdbc.Driver").
option("url","jdbc:mysql://localhost:3306").
option("dbtable","store.customer").
option("user","hadoop").
option("password","hadoop").
load()
dfcustomer.show()
+---+-----+---------+
| id| name| city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+
dfcustomer.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "store")).save()
// Here we verify the Cassandra data:
cqlsh:store> select * from customer;
id | city | name
-----+-----------+-------
100 | Karaikudi | Sara
102 | Vadagudi | Kalai
101 | Manachai | Lara
// delete all records
cqlsh:store> delete from customer where id in (100,101,102);
cqlsh:store> select * from customer;
id | city | name
----+------+------
// Here we convert the DataFrame to an RDD, apply the case class schema, and save the RDD to the Cassandra table
scala> case class customer(id:Int, name:String, city:String);
defined class customer
scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28
scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]
scala> val r = r1.map (x => {
| val id = x(0).toString.toInt
| val name = x(1).toString
| val city = x(2).toString
| customer(id,name,city)
| })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30
scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)
// here we write the RDD into the Cassandra table (saveToCassandra is added by the connector's implicits)
scala> import com.datastax.spark.connector._
scala> r.saveToCassandra("store","customer")
// Here we verify the Cassandra data:
cqlsh:store> select * from customer;
id | city | name
-----+-----------+-------
100 | Karaikudi | Sara
102 | Vadagudi | Kalai
101 | Manachai | Lara
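You can also verify from the Spark side by reading the table back through the same connector; a small sketch in the same spark-shell session:

val dfBack = spark.read.format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "customer", "keyspace" -> "store")).
  load()
dfBack.show()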