Friday, 7 August 2020

Incremental Import in SQOOP with Date column

// Create a database named : ohm 

// Create a table named : person   with id (auto increment), name string, last_mod_dt (default value is current_timestamp)


mysql> create database if not exists ohm;

Query OK, 1 row affected (0.00 sec)

 

mysql> use ohm;


-- person: source table for the Sqoop incremental-import demo.
--   id          surrogate primary key, assigned by AUTO_INCREMENT
--   name        person's name
--   last_mod_dt defaults to the insert time; used as the Sqoop check column
-- last_mod_dt is indexed but deliberately NOT unique: the TIMESTAMP values are
-- stored at one-second resolution, so a UNIQUE key would reject any two rows
-- inserted within the same second with a duplicate-key error.
-- With a plain index, DESCRIBE reports Key = MUL (not UNI) for this column.
CREATE TABLE IF NOT EXISTS ohm.person (
  id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
  name VARCHAR(500) NOT NULL,
  last_mod_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  KEY person_last_mod_dt_idx (last_mod_dt)
);


mysql> describe ohm.person;

+-------------+---------------------+------+-----+-------------------+----------------+

| Field       | Type                | Null | Key | Default           | Extra          |

+-------------+---------------------+------+-----+-------------------+----------------+

| id          | bigint(20) unsigned | NO   | PRI | NULL              | auto_increment |

| name        | varchar(500)        | NO   |     | NULL              |                |

| last_mod_dt | timestamp           | NO   | UNI | CURRENT_TIMESTAMP |                |

+-------------+---------------------+------+-----+-------------------+----------------+



// Add 9 records (ids 1-4 first, then ids 5-9 a few minutes later)

mysql> insert into ohm.person(name) values ('Raja');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Ravi');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Kalai');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Sudha');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Priya');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Vanitha');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Kasturi');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Lakshmi');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Suriya Devi');



mysql> select * from ohm.person;

+----+-------------+---------------------+

| id | name        | last_mod_dt         |

+----+-------------+---------------------+

|  1 | Raja        | 2020-08-07 01:17:17 |

|  2 | Ravi        | 2020-08-07 01:17:30 |

|  3 | Kalai       | 2020-08-07 01:17:34 |

|  4 | Sudha       | 2020-08-07 01:17:39 |

|  5 | Priya       | 2020-08-07 01:31:28 |

|  6 | Vanitha     | 2020-08-07 01:31:34 |

|  7 | Kasturi     | 2020-08-07 01:31:40 |

|  8 | Lakshmi     | 2020-08-07 01:31:45 |

|  9 | Suriya Devi | 2020-08-07 01:31:51 |

+----+-------------+---------------------+





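Now do the initial SQOOP import. Note: this is not a true full load. It runs in incremental append mode with last-value set to the last_mod_dt of id 1, so only rows with last_mod_dt strictly greater than that value (ids 2-9) are imported.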


# Incremental-append import keyed on last_mod_dt: only rows whose last_mod_dt is
# strictly greater than --last-value are imported (see the BoundingValsQuery in
# the job output).
# Tool-specific options use the documented double-dash form, and the
# continuation lines are contiguous so the trailing backslashes join them into
# a single command.
# NOTE: --password on the command line is insecure; prefer -P (prompt) or
# --password-file for anything beyond a local demo.
$ sqoop import \
  --connect jdbc:mysql://localhost:3306/ohm \
  --table person \
  --username root \
  --password cloudera \
  --check-column last_mod_dt \
  --incremental append \
  --last-value '2020-08-07 01:17:17' \
  --target-dir /user/cloudera/person_test



 BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `person` WHERE ( `last_mod_dt` > '2020-08-07 01:17:17' AND `last_mod_dt` <= '2020-08-07 01:31:51.0' )



$  hdfs dfs -ls /user/cloudera/person_test

Found 5 items

-rw-r--r--   1 cloudera cloudera          0 2020-08-07 01:28 /user/cloudera/person_test/_SUCCESS

-rw-r--r--   1 cloudera cloudera         29 2020-08-07 01:28 /user/cloudera/person_test/part-m-00000

-rw-r--r--   1 cloudera cloudera         29 2020-08-07 01:28 /user/cloudera/person_test/part-m-00001

-rw-r--r--   1 cloudera cloudera         30 2020-08-07 01:28 /user/cloudera/person_test/part-m-00002

-rw-r--r--   1 cloudera cloudera         30 2020-08-07 01:28 /user/cloudera/person_test/part-m-00003


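// Here the initial import brought in 8 records (ids 2-9). Id 1 (Raja) is excluded because its last_mod_dt equals the last-value and the BoundingValsQuery compares with a strict '>'. The 29/29/30/30-byte listing above appears to come from an earlier 4-row run; for these 8 rows the part files are 59/60/64/68 bytes, as the later listing shows.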

$ hdfs dfs -cat /user/cloudera/person_test/*

2,Ravi,2020-08-07 01:17:30.0

3,Kalai,2020-08-07 01:17:34.0

4,Sudha,2020-08-07 01:17:39.0

5,Priya,2020-08-07 01:31:28.0

6,Vanitha,2020-08-07 01:31:34.0

7,Kasturi,2020-08-07 01:31:40.0

8,Lakshmi,2020-08-07 01:31:45.0

9,Suriya Devi,2020-08-07 01:31:51.0


 

// Adding new records in MySQL


mysql> insert into ohm.person(name) values ('Nanjil Vijayan');

Query OK, 1 row affected (0.01 sec)


mysql> insert into ohm.person(name) values ('Elizabeth Helen');

Query OK, 1 row affected (0.01 sec)


mysql> insert into ohm.person(name) values ('Peter Paul');

Query OK, 1 row affected (0.00 sec)


mysql> insert into ohm.person(name) values ('Ravindran');

Query OK, 1 row affected (0.00 sec)


mysql> select * from ohm.person;

+----+-----------------+---------------------+

| id | name            | last_mod_dt         |

+----+-----------------+---------------------+

|  1 | Raja            | 2020-08-07 01:17:17 |

|  2 | Ravi            | 2020-08-07 01:17:30 |

|  3 | Kalai           | 2020-08-07 01:17:34 |

|  4 | Sudha           | 2020-08-07 01:17:39 |

|  5 | Priya           | 2020-08-07 01:31:28 |

|  6 | Vanitha         | 2020-08-07 01:31:34 |

|  7 | Kasturi         | 2020-08-07 01:31:40 |

|  8 | Lakshmi         | 2020-08-07 01:31:45 |

|  9 | Suriya Devi     | 2020-08-07 01:31:51 |  -- Last-value of the previous import

| 10 | Nanjil Vijayan  | 2020-08-07 01:40:53 |

| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |

| 12 | Peter Paul      | 2020-08-07 01:41:20 |

| 13 | Ravindran       | 2020-08-07 01:41:35 |

+----+-----------------+---------------------+





Now do the SQOOP incremental import in append mode, using last_mod_dt as the check column and the maximum last_mod_dt of the previous import as the last-value


# Second incremental-append run: --last-value is the highest last_mod_dt already
# imported (id 9), so only rows added after it are fetched and appended to the
# existing target directory.
# Tool-specific options use the documented double-dash form, and the
# continuation lines are contiguous so the trailing backslashes join them into
# a single command. -m is a genuine short option (number of map tasks).
# NOTE: --password on the command line is insecure; prefer -P (prompt) or
# --password-file for anything beyond a local demo.
$ sqoop import \
  --connect jdbc:mysql://localhost:3306/ohm \
  --table person \
  --username root \
  --password cloudera \
  --check-column last_mod_dt \
  --incremental append \
  --last-value '2020-08-07 01:31:51' \
  --target-dir /user/cloudera/person_test \
  -m 1



$ hdfs dfs -ls /user/cloudera/person_test

Found 5 items

-rw-r--r--   1 cloudera cloudera         59 2020-08-07 01:37 /user/cloudera/person_test/part-m-00000

-rw-r--r--   1 cloudera cloudera         60 2020-08-07 01:38 /user/cloudera/person_test/part-m-00001

-rw-r--r--   1 cloudera cloudera         64 2020-08-07 01:38 /user/cloudera/person_test/part-m-00002

-rw-r--r--   1 cloudera cloudera         68 2020-08-07 01:38 /user/cloudera/person_test/part-m-00003

-rw-r--r--   1 cloudera cloudera        152 2020-08-07 01:43 /user/cloudera/person_test/part-m-00004  -- new file which has only incremental records



$ hdfs dfs -cat /user/cloudera/person_test/*

2,Ravi,2020-08-07 01:17:30.0

3,Kalai,2020-08-07 01:17:34.0

4,Sudha,2020-08-07 01:17:39.0

5,Priya,2020-08-07 01:31:28.0

6,Vanitha,2020-08-07 01:31:34.0

7,Kasturi,2020-08-07 01:31:40.0

8,Lakshmi,2020-08-07 01:31:45.0

9,Suriya Devi,2020-08-07 01:31:51.0

10,Nanjil Vijayan,2020-08-07 01:40:53.0  -- incremental records 

11,Elizabeth Helen,2020-08-07 01:41:14.0

12,Peter Paul,2020-08-07 01:41:20.0

13,Ravindran,2020-08-07 01:41:35.0



No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...