
Saturday, 8 August 2020

SQOOP Export with Staging table Example - HDFS to MySQL data export using SQOOP with staging-table

mysql> use retail_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A


Database changed
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories          |
| customers           |
| departments         |
| order_items         |
| orders              |
| products            |
+---------------------+
6 rows in set (0.00 sec)


mysql> describe orders;
+-------------------+-------------+------+-----+---------+----------------+
| Field             | Type        | Null | Key | Default | Extra          |
+-------------------+-------------+------+-----+---------+----------------+
| order_id          | int(11)     | NO   | PRI | NULL    | auto_increment |
| order_date        | datetime    | NO   |     | NULL    |                |
| order_customer_id | int(11)     | NO   |     | NULL    |                |
| order_status      | varchar(45) | NO   |     | NULL    |                |
+-------------------+-------------+------+-----+---------+----------------+




// create a staging table with the same structure as the orders table


mysql> create table retail_db.orders_staging as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.10 sec)
Records: 0 Duplicates: 0 Warnings: 0


// create a new table, orders_exported, with the same structure as the retail_db.orders table


mysql> create table retail_db.orders_exported as select * from retail_db.orders where 1 = 0;
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0






// verify the structure of the newly created orders_staging table
mysql> describe orders_staging;
+-------------------+-------------+------+-----+---------+-------+
| Field             | Type        | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+-------+
| order_id          | int(11)     | NO   |     | 0       |       |
| order_date        | datetime    | NO   |     | NULL    |       |
| order_customer_id | int(11)     | NO   |     | NULL    |       |
| order_status      | varchar(45) | NO   |     | NULL    |       |
+-------------------+-------------+------+-----+---------+-------+
4 rows in set (0.00 sec)


// I already have the orders data in HDFS
$ hdfs dfs -ls /user/cloudera/orders
Found 5 items
-rw-r--r-- 1 cloudera cloudera 0 2020-08-06 05:52 /user/cloudera/orders/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 741614 2020-08-06 05:52 /user/cloudera/orders/part-m-00000
-rw-r--r-- 1 cloudera cloudera 753022 2020-08-06 05:52 /user/cloudera/orders/part-m-00001
-rw-r--r-- 1 cloudera cloudera 752368 2020-08-06 05:52 /user/cloudera/orders/part-m-00002
-rw-r--r-- 1 cloudera cloudera 752940 2020-08-06 05:52 /user/cloudera/orders/part-m-00003




// Export HDFS data into the MySQL table using the staging table approach
sqoop export \
--connect "jdbc:mysql://localhost/retail_db" \
--username root \
--password cloudera \
--table orders_exported \
--export-dir hdfs://localhost:8020/user/cloudera/orders/ \
--staging-table orders_staging \
--clear-staging-table \
--input-fields-terminated-by ','




20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Exported 68883 records.
20/08/08 00:24:45 INFO mapreduce.ExportJobBase: Starting to migrate data from staging table to destination.
20/08/08 00:24:45 INFO manager.SqlManager: Migrated 68883 records from `orders_staging` to `orders_exported`




mysql> select * from retail_db.orders_exported limit 5;
+----------+---------------------+-------------------+-----------------+
| order_id | order_date          | order_customer_id | order_status    |
+----------+---------------------+-------------------+-----------------+
|        1 | 2013-07-25 00:00:00 |             11599 | CLOSED          |
|        2 | 2013-07-25 00:00:00 |               256 | PENDING_PAYMENT |
|        3 | 2013-07-25 00:00:00 |             12111 | COMPLETE        |
|        4 | 2013-07-25 00:00:00 |              8827 | CLOSED          |
|        5 | 2013-07-25 00:00:00 |             11318 | COMPLETE        |
+----------+---------------------+-------------------+-----------------+
5 rows in set (0.00 sec)


mysql> select count(1) from retail_db.orders_exported limit 5;
+----------+
| count(1) |
+----------+
|    68883 |
+----------+
1 row in set (0.01 sec)




Line count in HDFS:
$ hdfs dfs -cat hdfs://localhost:8020/user/cloudera/orders/* | wc -l
68883
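
A quick way to double-check the export from Python, using the mysql-connector package that the later Python posts on this page also use (same host and credentials as the sqoop command above; this is just a verification sketch, not part of the export itself):

import mysql.connector

# connection details match the sqoop export command above
db = mysql.connector.connect(host="localhost", user="root", passwd="cloudera", database="retail_db")
cur = db.cursor()

cur.execute("select count(*) from orders_exported")
print("orders_exported :", cur.fetchone()[0])   # expected: 68883, same as the HDFS line count

cur.execute("select count(*) from orders_staging")
print("orders_staging  :", cur.fetchone()[0])   # the staging table is only an intermediate landing area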







Friday, 7 August 2020

Create, Execute, Delete SQOOP Jobs

// I have an ohm.person table in MySQL with the following records


mysql> select * from ohm.person;
+----+-----------------+---------------------+
| id | name            | last_mod_dt         |
+----+-----------------+---------------------+
|  1 | Raja            | 2020-08-07 01:17:17 |
|  2 | Ravi            | 2020-08-07 01:17:30 |
|  3 | Kalai           | 2020-08-07 01:17:34 |
|  4 | Sudha           | 2020-08-07 01:17:39 |
|  5 | Priya           | 2020-08-07 01:31:28 |
|  6 | Vanitha         | 2020-08-07 01:31:34 |
|  7 | Kasturi         | 2020-08-07 01:31:40 |
|  8 | Lakshmi         | 2020-08-07 01:31:45 |
|  9 | Suriya Devi     | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan  | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul      | 2020-08-07 01:41:20 |
| 13 | Ravindran       | 2020-08-07 01:41:35 |
+----+-----------------+---------------------+



// Create a new Sqoop Job:

$ sqoop job \
--create person_inc_job \
-- import \
--connect jdbc:mysql://localhost:3306/ohm \
--table person \
--username root \
--password cloudera \
--check-column last_mod_dt \
--incremental append \
--last-value '01:17:17' \
--target-dir /user/cloudera/person_test \
-m 1


// Display all the job names already created

$ sqoop job --list

Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
20/08/07 02:05:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Available jobs:
  person_inc_job


// delete the target folder if it exists

$ hdfs dfs -rm -r /user/cloudera/person_test
rm: `/user/cloudera/person_test': No such file or directory




// run the sqoop job for the first time

$ sqoop job --exec person_inc_job

Retrieved 13 records.



$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0


// Now add a few more new records in MySQL

mysql> insert into ohm.person(name) values ('Anbu');
Query OK, 1 row affected (0.00 sec)

mysql> insert into ohm.person(name) values ('Sudha');
Query OK, 1 row affected (0.01 sec)

mysql> insert into ohm.person(name) values ('Aish');
Query OK, 1 row affected (0.00 sec)

mysql> insert into ohm.person(name) values ('Vijay');
Query OK, 1 row affected (0.00 sec)

mysql> insert into ohm.person(name) values ('Balaji');
Query OK, 1 row affected (0.01 sec)


mysql> select * from ohm.person;  -- MySQL console
+----+-----------------+---------------------+
| id | name            | last_mod_dt         |
+----+-----------------+---------------------+
|  1 | Raja            | 2020-08-07 01:17:17 |
|  2 | Ravi            | 2020-08-07 01:17:30 |
|  3 | Kalai           | 2020-08-07 01:17:34 |
|  4 | Sudha           | 2020-08-07 01:17:39 |
|  5 | Priya           | 2020-08-07 01:31:28 |
|  6 | Vanitha         | 2020-08-07 01:31:34 |
|  7 | Kasturi         | 2020-08-07 01:31:40 |
|  8 | Lakshmi         | 2020-08-07 01:31:45 |
|  9 | Suriya Devi     | 2020-08-07 01:31:51 |
| 10 | Nanjil Vijayan  | 2020-08-07 01:40:53 |
| 11 | Elizabeth Helen | 2020-08-07 01:41:14 |
| 12 | Peter Paul      | 2020-08-07 01:41:20 |
| 13 | Ravindran       | 2020-08-07 01:41:35 |  -- last value
| 14 | Anbu            | 2020-08-07 02:23:25 |  -- newly added records
| 15 | Sudha           | 2020-08-07 02:23:29 |
| 16 | Aish            | 2020-08-07 02:23:36 |
| 17 | Vijay           | 2020-08-07 02:23:44 |
| 18 | Balaji          | 2020-08-07 02:23:47 |
+----+-----------------+---------------------+
18 rows in set (0.00 sec)



// run the sqoop job again to fetch the incremental records

$ sqoop job --exec person_inc_job

Retrieved 5 records.


// Display the imported data available in HDFS

$ hdfs dfs -cat /user/cloudera/person_test/*
1,Raja,2020-08-07 01:17:17.0
2,Ravi,2020-08-07 01:17:30.0
3,Kalai,2020-08-07 01:17:34.0
4,Sudha,2020-08-07 01:17:39.0
5,Priya,2020-08-07 01:31:28.0
6,Vanitha,2020-08-07 01:31:34.0
7,Kasturi,2020-08-07 01:31:40.0
8,Lakshmi,2020-08-07 01:31:45.0
9,Suriya Devi,2020-08-07 01:31:51.0
10,Nanjil Vijayan,2020-08-07 01:40:53.0
11,Elizabeth Helen,2020-08-07 01:41:14.0
12,Peter Paul,2020-08-07 01:41:20.0
13,Ravindran,2020-08-07 01:41:35.0 -- loaded from FULL LOAD
14,Anbu,2020-08-07 02:23:25.0   -- incremental records
15,Sudha,2020-08-07 02:23:29.0
16,Aish,2020-08-07 02:23:36.0
17,Vijay,2020-08-07 02:23:44.0
18,Balaji,2020-08-07 02:23:47.0



// To delete the sqoop job:

$ sqoop job --delete person_inc_job

Monday, 3 August 2020

SQOOP Export Example : Hive to MySQL using SQOOP Export

// We are going to export ohm.cust_from_mysql (Hive table) into cust_exported (MySQL table)

hive> select * from ohm.cust_from_mysql limit 5;
OK
1 Richard Hernandez XXXXXXXXX XXXXXXXXX 6303 Heather Plaza Brownsville TX 78521 
2 Mary Barrett XXXXXXXXX XXXXXXXXX 9526 Noble Embers Ridge Littleton CO 80126
3 Ann Smith XXXXXXXXX XXXXXXXXX 3422 Blue Pioneer Bend Caguas PR 00725
4 Mary Jones XXXXXXXXX XXXXXXXXX 8324 Little Common San Marcos CA 92069
5 Robert Hudson XXXXXXXXX XXXXXXXXX 10 Crystal River Mall Caguas PR 00725

hive> select count(1) from ohm.cust_from_mysql;

12435 -- the Hive table has 12435 rows in it

// We must create the target table in the destination database before the export.
// Create an empty table in the retail_db database of MySQL:
mysql> create table retail_db.cust_exported as select * from customers where 1 = 0;

// At this point retail_db.cust_exported has zero rows.

$ sqoop export \
--connect "jdbc:mysql://localhost/retail_db" \
--username root \
--password cloudera \
--table cust_exported \
--hcatalog-table cust_from_mysql \
--hcatalog-database ohm

Exported 12435 records.


mysql> select * from cust_exported  limit 5;
+-------------+----------------+----------------+----------------+-------------------+------------------------+---------------+----------------+------------------+
| customer_id | customer_fname | customer_lname | customer_email | customer_password | customer_street        | customer_city | customer_state | customer_zipcode |
+-------------+----------------+----------------+----------------+-------------------+------------------------+---------------+----------------+------------------+
|        9327 | Donna          | Smith          | XXXXXXXXX      | XXXXXXXXX         | 4114 Clear Nectar Isle | Caguas        | PR             | 00725            |
|           1 | Richard        | Hernandez      | XXXXXXXXX      | XXXXXXXXX         | 6303 Heather Plaza     | Brownsville   | TX             | 78521            |
|        9328 | Mary           | Perez          | XXXXXXXXX      | XXXXXXXXX         | 376 Golden Orchard     | Moreno Valley | CA             | 92553            |
|        9329 | Eugene         | Powell         | XXXXXXXXX      | XXXXXXXXX         | 2161 Burning Maze      | Metairie      | LA             | 70003            |
|        9330 | Mary           | Conley         | XXXXXXXXX      | XXXXXXXXX         | 3046 Broad Sky Dale    | Caguas        | PR             | 00725            |
+-------------+----------------+----------------+----------------+-------------------+------------------------+---------------+----------------+------------------+

mysql> select count(1) from retail_db.cust_exported;
+----------+
| count(1) |
+----------+
|    12435 |
+----------+
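
Optionally, both counts can be cross-checked from a single PySpark session. A minimal sketch, assuming a Hive-enabled SparkSession named spark and the MySQL connector jar on the classpath (for example started with --jars, as in the PySpark posts further down this page):

hive_count = spark.sql("select count(1) as c from ohm.cust_from_mysql").collect()[0]["c"]

mysql_count = spark.read.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/retail_db").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "cust_exported").\
option("user", "root").\
option("password", "cloudera").load().count()

print(hive_count, mysql_count)   # both should print 12435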

Incremental append : MySQL to HDFS

#1. In the first phase, we do a FULL LOAD (initial version)
#2. In the second phase, we do an INCREMENTAL APPEND
$ mysql -uroot -pcloudera -hlocalhost

mysql> use ohm;

mysql> create table person (id int, name varchar(50));

mysql> insert into person (id,name) values (100,'Raja'),(101,'Siva'),(102,'Lal');

mysql> select * from ohm.person;
+------+------+
| id   | name |
+------+------+
|  100 | Raja |
|  101 | Siva |
|  102 | Lal  |
+------+------+
3 rows in set (0.00 sec)


// Initial full load
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--username root \
--password cloudera \
--table person \
--target-dir user/cloudera/person_incremental \
--split-by id


hdfs dfs -ls user/cloudera/person_incremental
Found 4 items
-rw-r--r--   1 cloudera cloudera          0 2020-08-03 04:49 user/cloudera/person_incremental/_SUCCESS
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00000
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00001
-rw-r--r--   1 cloudera cloudera          8 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00002

$ hdfs dfs -cat user/cloudera/person_incremental/*
100,Raja
101,Siva
102,Lal


// Adding 4 more records in ohm.person
mysql> insert into ohm.person (id,name) values (103,'Nila'),(104,'Kalai'),(105,'Bharani'),(106,'Sara');
Query OK, 4 rows affected (0.00 sec)
Records: 4  Duplicates: 0  Warnings: 0

mysql> select * from ohm.person;
+------+---------+
| id   | name    |
+------+---------+
|  100 | Raja    | $ Existing Row
|  101 | Siva    | $ Existing Row
|  102 | Lal     | $ Existing Row
|  103 | Nila    | * Newly inserted row
|  104 | Kalai   | * Newly inserted row
|  105 | Bharani | * Newly inserted row
|  106 | Sara    | * Newly inserted row
+------+---------+



// incremental append using SQOOP Import
// feeding last-value as 102 and check-column as id
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--username root \
--password cloudera \
--table person \
--incremental append \
--check-column id \
--last-value 102 \
--target-dir user/cloudera/person_incremental \
--split-by id


$ hdfs dfs -ls  user/cloudera/person_incremental
Found 8 items
-rw-r--r--   1 cloudera cloudera          0 2020-08-03 04:49 user/cloudera/person_incremental/_SUCCESS
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00000
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00001
-rw-r--r--   1 cloudera cloudera          8 2020-08-03 04:49 user/cloudera/person_incremental/part-m-00002
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 05:10 user/cloudera/person_incremental/part-m-00003
-rw-r--r--   1 cloudera cloudera         10 2020-08-03 05:10 user/cloudera/person_incremental/part-m-00004
-rw-r--r--   1 cloudera cloudera         12 2020-08-03 05:10 user/cloudera/person_incremental/part-m-00005
-rw-r--r--   1 cloudera cloudera          9 2020-08-03 05:10 user/cloudera/person_incremental/part-m-00006


$ hdfs dfs -cat user/cloudera/person_incremental/*
100,Raja
101,Siva
102,Lal
103,Nila  -- Newly appended row
104,Kalai  -- Newly appended row
105,Bharani  -- Newly appended row
106,Sara  -- Newly appended row
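
Under the hood, --incremental append with --check-column id and --last-value 102 boils down to selecting only the rows whose id is greater than the last value, and then remembering the new maximum for the next run. A rough equivalent in plain Python with mysql-connector (illustration only, not something Sqoop needs):

import mysql.connector

db = mysql.connector.connect(host="localhost", user="root", passwd="cloudera", database="ohm")
cur = db.cursor()

# roughly what the incremental import selects
cur.execute("select id, name from person where id > %s", (102,))
for row in cur.fetchall():
    print(row)            # only rows 103..106, the newly inserted ones

# the next "last value" would be the current maximum of the check column
cur.execute("select max(id) from person")
print("next last-value:", cur.fetchone()[0])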

List Databases, Tables and Display the Structure of a table using SQOOP


//Display all the databases

sqoop list-databases \
--connect "jdbc:mysql://localhost:3306/" \
--username root \
--password cloudera

firehose
hue
metastore
mysql
nav
navms
ohm
oozie
retail_db
rman
sentry


//Display all the tables in a database:

sqoop list-tables \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username root \
--password cloudera


categories
customers
departments
order_items
orders
products
suppliers


// Evaluate a few queries with sqoop eval

sqoop eval \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username root \
--password cloudera \
--query "describe customers"

---------------------------------------------------------------------------------------------------------
| Field                | Type                 | Null | Key | Default              | Extra                | 
---------------------------------------------------------------------------------------------------------
| customer_id          | int(11)              | NO  | PRI | (null)               | auto_increment       | 
| customer_fname       | varchar(45)          | NO  |     | (null)               |                      | 
| customer_lname       | varchar(45)          | NO  |     | (null)               |                      | 
| customer_email       | varchar(45)          | NO  |     | (null)               |                      | 
| customer_password    | varchar(45)          | NO  |     | (null)               |                      | 
| customer_street      | varchar(255)         | NO  |     | (null)               |                      | 
| customer_city        | varchar(45)          | NO  |     | (null)               |                      | 
| customer_state       | varchar(45)          | NO  |     | (null)               |                      | 
| customer_zipcode     | varchar(45)          | NO  |     | (null)               |                      | 
---------------------------------------------------------------------------------------------------------

sqoop eval \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username root \
--password cloudera \
--query "select min(customer_id) as Minn, max(customer_id) as Maxx from retail_db.customers"

-----------------------------
| Minn        | Maxx        | 
-----------------------------
| 1           | 12435       | 
-----------------------------

Import data from MySQL to Hive with the help of SQOOP.


$ mysql -uroot -pcloudera -hlocalhost

mysql> describe retail_db.customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+


sqoop import \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username root \
--password cloudera \
--table customers \
--target-dir /user/cloudera/cust_from_mysql \
--delete-target-dir \
--hive-import \
--create-hive-table \
--hive-database ohm \
--hive-table cust_from_mysql

// The ohm.cust_from_mysql table will be created once the import succeeds.

hive> select * from ohm.cust_from_mysql limit 5;
OK
1 Richard Hernandez XXXXXXXXX XXXXXXXXX 6303 Heather Plaza Brownsville TX 78521
2 Mary Barrett XXXXXXXXX XXXXXXXXX 9526 Noble Embers Ridge Littleton CO 80126
3 Ann Smith XXXXXXXXX XXXXXXXXX 3422 Blue Pioneer Bend Caguas PR 00725
4 Mary Jones XXXXXXXXX XXXXXXXXX 8324 Little Common San Marcos CA 92069
5 Robert Hudson XXXXXXXXX XXXXXXXXX 10 Crystal River Mall Caguas PR 00725


 

Friday, 31 July 2020

Import a table from MySQL to Hbase using SQOOP Import

// Import into HBase
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--driver com.mysql.jdbc.Driver \
--username root \
--password cloudera \
--table employee \
--hbase-create-table \
--hbase-table employee_details \
--column-family employees \
--hbase-row-key id -m 1

// the same command as a single-line statement
sqoop import --connect jdbc:mysql://localhost:3306/ohm --driver com.mysql.jdbc.Driver --username root --password cloudera --table employee --hbase-create-table --hbase-table employee_details --column-family employees --hbase-row-key id -m 1

[cloudera@quickstart ~]$ hbase shell

scan 'employee_details'
// it displays all the imported rows in HBase's columnar (cell) view
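
If you prefer to verify from Python instead of the hbase shell, the happybase package can scan the same table. A sketch, assuming happybase is installed and the HBase Thrift server is running (hbase thrift start):

import happybase

# goes through the HBase Thrift server, which must be running
connection = happybase.Connection('localhost')
table = connection.table('employee_details')

for row_key, data in table.scan():
    print(row_key, data)   # key is the id, data holds the columns of the 'employees' column family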

Import a table from MySQL to Hive using SQOOP Import

// Start MySQL 
$ sudo mysql -uroot -pcloudera

mysql> create database ohm;
Query OK, 1 row affected (0.00 sec)

mysql> use ohm;
Database changed


CREATE TABLE employee 
(
id INT,
first_name VARCHAR(100),
last_name VARCHAR(100),
gender VARCHAR(10),
designation VARCHAR(20),
city VARCHAR(20),
country VARCHAR(20)
);


INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (1, 'Jervis', 'Roll', 'Male', 'Director of Sales', 'Thi Tran Lac', 'Vietnam');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (2, 'Gordon', 'Maltster', 'Male', 'Marketing Manager', 'Mabu', 'China');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (3, 'Griff', 'Godsafe', 'Male', 'Actuary', 'Kipit', 'Philippines');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (4, 'Gracie', 'Franken', 'Female', 'Assistant Manager', 'Xiabuji', 'China');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (5, 'Joelly', 'Wellbank', 'Female', 'Account Coordinator', 'Whitehorse', 'Canada');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (6, 'Bab', 'Havock', 'Female', 'Accountant II', 'Basyūn', 'Egypt');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (7, 'Carmine', 'Courage', 'Female', 'Account Coordinator', 'Boyeros', 'Cuba');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (8, 'Estella', 'Marvell', 'Female', 'Structural Analysis Engineer', 'Stettler', 'Canada');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (9, 'Celie', 'Trevaskiss', 'Female', 'Assistant Manager', 'Criuleni', 'Moldova');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (10, 'Madison', 'Ranyell', 'Male', 'Research Associate', 'Angatel', 'Philippines');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (11, 'Haydon', 'Faughey', 'Male', 'Safety Technician IV', 'Masalovka', 'Russia');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (12, 'Michele', 'Zarfai', 'Male', 'Legal Assistant', 'Karatau', 'Kazakhstan');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (13, 'Ruthi', 'Bowmer', 'Female', 'Analog Circuit Design manager', 'Peski', 'Russia');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (14, 'Adolphus', 'Pickthorne', 'Male', 'Senior Developer', 'Mae Fa Luang', 'Thailand');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (15, 'Kat', 'Dymocke', 'Female', 'Geological Engineer', 'Markópoulo Oropoú', 'Greece');


// Display databases
$ sqoop list-databases --connect jdbc:mysql://localhost --username root --password cloudera

information_schema
cm
firehose
hue
metastore
mysql
nav
navms
ohm
oozie
retail_db
rman

// Display tables
$ sqoop list-tables --connect jdbc:mysql://localhost/ohm --username root --password cloudera
employee
person

// Create a database in Hive
hive> create database ohm;
OK
Time taken: 2.041 seconds
hive> show tables;
OK
Time taken: 0.244 seconds


// Import a table from MySQL to Hive
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--driver com.mysql.jdbc.Driver \
--username root \
--password cloudera \
--table employee \
--hive-import \
--split-by id \
--hive-table ohm.employee

 
hive> use ohm;
OK
Time taken: 0.017 seconds
hive> show tables;
OK
employee
Time taken: 0.029 seconds, Fetched: 1 row(s)
hive> select * from employee;
OK
1 Jervis Roll Male Director of Sales Thi Tran Lac Vietnam
2 Gordon Maltster Male Marketing Manager Mabu China
3 Griff Godsafe Male Actuary Kipit Philippines
4 Gracie Franken Female Assistant Manager Xiabuji China
5 Joelly Wellbank Female Account Coordinator Whitehorse Canada
6 Bab Havock Female Accountant II Basyūn Egypt
10 Madison Ranyell Male Research Associate Angatel Philippines
11 Haydon Faughey Male Safety Technician IV Masalovka Russia
12 Michele Zarfai Male Legal Assistant Karatau Kazakhstan
13 Ruthi Bowmer Female Analog Circuit Desig Peski Russia
14 Adolphus Pickthorne Male Senior Developer Mae Fa Luang Thailand
15 Kat Dymocke Female Geological Engineer Markópoulo Oropoú Greece
Time taken: 0.92 seconds, Fetched: 12 row(s)


// Verify the record count in MySQL, HDFS and Hive
mysql> use ohm;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select count(1) from employee;
+----------+
| count(1) |
+----------+
|       12 |
+----------+
1 row in set (0.00 sec)



// do record count from all the files in warehouse 
hdfs dfs -cat /user/hive/warehouse/ohm.db/employee/* | wc -l
12

// do record count from the table
hive> select count(1) from employee;
OK
12


Sunday, 24 May 2020

MySQL, Derby Metastore configuration for Hive

$ mysql -u root -p
Enter password:

mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name                        | Value  |
+--------------------------------------+--------+
| validate_password.check_user_name    | ON     |
| validate_password.dictionary_file    |        |
| validate_password.length             | 8      |
| validate_password.mixed_case_count   | 1      |
| validate_password.number_count       | 1      |
| validate_password.policy             | MEDIUM |
| validate_password.special_char_count | 1      |
+--------------------------------------+--------+
7 rows in set (0.06 sec)

mysql> SET GLOBAL validate_password.length = 6;
Query OK, 0 rows affected (0.00 sec)

mysql> SET GLOBAL validate_password.number_count = 0;
Query OK, 0 rows affected (0.00 sec)

 
mysql> set global validate_password.policy  = LOW;
Query OK, 0 rows affected (0.00 sec)



mysql> CREATE USER 'hiveuser'@'localhost' IDENTIFIED BY 'mypassword';
...
mysql> REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hiveuser'@'localhost';
mysql> GRANT ALL PRIVILEGES ON metastore.* TO 'hiveuser'@'localhost';
mysql> FLUSH PRIVILEGES;
mysql> quit;

mysql -u hiveuser -p 
password : mypassword 

hive-site.xml:
--------------

<configuration>
   <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
      <description>metadata is stored in a MySQL server</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>MySQL JDBC driver class</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hiveuser</value>
      <description>user name for connecting to mysql server</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>mypassword</value>
      <description>password for connecting to mysql server</description>
   </property>
</configuration>

Copy hive-site.xml from hive/conf folder into spark/conf folder.


cp /home/hadoop/apache-hive-3.1.2-bin/conf/hive-site.xml /home/hadoop/spark-3.0.0-preview2-bin-hadoop3.2/conf

Download mysql connector directly in jars folder of spark:

hadoop@hadoop:~/spark-3.0.0-preview2-bin-hadoop3.2/jars$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
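
Once hive-site.xml is in Spark's conf folder (and the MySQL connector jar is in place), a Hive-enabled SparkSession should see the same metastore. A minimal PySpark check (sketch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
appName("metastore-check").\
enableHiveSupport().\
getOrCreate()            # picks up hive-site.xml from $SPARK_HOME/conf

spark.sql("show databases").show()   # should list the databases known to the Hive metastore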







Derby installation:

sudo apt install derby-tools libderby-java libderbyclient-java



hadoop@hadoop:~/apache-hive-2.3.7-bin/conf$ whereis derby
derby: /usr/share/derby

hive-site.xml - only for derby:

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:/home/hadoop/apache-hive-3.1.2-bin/metastore_db;databaseName=metastore_db;create=true</value>
</property>

hive --service metastore 

If anything goes wrong, just drop the metastore database in MySQL and re-create it with the step below.

Create the metastore schema in MySQL:
schematool -dbType mysql -initSchema

Create the metastore schema in Derby:
schematool -dbType derby -initSchema

Tuesday, 19 May 2020

StructField, StructType Example in PySpark. Create Dataframe using in memory object in PySpark

# Create an in-memory object in PySpark, build a dataframe from it, and export it into MySQL.

import pyspark.sql.types as st

data = [
    (101,'Sankaranarayanan','1976-04-20','M','Bangalore','Married'),
    (102,'Anbusudha','1979-07-22','F','Bangalore','Married'),
    (103,'Rahul','1989-07-15','M','Manachai','Bachelor')
]

print(data)

[(101, 'Sankaranarayanan', '1976-04-20', 'M', 'Bangalore', 'Married'), (102, 'Anbusudha', '1979-07-22', 'F', 'Bangalore', 'Married'), (103, 'Rahul', '1989-07-15', 'M', 'Manachai', 'Bachelor')]

for i in data:
    print(i)
(101, 'Sankaranarayanan', '1976-04-20', 'M', 'Bangalore', 'Married')
(102, 'Anbusudha', '1979-07-22', 'F', 'Bangalore', 'Married')
(103, 'Rahul', '1989-07-15', 'M', 'Manachai', 'Bachelor')

person_schema = st.StructType([
    st.StructField('SNo', st.IntegerType(), True),
    st.StructField('Name', st.StringType(), True),
    st.StructField('DOB', st.StringType(), True),
    st.StructField('Gender', st.StringType(), True),
    st.StructField('City', st.StringType(), True),
    st.StructField('MarritalStatus', st.StringType(), True)
])

print(person_schema)

StructType(List(StructField(SNo,IntegerType,true),StructField(Name,StringType,true),StructField(DOB,StringType,true),StructField(Gender,StringType,true),StructField(City,StringType,true),StructField(MarritalStatus,StringType,true)))

user_df = spark.createDataFrame(data, person_schema)
 
user_df.printSchema()

root
 |-- SNo: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- MarritalStatus: string (nullable = true)

user_df.show(5)

+---+----------------+----------+------+---------+--------------+
|SNo|            Name|       DOB|Gender|     City|MarritalStatus|
+---+----------------+----------+------+---------+--------------+
|101|Sankaranarayanan|1976-04-20|     M|Bangalore|       Married|
|102|       Anbusudha|1979-07-22|     F|Bangalore|       Married|
|103|           Rahul|1989-07-15|     M| Manachai|      Bachelor|
+---+----------------+----------+------+---------+--------------+ 

#Write the dataframe content into MySQL table
user_df.write.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "user_table").\
option("user", "root").\
option("password", "Studi0Plus").save()
Go to MySQL:


use school;
show tables;
select * from user_table;

# SNo, Name, DOB, Gender, City, MarritalStatus
'103', 'Rahul', '1989-07-15', 'M', 'Manachai', 'Bachelor'
'102', 'Anbusudha', '1979-07-22', 'F', 'Bangalore', 'Married'
'101', 'Sankaranarayanan', '1976-04-20', 'M', 'Bangalore', 'Married'

Read Json and Write into MySQL table using PySpark

# Read from json

df_local = sqlContext.read.format("json").load("E:\\DataSets\\olympic.json")

df_local.printSchema()

root
 |-- age: long (nullable = true)
 |-- athelete: string (nullable = true)
 |-- bronze: long (nullable = true)
 |-- closing: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gold: long (nullable = true)
 |-- silver: long (nullable = true)
 |-- sport: string (nullable = true)
 |-- total: long (nullable = true)
 |-- year: string (nullable = true)

df_local.show()

+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
|age|            athelete|bronze| closing|      country|gold|silver|               sport|total|year|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
| 30|      Inge de Bruijn|     2|08-29-04|  Netherlands|   1|     1|            Swimming|    4|2004|
| 24|         Ryan Lochte|     2|08-24-08|United States|   2|     0|            Swimming|    4|2008|
| 23|Libby Lenton-Tric...|     1|08-24-08|    Australia|   2|     1|            Swimming|    4|2008|
| 24|     Kirsty Coventry|     0|08-24-08|     Zimbabwe|   1|     3|            Swimming|    4|2008|
| 20|            Sun Yang|     1|08-12-12|        China|   2|     1|            Swimming|    4|2012|
| 29|       Marit Bjørgen|     1|02-28-10|       Norway|   3|     1|Cross Country Skiing|    5|2010|
| 18|       Nastia Liukin|     1|08-24-08|United States|   1|     3|          Gymnastics|    5|2008|
| 26|       Cindy Klassen|     2|02-26-06|       Canada|   1|     2|       Speed Skating|    5|2006|
| 33|         Dara Torres|     3|10-01-00|United States|   2|     0|            Swimming|    5|2000|
| 17|          Ian Thorpe|     0|10-01-00|    Australia|   3|     2|            Swimming|    5|2000|
| 21|    Natalie Coughlin|     1|08-29-04|United States|   2|     2|            Swimming|    5|2004|
| 22|     Allison Schmitt|     1|08-12-12|United States|   3|     1|            Swimming|    5|2012|
| 27|         Ryan Lochte|     1|08-12-12|United States|   2|     2|            Swimming|    5|2012|
| 17|      Missy Franklin|     1|08-12-12|United States|   4|     0|            Swimming|    5|2012|
| 24|       Alicia Coutts|     1|08-12-12|    Australia|   1|     3|            Swimming|    5|2012|
| 24|       Aleksey Nemov|     3|10-01-00|       Russia|   2|     1|          Gymnastics|    6|2000|
| 25|    Natalie Coughlin|     3|08-24-08|United States|   1|     2|            Swimming|    6|2008|
| 27|      Michael Phelps|     0|08-12-12|United States|   4|     2|            Swimming|    6|2012|
| 19|      Michael Phelps|     2|08-29-04|United States|   6|     0|            Swimming|    8|2004|
| 23|      Michael Phelps|     0|08-24-08|United States|   8|     0|            Swimming|    8|2008|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+

# Save the dataframe into a MySQL table
df_local.write.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "olympic").\
option("user", "root").\
option("password", "Studi0Plus").save()

# Create a dataframe (read) from MySQL
df_Olympic = sqlContext.read.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "olympic").\
option("user", "root").\
option("password", "Studi0Plus").load()
df_Olympic.show(5)

+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
|age|            athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
| 30|      Inge de Bruijn|     2|08-29-04|  Netherlands|   1|     1|Swimming|    4|2004|
| 24|         Ryan Lochte|     2|08-24-08|United States|   2|     0|Swimming|    4|2008|
| 23|Libby Lenton-Tric...|     1|08-24-08|    Australia|   2|     1|Swimming|    4|2008|
| 24|     Kirsty Coventry|     0|08-24-08|     Zimbabwe|   1|     3|Swimming|    4|2008|
| 20|            Sun Yang|     1|08-12-12|        China|   2|     1|Swimming|    4|2012|
+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
only showing top 5 rows


df_Olympic.filter("country = 'United States' and sport='Swimming' and year=2008").show()


+---+----------------+------+--------+-------------+----+------+--------+-----+----+
|age|        athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+----------------+------+--------+-------------+----+------+--------+-----+----+
| 24|     Ryan Lochte|     2|08-24-08|United States|   2|     0|Swimming|    4|2008|
| 25|Natalie Coughlin|     3|08-24-08|United States|   1|     2|Swimming|    6|2008|
| 23|  Michael Phelps|     0|08-24-08|United States|   8|     0|Swimming|    8|2008|
+---+----------------+------+--------+-------------+----+------+--------+-----+----+
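
Since the data now lives in a dataframe, the rest of the dataframe API applies as well. For example, total medals per country (a small sketch using the same df_Olympic dataframe):

from pyspark.sql import functions as F

df_Olympic.groupBy("country").\
agg(F.sum("total").alias("total_medals")).\
orderBy(F.desc("total_medals")).show(5)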

Saturday, 16 May 2020

PySpark with MySQL Connection - Example

Download the MySQL connector and put it in the D:\Spark\mysql folder

In Windows:

D:\\Spark\\mysql\\mysql-connector-java-8.0.20.jar


Start pyspark with --jars option

pyspark --jars D:\\Spark\\mysql\\mysql-connector-java-8.0.20.jar


from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession

dataframe_mysql = sqlContext.read.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "student").\
option("user", "root").
option("password", "Studi0Plus").load()
dataframe_mysql.show()

+----------+---------+---------+------------+
|rollnumber|firstname| lastname|        city|
+----------+---------+---------+------------+
|         1|  Sankara|Narayanan|   Pallathur|
|         3|   Rajini|    Kanth|   Bangalore|
|         4|    Vijay|   Balaji|   Bangalore|
|         5|     Anbu|    Sudha|Melasivapuri|
|         6|     Arun|    Vijay|     Chennai|
+----------+---------+---------+------------+


dataframe_mysql.filter("city='Bangalore' and firstname='Rajini'").show(5)

+----------+---------+--------+---------+
|rollnumber|firstname|lastname|     city|
+----------+---------+--------+---------+
|         3|   Rajini|   Kanth|Bangalore|
+----------+---------+--------+---------+

In Ubuntu:

Start PySpark with the --jars option

pyspark --jars /home/hadoop/apache-hive-3.1.2-bin/lib/mysql-connector-java-8.0.20.jar

from pyspark.sql import SparkSession
sparkdriver = SparkSession.builder.master("local").appName("demoApp").\
			  config("spark.jars.packages","mysql:mysql-connector-java-8.0.20").\
			  getOrCreate()
			  
df = sparkdriver.read.format("jdbc").\
	   option("url","jdbc:mysql://localhost:3306").\
	   option("driver","com.mysql.jdbc.Driver").\
	   option("user","hadoop").\
	   option("password","p@ssw0rd").\
	   option("query","select * from school.student").\
	   load()
	   
df.show()

+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Raja|Pallathur|
|101|Silva|Singapore|
+---+-----+---------+
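
The same session can also push a dataframe back to MySQL. A sketch using the write API (student_copy is just a hypothetical target table for this example):

df.write.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "student_copy").\
option("user", "hadoop").\
option("password", "p@ssw0rd").\
mode("append").save()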


Monday, 11 May 2020

Python with MySQL database programs

install python connector

python -m pip install mysql-connector

#verify the mysql connectivity
import mysql.connector
mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd"
)

print(mydb)

<mysql.connector.connection_cext.CMySQLConnection object at 0x000000E2CA88AB70>
#create a database
#display the databases
mycursor = mydb.cursor()
mycursor.execute("CREATE DATABASE School")
mycursor.execute("SHOW DATABASES")

for x in mycursor:
  print(x)
  
('employee',)
('information_schema',)
('mysql',)
('performance_schema',)
('sakila',)
('school',)
('sys',)
('world',)



# Troubleshooting notes:
# verify that MySQL is reachable on port 3306
import socket
socket.getaddrinfo('127.0.0.1', 3306)

-- if the connector fails to authenticate, switch root to the native password plugin (run in MySQL):
ALTER USER 'root'@'localhost' IDENTIFIED BY 'p@ssw0rd' PASSWORD EXPIRE NEVER;
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'p@ssw0rd';

#connect with School database 
#create a table
import mysql.connector
mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)

mycursor = mydb.cursor()

mycursor.execute("CREATE TABLE Student (rollnumber int, firstname varchar(50), lastname varchar(50), city varchar(50))")




mycursor.execute("SHOW TABLES")

for x in mycursor:
  print(x)
  
('student',)

#Drop table
mycursor.execute("DROP TABLE Student")
for x in mycursor:
  print(x)
  
  
  
#create a table with auto increment column
import mysql.connector
mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)

mycursor = mydb.cursor()
mycursor.execute("CREATE TABLE Student (rollnumber INT AUTO_INCREMENT PRIMARY KEY, firstname varchar(50), lastname varchar(50), city varchar(50))")
mycursor.execute("show tables")
for x in mycursor:
  print(x)
('student',)

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)

mycursor = mydb.cursor()

sql = "INSERT INTO student (firstname, lastname,city) VALUES (%s, %s, %s)"
val = ("Sankara","Narayanan","Pallathur")
mycursor.execute(sql, val)

mydb.commit()

print(mycursor.rowcount, "record inserted.")

1 record inserted.


mycursor = mydb.cursor()

sql = "INSERT INTO student (firstname, lastname,city) VALUES (%s, %s, %s)"
val = [
  ('Prathap','Pothan','திருவனந்தப்புரம்'),
  ('Rajini','Kanth','Bangalore'),
  ('Vijay', 'Balani','Bangalore'),
  ('Anbu','Sudha','Melasivapuri')
]

mycursor.executemany(sql, val)

mydb.commit()

print(mycursor.rowcount, "records inserted.")

4 records inserted.


Get Inserted identity ids:

sql = "INSERT INTO student (firstname, lastname,city) VALUES (%s, %s, %s)"
val = ("Arun","Vijay","Chennai")
mycursor.execute(sql, val)

mydb.commit()

print("1 record inserted, ID:", mycursor.lastrowid)

1 record inserted, ID: 6
Select from the table:

mycursor.execute("SELECT * FROM student")
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
  
(1, 'Sankara', 'Narayanan', 'Pallathur')
(2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')
(5, 'Anbu', 'Sudha', 'Melasivapuri')
(6, 'Arun', 'Vijay', 'Chennai')


#select columns
mycursor.execute("SELECT rollnumber as RollNumber, concat(firstname, ' ' ,lastname) as Name, city as City FROM student")
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
(1, 'Sankara Narayanan', 'Pallathur')
(2, 'Prathap Pothan', 'திருவனந்தப்புரம்')
(3, 'Rajini Kanth', 'Bangalore')
(4, 'Vijay Balani', 'Bangalore')
(5, 'Anbu Sudha', 'Melasivapuri')
(6, 'Arun Vijay', 'Chennai')


Fetch one record:

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)

mycursor = mydb.cursor()

mycursor.execute("SELECT * FROM Student")

myresult = mycursor.fetchone()

print(myresult)
myresult = mycursor.fetchone()

print(myresult)
myresult = mycursor.fetchone()

print(myresult)
myresult = mycursor.fetchone()

print(myresult)

(1, 'Sankara', 'Narayanan', 'Pallathur')
(2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')
#where condition
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student WHERE city ='Bangalore'"
mycursor.execute(sql)
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
  
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')


Like Condition:

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student WHERE lastname LIKE '%an%'"
mycursor.execute(sql)
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
    (1, 'Sankara', 'Narayanan', 'Pallathur')
(2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')
  
  
  
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student WHERE lastname LIKE '%an'"
mycursor.execute(sql)
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
  (1, 'Sankara', 'Narayanan', 'Pallathur')
  (2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')
  
  
 Prevent SQL Injection:
 
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student WHERE city = %s"
city = ("Bangalore", )
mycursor.execute(sql, city)
myresult = mycursor.fetchall()

for x in myresult:
  print(x)
  
  
  (3, 'Rajini', 'Kanth', 'Bangalore')
  (4, 'Vijay', 'Balani', 'Bangalore')
  
  
Order by example (ascending : default):
 
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student order by city"
mycursor.execute(sql)
myresult = mycursor.fetchall()

for x in myresult:
  print(x)
  
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')
(6, 'Arun', 'Vijay', 'Chennai')
(5, 'Anbu', 'Sudha', 'Melasivapuri')
(1, 'Sankara', 'Narayanan', 'Pallathur')
(2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')



Order by descending example:

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()
sql = "SELECT * FROM student order by lastname desc"
mycursor.execute(sql)
myresult = mycursor.fetchall()

for x in myresult:
  print(x)
  
  
(6, 'Arun', 'Vijay', 'Chennai')
(5, 'Anbu', 'Sudha', 'Melasivapuri')
(2, 'Prathap', 'Pothan', 'திருவனந்தப்புரம்')
(1, 'Sankara', 'Narayanan', 'Pallathur')
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balani', 'Bangalore')



Delete a record:

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()

sql = "DELETE FROM student WHERE city = %s"
adr = ("திருவனந்தப்புரம்", )
mycursor.execute(sql, adr)
mydb.commit()
print(mycursor.rowcount, "record(s) deleted")

1 record(s) deleted


#drop a table

mycursor = mydb.cursor()
sql = "DROP TABLE student"
mycursor.execute(sql)


#Drop Only if Exist

mycursor = mydb.cursor()
sql = "DROP TABLE IF EXISTS student"
mycursor.execute(sql)



#update record with condition
#avoid sql injection

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)
mycursor = mydb.cursor()

sql = "UPDATE Student SET lastname = %s WHERE lastname = %s"
val = ("Balaji", "Balani")
mycursor.execute(sql, val)
mydb.commit()
print(mycursor.rowcount, "record(s) affected")

1 record(s) affected
#limit example

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)


mycursor = mydb.cursor()
mycursor.execute("SELECT * FROM Student LIMIT 5")
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
(1, 'Sankara', 'Narayanan', 'Pallathur')
(3, 'Rajini', 'Kanth', 'Bangalore')
(4, 'Vijay', 'Balaji', 'Bangalore')
(5, 'Anbu', 'Sudha', 'Melasivapuri')
(6, 'Arun', 'Vijay', 'Chennai')
  
  
#limit with offset example

import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  passwd="p@ssw0rd",
  database="school"
)

mycursor = mydb.cursor()
mycursor.execute("SELECT * FROM Student LIMIT 3 offset 2")
myresult = mycursor.fetchall()
for x in myresult:
  print(x)
  
  
(4, 'Vijay', 'Balaji', 'Bangalore')
(5, 'Anbu', 'Sudha', 'Melasivapuri')
(6, 'Arun', 'Vijay', 'Chennai')

Saturday, 16 March 2019

Json Input To Kafka Broker to Spark Streaming to MySQL using KafkaProducer, KafkaUtils.createStream

// Json To Kafka Broker to Spark Streaming to MySQL
// Here we read a json input file from the file system and put it into the Kafka broker
// Spark Streaming fetches messages from the Kafka broker and writes them into a MySQL table

input file:
nameList.json:
--------------
{"id":992,"name":"Herman","city":"Iles","country":"Colombia","Skills":"CVE"},
{"id":993,"name":"Burton","city":"Santo Tomas","country":"Philippines","Skills":"VMware vSphere"},
{"id":994,"name":"Correna","city":"Shirgjan","country":"Albania","Skills":"Wyse"},
{"id":995,"name":"Cathi","city":"Dorūd","country":"Iran","Skills":"SSCP"},
{"id":996,"name":"Lena","city":"Al Judayrah","country":"Palestinian Territory","Skills":"Commercial Kitchen Design"},
{"id":997,"name":"Madalena","city":"Livadiya","country":"Ukraine","Skills":"Software Development"},
{"id":998,"name":"Jo-anne","city":"Khatsyezhyna","country":"Belarus","Skills":"TPD"},
{"id":999,"name":"Georgi","city":"Pasuruan","country":"Indonesia","Skills":"Project Engineering"},
{"id":1000,"name":"Scott","city":"Gyumri","country":"Armenia","Skills":"RHEV"}



start zookeeper:
hadoop@hadoop:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka server:
hadoop@hadoop:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties

hadoop@hadoop:/usr/local/kafka$ bin/kafka-topics.sh --create --topic jsonTopic --partitions 1 --replication-factor 1 --zookeeper localhost:2182

//view the topics available in Kafka Broker
hadoop@hadoop:/usr/local/kafka$  bin/kafka-topics.sh --list --zookeeper localhost:2182
jsonTopic



// create a database and table in MySQL:

 $ mysql -uroot -pcloudera

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.18.10.2 (Ubuntu)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database KafkaDB;
Query OK, 1 row affected (0.05 sec)

mysql> use KafkaDB;
Database changed

// create a table
mysql> create table jsonTable (id int, name varchar(50), city varchar(50), country varchar(50), Skills varchar(50));
Query OK, 0 rows affected (0.20 sec)





// The producer reads the json file and publishes each line to the Kafka topic
Producer Programming in Scala:
-------------------------------
JsonProducer.scala:
-------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,ProducerRecord}

object JsonProducer {
  def main(args:Array[String]):Unit = {
    val props = new Properties()

    props.put("bootstrap.servers","localhost:9092")
    props.put("acks","all")
    props.put("client.id","ProducerApp")
    props.put("retries","4")
    props.put("batch.size","32768")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    val topic = "jsonTopic"
    val producer = new KafkaProducer[String,String](props)
    val file = scala.io.Source.fromFile("/home/cloudera/Desktop/vow/nameList.json")

    for (line <- file.getLines()) {
      val msg = new ProducerRecord[String,String](topic,line)
      producer.send(msg)
    }
    producer.close()
  }
}


[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load JsonProducer.scala
scala> JsonProducer.main(null)
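
For quick tests, a roughly equivalent producer can also be sketched in Python with the kafka-python package (an assumption here; it is not used elsewhere on this page):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# publish each line of the json file as one message, same as the Scala producer above
with open("/home/cloudera/Desktop/vow/nameList.json") as f:
    for line in f:
        producer.send("jsonTopic", line.strip().encode("utf-8"))

producer.flush()
producer.close()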




hadoop@hadoop:/usr/local/kafka$ bin/kafka-console-consumer.sh --topic jsonTopic --bootstrap-server localhost:9092 --from-beginning

{"id":1,"name":"Sharline","city":"Uppsala","country":"Sweden","Skills":"Eyelash Extensions"},
{"id":2,"name":"Marris","city":"São Domingos","country":"Cape Verde","Skills":"VMI Programs"},
{"id":3,"name":"Gregg","city":"Qaxbaş","country":"Azerbaijan","Skills":"Historical Research"},
{"id":4,"name":"Christye","city":"Guarapari","country":"Brazil","Skills":"Army"},
{"id":5,"name":"Modesta","city":"Paltamo","country":"Finland","Skills":"Avaya Technologies"},



Kafka2MySQLStreaming.scala:
-----------------------------
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka2MySQLStreaming {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SparkStreamingJson").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)

    val batchInterval = 20
    val zk = "localhost:2182"
    val consumerGroupID = "jsonGroup"
    val topic = "jsonTopic"
    val partition = 1
    val perTopicPartitions = Map(topic -> partition)
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    val KafkaData = KafkaUtils.createStream(ssc, zk, consumerGroupID, perTopicPartitions)
    val ks = KafkaData.map(x => x._2)
    ks.foreachRDD { x =>
      val df = sqlc.read.json(x)

      val props = new Properties()
      props.put("driver", "com.mysql.jdbc.Driver")
      props.put("user", "root")
      props.put("password", "cloudera")

      df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/KafkaDB", "jsonTable", props)
      df.count()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

[cloudera@quickstart kafka]$ spark-shell
scala> sc.stop()
scala> :load Kafka2MySQLStreaming.scala
scala> Kafka2MySQLStreaming.main(null)



// MySQL result:
--------------
[cloudera@quickstart kafka]$ mysql -uroot -pcloudera
mysql> use KafkaDB;
mysql> select * from jsonTable;
+------+------------+--------------------------+-----------------------+-------------------------------+
| id   | name       | city                     | country               | Skills                        |
+------+------------+--------------------------+-----------------------+-------------------------------+
|    1 | Sharline   | Uppsala                  | Sweden                | Eyelash Extensions            |
|    2 | Marris     | São Domingos             | Cape Verde            | VMI Programs                  |
|    3 | Gregg      | Qaxbaş                   | Azerbaijan            | Historical Research           |
|    4 | Christye   | Guarapari                | Brazil                | Army                          |
|    5 | Modesta    | Paltamo                  | Finland               | Avaya Technologies            |

Wednesday, 27 February 2019

MySQL to RDD or Dataframe to Cassandra Integration

// MySQL Table creation with sample data insertion
hadoop@hadoop:~$ sudo mysql;

mysql> create database store;
Query OK, 1 row affected (0.00 sec)

mysql> use store;
Database changed

mysql> create table customer(id int primary key, name varchar(50), city varchar(50));
Query OK, 0 rows affected (0.03 sec)

mysql> insert into customer (id,name,city) values (100,'Sara','Karaikudi');
Query OK, 1 row affected (0.08 sec)

mysql> insert into customer (id,name,city) values (101,'Lara','Manachai');
Query OK, 1 row affected (0.02 sec)

mysql> insert into customer (id,name,city) values (102,'Kalai','Vadagudi');
Query OK, 1 row affected (0.00 sec)

mysql> select * from customer;
+-----+-------+-----------+
| id  | name  | city      |
+-----+-------+-----------+
| 100 | Sara  | Karaikudi |
| 101 | Lara  | Manachai  |
| 102 | Kalai | Vadagudi  |
+-----+-------+-----------+
3 rows in set (0.00 sec)


Create KeySpace and Table in Cassandra : Don't insert any rows there
--------------------------------------------------------------------
$ sudo service cassandra start

$ sudo update-rc.d cassandra defaults


//start CLI for Cassandra
$ cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>  describe cluster;

Cluster: Test Cluster
Partitioner: Murmur3Partitioner

cqlsh> CREATE KEYSPACE store with replication = {'class':'SimpleStrategy','replication_factor':1};

cqlsh> use store;

cqlsh:store>  CREATE TABLE customer (id int,name varchar, city varchar, primary key (id));


         

// Spark Shell ->

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 --conf spark.cassandra.connection.host=localhost


val dfcustomer = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","store.customer").
      option("user","hadoop").
      option("password","hadoop").
      load()
 
dfcustomer.show()

+---+-----+---------+
| id| name|     city|
+---+-----+---------+
|100| Sara|Karaikudi|
|101| Lara| Manachai|
|102|Kalai| Vadagudi|
+---+-----+---------+

dfcustomer.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "store")).save()


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara

// delete all records
cqlsh:store> delete from customer where id in (100,101,102);


cqlsh:store> select * from customer;

 id | city | name
----+------+------


// Here we build an RDD from the dataframe and save it (RDD + case class schema) into the Cassandra table
scala> case class customer(id:Int, name:String, city:String);
defined class customer

scala> val r1 = dfcustomer.rdd
r1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:28

scala> r1.collect.foreach(println)
[100,Sara,Karaikudi]
[101,Lara,Manachai]
[102,Kalai,Vadagudi]

scala> val r = r1.map (x => {
     |        val id = x(0).toString.toInt
     |        val name = x(1).toString
     |        val city = x(2).toString
     |        customer(id,name,city)
     |        })
r: org.apache.spark.rdd.RDD[customer] = MapPartitionsRDD[10] at map at <console>:30

scala> r.collect.foreach(println)
customer(100,Sara,Karaikudi)
customer(101,Lara,Manachai)
customer(102,Kalai,Vadagudi)

// here we write the RDD into the Cassandra table (requires the connector implicits)
scala> import com.datastax.spark.connector._

scala> r.saveToCassandra("store","customer");


// Here we verify the Cassandra data:
cqlsh:store> select * from customer;

 id  | city      | name
-----+-----------+-------
 100 | Karaikudi |  Sara
 102 |  Vadagudi | Kalai
 101 |  Manachai |  Lara
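
The same check can also be done from Python with the DataStax cassandra-driver package (a sketch, assuming the driver is installed):

from cassandra.cluster import Cluster

session = Cluster(["localhost"]).connect("store")
for row in session.execute("select id, name, city from customer"):
    print(row.id, row.name, row.city)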


