Friday, 31 July 2020

Import a table from MySQL to HBase using Sqoop import

// Import into HBase
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--driver com.mysql.jdbc.Driver \
--username root \
--password cloudera \
--table employee \
--hbase-create-table \
--hbase-table employee_details \
--column-family employees \
--hbase-row-key id -m 1

// Single-line statement for the same
sqoop import --connect jdbc:mysql://localhost:3306/ohm --driver com.mysql.jdbc.Driver --username root --password cloudera --table employee --hbase-create-table --hbase-table employee_details --column-family employees --hbase-row-key id -m 1

[cloudera@quickstart ~]$ hbase shell

scan 'employee_details'
// displays all rows in HBase's columnar (one cell per line) view

Import a table from MySQL to Hive using Sqoop import

// Log in to MySQL
$ sudo mysql -uroot -pcloudera

mysql> create database ohm;
Query OK, 1 row affected (0.00 sec)

mysql> use ohm;
Database changed


CREATE TABLE employee 
(
id INT,
first_name VARCHAR(100),
last_name VARCHAR(100),
gender VARCHAR(10),
designation VARCHAR(20),
city VARCHAR(20),
country VARCHAR(20)
);


INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (1, 'Jervis', 'Roll', 'Male', 'Director of Sales', 'Thi Tran Lac', 'Vietnam');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (2, 'Gordon', 'Maltster', 'Male', 'Marketing Manager', 'Mabu', 'China');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (3, 'Griff', 'Godsafe', 'Male', 'Actuary', 'Kipit', 'Philippines');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (4, 'Gracie', 'Franken', 'Female', 'Assistant Manager', 'Xiabuji', 'China');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (5, 'Joelly', 'Wellbank', 'Female', 'Account Coordinator', 'Whitehorse', 'Canada');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (6, 'Bab', 'Havock', 'Female', 'Accountant II', 'Basyūn', 'Egypt');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (7, 'Carmine', 'Courage', 'Female', 'Account Coordinator', 'Boyeros', 'Cuba');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (8, 'Estella', 'Marvell', 'Female', 'Structural Analysis Engineer', 'Stettler', 'Canada');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (9, 'Celie', 'Trevaskiss', 'Female', 'Assistant Manager', 'Criuleni', 'Moldova');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (10, 'Madison', 'Ranyell', 'Male', 'Research Associate', 'Angatel', 'Philippines');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (11, 'Haydon', 'Faughey', 'Male', 'Safety Technician IV', 'Masalovka', 'Russia');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (12, 'Michele', 'Zarfai', 'Male', 'Legal Assistant', 'Karatau', 'Kazakhstan');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (13, 'Ruthi', 'Bowmer', 'Female', 'Analog Circuit Design manager', 'Peski', 'Russia');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (14, 'Adolphus', 'Pickthorne', 'Male', 'Senior Developer', 'Mae Fa Luang', 'Thailand');
INSERT INTO employee (id, first_name, last_name, gender, designation, city, country) VALUES (15, 'Kat', 'Dymocke', 'Female', 'Geological Engineer', 'Markópoulo Oropoú', 'Greece');


// Display databases
$ sqoop list-databases --connect jdbc:mysql://localhost --username root --password cloudera

information_schema
cm
firehose
hue
metastore
mysql
nav
navms
ohm
oozie
retail_db
rman

// Display tables
$ sqoop list-tables --connect jdbc:mysql://localhost/ohm --username root --password cloudera
employee
person

// Create a database in Hive
hive> create database ohm;
OK
Time taken: 2.041 seconds
hive> show tables;
OK
Time taken: 0.244 seconds


// Import a table from MySQL to Hive
sqoop import \
--connect jdbc:mysql://localhost:3306/ohm \
--driver com.mysql.jdbc.Driver \
--username root \
--password cloudera \
--table employee \
--hive-import \
--split-by id \
--hive-table ohm.employee

 
hive> use ohm;
OK
Time taken: 0.017 seconds
hive> show tables;
OK
employee
Time taken: 0.029 seconds, Fetched: 1 row(s)
hive> select * from employee;
OK
1 Jervis Roll Male Director of Sales Thi Tran Lac Vietnam
2 Gordon Maltster Male Marketing Manager Mabu China
3 Griff Godsafe Male Actuary Kipit Philippines
4 Gracie Franken Female Assistant Manager Xiabuji China
5 Joelly Wellbank Female Account Coordinator Whitehorse Canada
6 Bab Havock Female Accountant II Basyūn Egypt
10 Madison Ranyell Male Research Associate Angatel Philippines
11 Haydon Faughey Male Safety Technician IV Masalovka Russia
12 Michele Zarfai Male Legal Assistant Karatau Kazakhstan
13 Ruthi Bowmer Female Analog Circuit Desig Peski Russia
14 Adolphus Pickthorne Male Senior Developer Mae Fa Luang Thailand
15 Kat Dymocke Female Geological Engineer Markópoulo Oropoú Greece
Time taken: 0.92 seconds, Fetched: 12 row(s)


// Verify the record count in MySQL, HDFS and Hive
mysql> use ohm;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select count(1) from employee;
+----------+
| count(1) |
+----------+
|       12 |
+----------+
1 row in set (0.00 sec)



// Record count across all files under the Hive warehouse directory
hdfs dfs -cat /user/hive/warehouse/ohm.db/employee/* | wc -l
12

// Record count from the Hive table
hive> select count(1) from employee;
OK
12
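
If Spark with Hive support is available on the same cluster, the row count can also be cross-checked from a Spark session; a minimal sketch, assuming the Hive metastore is reachable from Spark and using the ohm.employee table created above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Verify Sqoop import")
  .enableHiveSupport()                  // needs Spark built/configured with Hive support
  .getOrCreate()

// read the Hive table populated by the Sqoop import and count its rows
println(spark.table("ohm.employee").count())   // expected to match the MySQL/HDFS counts above (12)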


Thursday, 30 July 2020

Scenario based Spark Programs


#1. Get all the customers whose year of joining is between 2017 and 2019 and whose amount is > 50.

Customer table

cust_id cust_name yoj
1 X 2019-01-01 12:02:00.0
2 y 2018-01-01 12:02:00.0
3 Z 2017-01-01 12:02:00.0
4 F 2017-01-01 12:02:00.0
5 G 2017-01-01 10:02:00.0

Department table

id amount
1 100
1 200
2 300
3 50
4 50


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


val sparkSession=SparkSession
.builder()
.master("local[*]")
.appName("Interview Questions")
.getOrCreate()
import sparkSession.implicits._

val customerdf=Seq((1,"X","2019-01-01 12:02:00.0")
,(2,"y","2018-01-01 12:02:00.0")
,(3,"Z","2017-01-01 12:02:00.0")
,(4,"F","2017-01-01 12:02:00.0")
,(5,"G","2017-01-01 10:02:00.0")).toDF("cust_id","cust_name","yoj")
 
val deptdf=Seq((1,100)
,(1,200)
,(2,300)
,(3,50)
,(4,50)).toDF("id","amount")
 
val cust= customerdf.withColumn("datetime", date_format(col("yoj").cast(TimestampType), "yyyy-MM-dd HH:mm:ss").cast(TimestampType))
val filteredcust=cust.filter(year($"yoj").between(2017,2019))//.show

val filtereddept=deptdf.filter($"amount">50)
val result=filteredcust.join(filtereddept,col("cust_id")===col("id"),"inner").show(truncate = false)

+-------+---------+---------------------+-------------------+---+------+
|cust_id|cust_name|yoj                  |datetime           |id |amount|
+-------+---------+---------------------+-------------------+---+------+
|1      |X        |2019-01-01 12:02:00.0|2019-01-01 12:02:00|1  |200   |
|1      |X        |2019-01-01 12:02:00.0|2019-01-01 12:02:00|1  |100   |
|2      |y        |2018-01-01 12:02:00.0|2018-01-01 12:02:00|2  |300   |
+-------+---------+---------------------+-------------------+---+------+
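
The same result can also be written as a Spark SQL query over temporary views; a rough sketch using the dataframes defined above (not part of the original solution):

customerdf.createOrReplaceTempView("customer")
deptdf.createOrReplaceTempView("department")

sparkSession.sql("""
  SELECT c.*, d.amount
  FROM customer c
  JOIN department d ON c.cust_id = d.id
  WHERE year(c.yoj) BETWEEN 2017 AND 2019   -- the yoj string is cast to a date/timestamp here
    AND d.amount > 50
""").show(false)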

#2. Count the number of people residing in each city. There are two tables: employee(empid, empname, empcitycode) and city(citycode, cityname).
Employee table :

empid empname empcitycode
1 a 1
2 b 2
3 c 1
4 d 1
5 e 3
City Table :

CityCode CityName
1 delhi
2 punjab
3 mumbai

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._


val empdf = Seq((1, "a", 1), 
(2, "b", 2), 
(3, "c", 1), 
(4, "d", 1), 
(5, "e", 3)).toDF("empid", "empname", "empcitycode")
val citydf = Seq((1, "delhi"), 
(2, "punjab"), 
(3, "mumbai")).toDF("CityCode", "CityName")
val joineddf = empdf.join(citydf, $"empcitycode" === $"CityCode", "inner")
//DataframesWay

val window = Window.partitionBy($"empcitycode")
val result = joineddf.withColumn("citywiseresult", count($"empid") over window).select("CityName", "citywiseresult").distinct.show

+--------+--------------+
|CityName|citywiseresult|
+--------+--------------+
|   delhi|             3|
|  mumbai|             1|
|  punjab|             1|
+--------+--------------+
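
The window + distinct combination works, but a plain groupBy is the more direct way to get per-city counts; a small sketch on the same joineddf:

joineddf.groupBy($"CityName")
  .agg(count($"empid").alias("citywiseresult"))   // count employees per city
  .show()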




#3. Get all the records from left table which are not present in right table
Table Left :

id left
0 zero
1 one
Table Right :

id right
0 zero
2 two
3 three

val left = Seq((0, "zero"), 
   (1, "one")).toDF("id", "left")
   
val right = Seq((0, "zero"), 
(2, "two"), 
(3, "three")).toDF("id", "right")
left.join(right,Seq("id"),"left_anti").show


+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+


RDD Approach:

#4. Find the average salary per department using the RDD approach
Table Dept :

DEPT EMP_NAME SALARY
1 RAM 10
2 Shyam 30
1 Vijay 10

val inputRdd = sc.parallelize(Seq("1,'RAM',10",
  "2,'Shyam',30",
  "1,'Vijay',10"))
inputRdd.map(record=>record.split(",")).map(record=>(record(0).toInt,(record(2).toInt,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>x._1/x._2).collect.foreach(println)

(1,10)
(2,30)
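
Note that x._1/x._2 above is integer division; it is exact for this sample data, but for fractional averages the sum can be converted to Double first. A sketch of the same pipeline with a Double average:

inputRdd.map(_.split(","))
  .map(rec => (rec(0).toInt, (rec(2).toInt, 1)))          // (dept, (salary, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))      // (dept, (salarySum, count))
  .mapValues { case (sum, cnt) => sum.toDouble / cnt }    // average as Double
  .collect
  .foreach(println)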



#5. Find the lowest salary in each department
Table Dept :

id amount
1 100
1 200
2 300
2 50
3 50

import sparkSession.implicits._
val df=Seq((1,100),
   (1,200),
   (2,300),
   (2,50),
   (3,50)).toDF("id","amount")
 //Dataframes way
 val window=Window.partitionBy($"id").orderBy($"amount".asc,$"id".asc)
 val df1=df.withColumn("row_number",row_number().over(window))
 df1.select("id","amount").where("row_number=1").show()
 
 
 +---+------+
| id|amount|
+---+------+
|  1|   100|
|  3|    50|
|  2|    50|
+---+------+
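
An alternative that avoids the window function is a simple groupBy with min; a sketch on the same df:

import org.apache.spark.sql.functions.min

df.groupBy("id")
  .agg(min("amount").alias("lowest_salary"))   // lowest amount per department id
  .show()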

Wednesday, 29 July 2020

Remove Leading and Trailing spaces for String columns in a dataframe in Spark with Scala


Id,Name,Age
100,Anbu     ,55
101,     Siva,33
100,Anbu     ,55
102,    Kalai    ,33
103,    Arivu   ,22
101,   Siva    ,33
104,    Ashok    ,52
105, Priya    ,22



scala> val dfC = spark.read.format("csv").option("header","True").option("inferSchema","True").load("D:\\Ex\\My\\tblC.txt")
dfC: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

// Note: the Name column has leading and trailing spaces.
scala> dfC.show()
+---+-------------+---+
| Id|         Name|Age|
+---+-------------+---+
|100|    Anbu     | 55|
|101|         Siva| 33|
|100|    Anbu     | 55|
|102|    Kalai    | 33|
|103|     Arivu   | 22|
|101|     Siva    | 33|
|104|    Ashok    | 52|
|105|    Priya    | 22|
+---+-------------+---+

//option("ignoreLeadingWhiteSpace", "true").option("ignoreTrailingWhiteSpace", "true")
scala> val dfC = spark.read.format("csv").option("header","True").option("inferSchema","True").option("ignoreLeadingWhiteSpace", "true").option("ignoreTrailingWhiteSpace", "true").load("D:\\Ex\\My\\tblC.txt")
dfC: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]


// Now the leading and trailing spaces are removed
scala> dfC.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|100| Anbu| 55|
|101| Siva| 33|
|100| Anbu| 55|
|102|Kalai| 33|
|103|Arivu| 22|
|101| Siva| 33|
|104|Ashok| 52|
|105|Priya| 22|
+---+-----+---+
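
If the dataframe is already loaded (or the source does not support those CSV options), the trim() column function gives the same cleanup; a sketch applied to the first dfC:

import org.apache.spark.sql.functions.{col, trim}

val dfTrimmed = dfC.withColumn("Name", trim(col("Name")))   // strip leading and trailing spaces from Name
dfTrimmed.show()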

Longest word of a given String in Python

def longest_word(text):   # 'text' avoids shadowing the built-in str
    words = text.split()
    max_len = len(max(words, key=len))
    return [word for word in words if len(word) == max_len]

print(longest_word("I have an interview Scheduled today in Bangalore"))
['interview', 'Scheduled', 'Bangalore']

print(longest_word("Do you want me to do something?"))
['something?']

Remove Duplicates in Dataframe | Distinct, DropDuplicates Examples in Spark

tblC:

Id,Name,Age
100,Anbu,55
101,Siva,33
100,Anbu,55
102,Kalai,33
103,Arivu,22
101,Siva,33
104,Ashok,52
105,Priya,22


scala> val dfC = spark.read.format("csv").option("header","True").option("inferSchema","True").load("D:\\Ex\\My\\tblC.txt")
dfC: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> dfC.printSchema()
root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)


scala> dfC.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|100| Anbu| 55|   - 
|101| Siva| 33|   -
|100| Anbu| 55|   -
|102|Kalai| 33|
|103|Arivu| 22|
|101| Siva| 33|   -
|104|Ashok| 52|
|105|Priya| 22|
+---+-----+---+



scala> dfC.distinct.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|104|Ashok| 52|
|101| Siva| 33|
|105|Priya| 22|
|102|Kalai| 33|
|100| Anbu| 55|
|103|Arivu| 22|
+---+-----+---+

// Drop Duplicates
scala> val dfNoDuplicates = dfC.dropDuplicates()
dfNoDuplicates: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: int, Name: string ... 1 more field]

scala> dfNoDuplicates.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|104|Ashok| 52|
|101| Siva| 33|
|105|Priya| 22|
|102|Kalai| 33|
|100| Anbu| 55|
|103|Arivu| 22|
+---+-----+---+
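
dropDuplicates() with no arguments compares every column; it also accepts a subset of columns when, say, only Id should decide uniqueness. A sketch (the surviving row per Id is arbitrary):

val dfUniqueIds = dfC.dropDuplicates("Id")   // deduplicate on Id alone
dfUniqueIds.show()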

Find the difference between 2 dataframes in Spark with Scala

tblA:

100,Raja,55
101,Ravi,44
102,Kumar,33
104,Siva,22
105,Malar,32


tblB:

100,Raja,55
102,Kumar,33
105,Malar,32



scala> val dfA = spark.read.format("csv").option("inferSchema","True").option("header","True").load("D:\\Ex\\My\\tblA.txt")
dfA: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> dfA.printSchema()
root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 
scala> dfA.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|100| Raja| 55|
|101| Ravi| 44|
|102|Kumar| 33|
|104| Siva| 22|
|105|Malar| 32|
+---+-----+---+
 
 
scala> val dfB = spark.read.format("csv").option("inferSchema","True").option("header","True").load("D:\\Ex\\My\\tblB.txt")
dfB: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> dfB.printSchema()
root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)


scala> dfB.show()
+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|100| Raja| 55|
|102|Kumar| 33|
|105|Malar| 32|
+---+-----+---+


// Here is the answer
scala> dfA.exceptAll(dfB).show()
+---+----+---+
| Id|Name|Age|
+---+----+---+
|101|Ravi| 44|
|104|Siva| 22|
+---+----+---+


scala> dfB.exceptAll(dfA).show()
+---+----+---+
| Id|Name|Age|
+---+----+---+
+---+----+---+ 

// One more solution (unionAll is deprecated in Spark 2.x; union() is the current equivalent)
scala> dfA.unionAll(dfB).except(dfA.intersect(dfB)).show()
warning: there was one deprecation warning; re-run with -deprecation for details
+---+----+---+
| Id|Name|Age|
+---+----+---+
|101|Ravi| 44|
|104|Siva| 22|
+---+----+---+
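
A left_anti join over all columns gives the same "in dfA but not in dfB" rows, though unlike exceptAll it does not preserve duplicate counts; a sketch:

dfA.join(dfB, Seq("Id", "Name", "Age"), "left_anti").show()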

Sunday, 26 July 2020

SparkConf Example using Spark with Scala

//Without SparkConf

package myPackage
import org.apache.spark.sql.SparkSession
object my1st {
  def main(args:Array[String]) : Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Ex")
      .getOrCreate()
    val df = spark.read.format("json").load("D:/Ex/Spark-Tutorials-master/data/people.json")
    df.printSchema()
    df.show()
  }
}




//With SparkConf
package myPackage
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object my1st {
  def main(args:Array[String]) : Unit = {
    val sparkAppConf = new SparkConf()
    sparkAppConf.set("spark.app.name","Ex")
    sparkAppConf.set("spark.master","local[3]")

    val spark = SparkSession.builder()
      .config(sparkAppConf)
      .getOrCreate()

    val df = spark.read.format("json").load("D:/Ex/Spark-Tutorials-master/data/people.json")
    df.printSchema()
    df.show()
  }
}
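
The same settings can also be passed straight to the builder without creating a SparkConf object; a minimal sketch:

val spark = SparkSession.builder()
  .config("spark.app.name", "Ex")       // equivalent to .appName("Ex")
  .config("spark.master", "local[3]")   // equivalent to .master("local[3]")
  .getOrCreate()

Per the Spark documentation, values set in application code take precedence over flags passed to spark-submit, which in turn take precedence over spark-defaults.conf.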

Wednesday, 22 July 2020

Column into Rows : Explode Example Spark with Scala


hadoop@hadoop:~/Downloads$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/inputwithCommas.csv
Name|Age|Education
Sankar|44|DEEE,BCA,Msc(IT)
Anbu|42|BLitt,MA,MPhil,B.Ed
Veeraiah|43|Bsc
Naga|33|DME,BE


scala> val df = spark.read.option("delimiter","|").option("header","True").format("csv").load("hdfs://localhost:9000/SparkFiles/inputwithCommas.csv")
df: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 1 more field]

scala> df.show()
+--------+---+-------------------+
|    Name|Age|          Education|
+--------+---+-------------------+
|  Sankar| 44|   DEEE,BCA,Msc(IT)|
|    Anbu| 42|BLitt,MA,MPhil,B.Ed|
|Veeraiah| 43|                Bsc|
|    Naga| 33|             DME,BE|
+--------+---+-------------------+

// Explode :: split the comma-separated values into multiple rows
scala> val df1 = df.withColumn("Qualification",explode(split(col("Education"),",")))
df1: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 2 more fields]

scala> df1.show()
+--------+---+-------------------+-------------+
|    Name|Age|          Education|Qualification|
+--------+---+-------------------+-------------+
|  Sankar| 44|   DEEE,BCA,Msc(IT)|         DEEE|
|  Sankar| 44|   DEEE,BCA,Msc(IT)|          BCA|
|  Sankar| 44|   DEEE,BCA,Msc(IT)|      Msc(IT)|
|    Anbu| 42|BLitt,MA,MPhil,B.Ed|        BLitt|
|    Anbu| 42|BLitt,MA,MPhil,B.Ed|           MA|
|    Anbu| 42|BLitt,MA,MPhil,B.Ed|        MPhil|
|    Anbu| 42|BLitt,MA,MPhil,B.Ed|         B.Ed|
|Veeraiah| 43|                Bsc|          Bsc|
|    Naga| 33|             DME,BE|          DME|
|    Naga| 33|             DME,BE|           BE|
+--------+---+-------------------+-------------+

// drop an existing column
scala> val df2 = df1.drop(col("Education"))
df2: org.apache.spark.sql.DataFrame = [Name: string, Age: string ... 1 more field]

scala> df2.show()
+--------+---+-------------+
|    Name|Age|Qualification|
+--------+---+-------------+
|  Sankar| 44|         DEEE|
|  Sankar| 44|          BCA|
|  Sankar| 44|      Msc(IT)|
|    Anbu| 42|        BLitt|
|    Anbu| 42|           MA|
|    Anbu| 42|        MPhil|
|    Anbu| 42|         B.Ed|
|Veeraiah| 43|          Bsc|
|    Naga| 33|          DME|
|    Naga| 33|           BE|
+--------+---+-------------+
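
For reference, the exploded rows can be folded back into a comma-separated column with groupBy + collect_list + concat_ws; a sketch on df2 (the order of qualifications inside the rebuilt string is not guaranteed):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

val dfRebuilt = df2.groupBy("Name", "Age")
  .agg(concat_ws(",", collect_list("Qualification")).alias("Education"))   // reassemble the list
dfRebuilt.show()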

Pivot Example program in Spark using Scala

$ hdfs dfs -cat /SparkFiles/pivotexa.csv
Id,Subject,Marks
100,English,80
100,Physics,66
100,Maths,68
101,English,81
101,Physics,63
101,Maths,64
102,English,80
102,Physics,65
102,Maths,66
103,English,90
103,Physics,91
103,Maths,92


scala> val df = spark.read.format("csv").option("delimiter",",").option("header","True").option("inferSchema","True").load("hdfs://localhost:9000/SparkFiles/pivotexa.csv")

scala> df.printSchema()
root
 |-- Id: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Marks: integer (nullable = true)


scala> df.show()
+---+-------+-----+
| Id|Subject|Marks|
+---+-------+-----+
|100|English|   80|
|100|Physics|   66|
|100|  Maths|   68|
|101|English|   81|
|101|Physics|   63|
|101|  Maths|   64|
|102|English|   80|
|102|Physics|   65|
|102|  Maths|   66|
|103|English|   90|
|103|Physics|   91|
|103|  Maths|   92|
+---+-------+-----+


// Pivot process goes here
scala> val df1 = df.groupBy("Id").pivot("Subject").max("Marks")
df1: org.apache.spark.sql.DataFrame = [Id: int, English: int ... 2 more fields]

scala> df1.show()
+---+-------+-----+-------+                                                     
| Id|English|Maths|Physics|
+---+-------+-----+-------+
|100|     80|   68|     66|
|101|     81|   64|     63|
|102|     80|   66|     65|
|103|     90|   92|     91|
+---+-------+-----+-------+


scala> val df2 = df1.withColumn("Total",col("English") + col("Maths") + col("Physics"))
df2: org.apache.spark.sql.DataFrame = [Id: int, English: int ... 3 more fields]

scala> df2.show()
+---+-------+-----+-------+-----+
| Id|English|Maths|Physics|Total|
+---+-------+-----+-------+-----+
|100|     80|   68|     66|  214|
|101|     81|   64|     63|  208|
|102|     80|   66|     65|  211|
|103|     90|   92|     91|  273|
+---+-------+-----+-------+-----+
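
When the distinct Subject values are known up front, they can be passed to pivot() so Spark skips the extra job that discovers them; a sketch equivalent to df1 above:

val df1Explicit = df.groupBy("Id")
  .pivot("Subject", Seq("English", "Maths", "Physics"))   // explicit pivot values avoid the discovery pass
  .max("Marks")
df1Explicit.show()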

Sunday, 19 July 2020

How to read a multi-sheet Excel file using Python?

$ pip3 install pandas   # install pandas
$ pip3 install xlrd     # install the Excel reading package

>>> import pandas as pd
>>> xls = pd.ExcelFile('/home/hadoop/Downloads/multisheets.xlsx')  # open the Excel file
>>> df1 = pd.read_excel(xls, 'first')   # read a particular sheet
>>> df2 = pd.read_excel(xls, 'second')
>>> df3 = pd.read_excel(xls, 'third')

>>> print(df1)  # display the content of the pandas DataFrame
    Ravi  22    Chennai
    Rahul  23  Bengaluru

>>> print(df2)
  selvi  32 Chennai
  Usha  38  Singai

>>> print(df3)
  Ayush   5   Bengaluru
  Ram  50  Aranthangi


(The original post included a screenshot of the multi-sheet Excel file's contents here.)

Flume - Simple Demo

// Create a folder in HDFS
$ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates: Hadoop in real world <n>...