Saturday, 8 August 2020
SQOOP Export with Staging table Example - HDFS to MySQL data export using SQOOP with staging-table
mysql> use retail_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories |
| customers |
| departments |
| order_items |
| orders |
| products |
+---------------------+
6 rows in set (0.00 sec)
mysql> describe orders;
+-------------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+----------------+
| order_id | int(11) | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| order_customer_id | int(11) | NO | | NULL | |
| order_status | varchar(45) | NO | | NULL | |
+-------------------+-------------+------+-----+---------+----------------+
// create a staging table with the same structure as the orders table
mysql> create table retail_db.orders_staging as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.10 sec)
Records: 0 Duplicates: 0 Warnings: 0
// create a new table, orders_exported, which has the same structure as the retail_db.orders table
mysql> create table retail_db.orders_exported as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0
// verify the structure of the newly created orders_staging table
mysql> describe orders_staging;
+-------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+-------+
| order_id | int(11) | NO | | 0 | |
| order_date | datetime | NO | | NULL | |
| order_customer_id | int(11) | NO | | NULL | |
| order_status | varchar(45) | NO | | NULL | |
+-------------------+-------------+------+-----+---------+-------+
4 rows in set (0.00 sec)
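// Note: CREATE TABLE ... AS SELECT copies only the column definitions, which is why order_id above has lost its PRIMARY KEY and auto_increment attributes. That is fine for this export, but if keys and defaults should be preserved, MySQL's CREATE TABLE ... LIKE can be used instead. A minimal sketch, assuming the same root/cloudera credentials as above (orders_staging2 is just an illustrative name):
// orders_staging2 is a hypothetical table name used only for this illustration
$ mysql -u root -pcloudera -e "create table retail_db.orders_staging2 like retail_db.orders"
$ mysql -u root -pcloudera -e "describe retail_db.orders_staging2"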
// I already have the orders data in HDFS
$ hdfs dfs -ls /user/cloudera/orders
Found 5 items
-rw-r--r-- 1 cloudera cloudera 0 2020-08-06 05:52 /user/cloudera/orders/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 741614 2020-08-06 05:52 /user/cloudera/orders/part-m-00000
-rw-r--r-- 1 cloudera cloudera 753022 2020-08-06 05:52 /user/cloudera/orders/part-m-00001
-rw-r--r-- 1 cloudera cloudera 752368 2020-08-06 05:52 /user/cloudera/orders/part-m-00002
-rw-r--r-- 1 cloudera cloudera 752940 2020-08-06 05:52 /user/cloudera/orders/part-m-00003
// Export HDFS data into a MySQL table using the staging-table approach
$ sqoop export \
--connect "jdbc:mysql://localhost/retail_db" \
--username root \
--password cloudera \
--table orders_exported \
--export-dir hdfs://localhost:8020/user/cloudera/orders/ \
--staging-table orders_staging \
--clear-staging-table \
--input-fields-terminated-by ','
20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Exported 68883 records.
20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Starting to migrate data from staging table to destination.
20/08/08 00:24:45 INFO manager.SqlManager: Migrated 68883 records from `orders_staging` to `orders_exported`
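// With --staging-table, the export map tasks write into orders_staging first; only after all of them succeed does Sqoop move the rows into orders_exported in a single transaction, so a failed export cannot leave a half-loaded destination table (--clear-staging-table empties the staging table before the job starts). After the migration the staging table should be empty again, which can be verified with the same credentials as above:
$ mysql -u root -pcloudera -e "select count(1) from retail_db.orders_staging"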
mysql> select * from retail_db.orders_exported limit 5;
+----------+---------------------+-------------------+-----------------+
| order_id | order_date | order_customer_id | order_status |
+----------+---------------------+-------------------+-----------------+
| 1 | 2013-07-25 00:00:00 | 11599 | CLOSED |
| 2 | 2013-07-25 00:00:00 | 256 | PENDING_PAYMENT |
| 3 | 2013-07-25 00:00:00 | 12111 | COMPLETE |
| 4 | 2013-07-25 00:00:00 | 8827 | CLOSED |
| 5 | 2013-07-25 00:00:00 | 11318 | COMPLETE |
+----------+---------------------+-------------------+-----------------+
5 rows in set (0.00 sec)
mysql> select count(1) from retail_db.orders_exported limit 5;
+----------+
| count(1) |
+----------+
| 68883 |
+----------+
1 row in set (0.01 sec)
Line count of the source data in HDFS (it should match the count above):
$ hdfs dfs -cat hdfs://localhost:8020/user/cloudera/orders/* | wc -l
68883
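// A quick end-to-end check is to compare the HDFS line count with the MySQL row count in one shot; a small sketch assuming the same paths and credentials used above:
$ HDFS_CNT=$(hdfs dfs -cat /user/cloudera/orders/* | wc -l)
$ MYSQL_CNT=$(mysql -N -u root -pcloudera -e "select count(1) from retail_db.orders_exported")
$ [ "$HDFS_CNT" -eq "$MYSQL_CNT" ] && echo "Counts match: $HDFS_CNT" || echo "Mismatch: $HDFS_CNT vs $MYSQL_CNT"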
Friday, 7 August 2020
Import all the tables from MySQL to Hive with Snappy Compression and Parquet file creation using SQOOP
Here we import all the tables of the retail_db MySQL database into Hive.
Additional options used: Snappy compression and the Parquet file format.
// Delete the database in Hive
hive> drop database ohm cascade;
OK
Time taken: 1.187 seconds
hive> create database ohm;
OK
$ sqoop import-all-tables \
--connect jdbc:mysql://localhost:3306/retail_db \
--username root \
--password cloudera \
--warehouse-dir /user/hive/warehouse \
-m 1 \
--hive-database ohm \
--hive-import \
--hive-overwrite \
--create-hive-table \
--compress \
--compression-codec snappy \
--as-parquetfile \
--outdir java_out
hive> use ohm;
OK
Time taken: 0.01 seconds
hive> show tables;
OK
categories
customers
departments
order_items
orders
products
Time taken: 0.029 seconds, Fetched: 6 row(s)
hive> describe database ohm;
OK
ohm hdfs://quickstart.cloudera:8020/user/hive/warehouse/ohm.db cloudera USER
Time taken: 0.011 seconds, Fetched: 1 row(s)
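// To confirm the tables really landed as Snappy-compressed Parquet, the warehouse directory and the table's storage descriptor can be inspected. A rough sketch, assuming the quickstart warehouse path shown by describe database above (the compression codec itself is recorded inside the Parquet files, not in the Hive metadata):
$ hdfs dfs -ls /user/hive/warehouse/ohm.db/orders
$ hive -e "describe formatted ohm.orders" | grep -iE 'serde|inputformat'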
Create, Execute, Delete SQOOP Jobs
// I have an ohm.person table in MySQL with the following records
mysql> select * from ohm.person;
+----+-----------------+---------------------+
| id | name | last_mod_dt |
+----+-----------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul | 2020-08-07 01:41:20 |
| 13 | Ravindran | 2020-08-07 01:41:35 |
+----+-----------------+---------------------+
// Create a new Sqoop job (with the loose initial last-value below, the first run imports all existing rows; Sqoop then stores the real last-value after every run):
$ sqoop job \
--create person_inc_job \
-- import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental append \
--last-value '01:17:17' \
--target-dir /user/cloudera/person_test \
-m 1
// Display all job names already created
$ sqoop job --list
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
20/08/07 02:05:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Available jobs:
person_inc_job
// delete the target folder if it exists
$ hdfs dfs -rm -r /user/cloudera/person_test
rm: `/user/cloudera/person_test': No such file or directory
// run the sqoop job for the first time
$ sqoop job --exec person_inc_job
Retrieved 13 records.
$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0
// Now add some more new records in MySQL
mysql> insert into ohm.person(name) values ('Anbu');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Sudha');
Query OK, 1 row affected (0.01 sec)
mysql> insert into ohm.person(name) values ('Aish');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Vijay');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Balaji');
Query OK, 1 row affected (0.01 sec)
mysql> select * from ohm.person; -- MySQL console
+----+-----------------+---------------------+
| id | name | last_mod_dt |
+----+-----------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul | 2020-08-07 01:41:20 |
| 13 | Ravindran | 2020-08-07 01:41:35 | -- last value
| 14 | Anbu | 2020-08-07 02:23:25 | -- newly added records
| 15 | Sudha | 2020-08-07 02:23:29 |
| 16 | Aish | 2020-08-07 02:23:36 |
| 17 | Vijay | 2020-08-07 02:23:44 |
| 18 | Balaji | 2020-08-07 02:23:47 |
+----+-----------------+---------------------+
18 rows in set (0.00 sec)
// run the sqoop job again to fetch incremental records
$ sqoop job --exec person_inc_job
Retrieved 5 records.
// Display the imported data available in HDFS
$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0 -- loaded from FULL LOAD
14,Anbu,2020-08-07 02:23:25.0 -- incremental records
15,Sudha,2020-08-07 02:23:29.0
16,Aish,2020-08-07 02:23:36.0
17,Vijay,2020-08-07 02:23:44.0
18,Balaji,2020-08-07 02:23:47.0
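// After every successful --exec, Sqoop updates the job's stored last-value in its metastore, which is why the second run picked up only records 14-18. The saved definition, including the updated value, can be inspected with:
$ sqoop job --show person_inc_job | grep -i last.value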
// To delete the sqoop job :
$ sqoop job --delete person_inc_job;
Incremental Import in SQOOP with Date column
// Create a database named ohm
// Create a table named person with id (auto-increment), name (varchar) and last_mod_dt (defaulting to CURRENT_TIMESTAMP)
mysql> create database if not exists ohm;
Query OK, 1 row affected (0.00 sec)
mysql> use ohm;
mysql> CREATE TABLE if not exists ohm.person (
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
name VARCHAR(500) NOT NULL,
last_mod_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY person (last_mod_dt)
) ;
mysql> describe ohm.person;
+-------------+---------------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+---------------------+------+-----+-------------------+----------------+
| id | bigint(20) unsigned | NO | PRI | NULL | auto_increment |
| name | varchar(500) | NO | | NULL | |
| last_mod_dt | timestamp | NO | UNI | CURRENT_TIMESTAMP | |
+-------------+---------------------+------+-----+-------------------+----------------+
// Add 9 records
mysql> insert into ohm.person(name) values ('Raja');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Ravi');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Kalai');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Sudha');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Priya');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Vanitha');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Kasturi');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Lakshmi');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Suriya Devi');
mysql> select * from ohm.person;
+----+-------------+---------------------+
| id | name | last_mod_dt |
+----+-------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 |
+----+-------------+---------------------+
Now run the initial SQOOP import in incremental append mode, using the first record's timestamp as the last-value
$ sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental append \
--last-value '2020-08-07 01:17:17' \
--target-dir /user/cloudera/person_test
BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `person` WHERE ( `last_mod_dt` > '2020-08-07 01:17:17' AND `last_mod_dt` <= '2020-08-07 01:31:51.0' )
$ hdfs dfs -ls /user/cloudera/person_test
Found 5 items
-rw-r--r-- 1 cloudera cloudera 0 2020-08-07 01:28 /user/cloudera/person_test/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 29 2020-08-07 01:28 /user/cloudera/person_test/part-m-00000
-rw-r--r-- 1 cloudera cloudera 29 2020-08-07 01:28 /user/cloudera/person_test/part-m-00001
-rw-r--r-- 1 cloudera cloudera 30 2020-08-07 01:28 /user/cloudera/person_test/part-m-00002
-rw-r--r-- 1 cloudera cloudera 30 2020-08-07 01:28 /user/cloudera/person_test/part-m-00003
// Because append mode uses a strict > comparison against the given --last-value, record 1 (whose last_mod_dt equals '2020-08-07 01:17:17') is skipped and 8 records (ids 2-9) are imported
$ hdfs dfs -cat /user/cloudera/person_test/*
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
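// If a genuine full load of all 9 rows (including id 1) is wanted as the starting point, the incremental options can simply be left out on the first run and added only afterwards. A minimal sketch; person_full is just an illustrative target directory:
$ sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--target-dir /user/cloudera/person_full \
-m 1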
// Adding new records in MySQL
mysql> insert into ohm.person(name) values ('Nanjil Vijayan');
Query OK, 1 row affected (0.01 sec)
mysql> insert into ohm.person(name) values ('Elizabeth Helen');
Query OK, 1 row affected (0.01 sec)
mysql> insert into ohm.person(name) values ('Peter Paul');
Query OK, 1 row affected (0.00 sec)
mysql> insert into ohm.person(name) values ('Ravindran');
Query OK, 1 row affected (0.00 sec)
mysql> select * from ohm.person;
+----+-----------------+---------------------+
| id | name | last_mod_dt |
+----+-----------------+---------------------+
| 1 | Raja | 2020-08-07 01:17:17 |
| 2 | Ravi | 2020-08-07 01:17:30 |
| 3 | Kalai | 2020-08-07 01:17:34 |
| 4 | Sudha | 2020-08-07 01:17:39 |
| 5 | Priya | 2020-08-07 01:31:28 |
| 6 | Vanitha | 2020-08-07 01:31:34 |
| 7 | Kasturi | 2020-08-07 01:31:40 |
| 8 | Lakshmi | 2020-08-07 01:31:45 |
| 9 | Suriya Devi | 2020-08-07 01:31:51 | -- Last-value of the previous import
| 10 | Nanjil Vijayan | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul | 2020-08-07 01:41:20 |
| 13 | Ravindran | 2020-08-07 01:41:35 |
+----+-----------------+---------------------+
Now run an incremental SQOOP import in append mode, using the last_mod_dt of the previous import's newest record as the last-value
$ sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental append \
--last-value '2020-08-07 01:31:51' \
--target-dir /user/cloudera/person_test \
-m 1
$ hdfs dfs -ls /user/cloudera/person_test
Found 5 items
-rw-r--r-- 1 cloudera cloudera 59 2020-08-07 01:37 /user/cloudera/person_test/part-m-00000
-rw-r--r-- 1 cloudera cloudera 60 2020-08-07 01:38 /user/cloudera/person_test/part-m-00001
-rw-r--r-- 1 cloudera cloudera 64 2020-08-07 01:38 /user/cloudera/person_test/part-m-00002
-rw-r--r-- 1 cloudera cloudera 68 2020-08-07 01:38 /user/cloudera/person_test/part-m-00003
-rw-r--r-- 1 cloudera cloudera 152 2020-08-07 01:43 /user/cloudera/person_test/part-m-00004 -- new file which has only incremental records
$ hdfs dfs -cat /user/cloudera/person_test/*
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0 -- incremental records
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0
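// The imports above use --incremental append, which only picks up brand-new rows. If existing rows can also be updated (i.e. last_mod_dt changes for an existing id), Sqoop's lastmodified mode with a merge key can re-import and merge the changed rows instead. A hedged sketch, assuming id as the merge key and the latest timestamp above as the starting point:
$ sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental lastmodified \
--last-value '2020-08-07 01:41:35' \
--merge-key id \
--target-dir /user/cloudera/person_test \
-m 1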