Friday, 5 April 2019

Hive Queries Vs Dataframe Queries - Part 1

hadoop@hadoop:~/Desktop/vow$ touch emp.txt
hadoop@hadoop:~/Desktop/vow$ atom emp.txt

101,Sathya,1000
102,Shanthi,2000
103,Mani,3000
104,Kalai,4000
105,Aruvi,5000
106,Nila,1500
107,Praveen,2500
108,Rashee,7500
109,Pinki,3500
110,Ravi,2500

pwd : /home/hadoop/Desktop/vow


hive> create database learning;
OK
Time taken: 0.901 seconds

hive> use learning;
OK
Time taken: 0.08 seconds



hive> create external table emp(id int, name varchar(50), salary int) row format delimited fields terminated by ',';

hive> load data local inpath '/home/hadoop/Desktop/vow/emp.txt' into table emp;


hive> select * from emp;
OK
101 Sathya 1000
102 Shanthi 2000
103 Mani 3000
104 Kalai 4000
105 Aruvi 5000
106 Nila 1500
107 Praveen 2500
108 Rashee 7500
109 Pinki 3500
110 Ravi 2500
Time taken: 0.305 seconds, Fetched: 10 row(s)


scala> import org.apache.spark.sql.types._

scala> val empSchema = StructType(StructField("id",IntegerType,true)::StructField("name",StringType,true)::StructField("salary",IntegerType,true)::Nil)
empSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(salary,IntegerType,true))

scala> val df = spark.read.format("csv").option("header","false").schema(empSchema).load("/home/hadoop/Desktop/vow/emp.txt");
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)


hive> select * from emp;
OK
101 Sathya 1000
102 Shanthi 2000
103 Mani 3000
104 Kalai 4000
105 Aruvi 5000
106 Nila 1500
107 Praveen 2500
108 Rashee 7500
109 Pinki 3500
110 Ravi 2500

scala> df.show
+---+-------+------+
| id|   name|salary|
+---+-------+------+
|101| Sathya|  1000|
|102|Shanthi|  2000|
|103|   Mani|  3000|
|104|  Kalai|  4000|
|105|  Aruvi|  5000|
|106|   Nila|  1500|
|107|Praveen|  2500|
|108| Rashee|  7500|
|109|  Pinki|  3500|
|110|   Ravi|  2500|
+---+-------+------+


hive> select max(salary) from emp;
7500

scala> df.select(max(df("salary")) as "Salary").show
or
scala> df.select(max($"salary") as "Salary").show

+------+
|Salary|
+------+
|  7500|
+------+

hive> select max(salary),min(salary) from emp;
OK
7500 1000

scala> df.select(max(df("salary")) as "MaxSal", min(df("salary")) as "MinSal").show
or
scala> df.select(max($"salary") as "MaxSal",min($"salary") as "MinSal").show
+------+------+
|MaxSal|MinSal|
+------+------+
|  7500|  1000|
+------+------+


hive> select salary from emp order by salary;
OK
1000
1500
2000
2500
2500
3000
3500
4000
5000
7500

scala> df.select(df("salary")).orderBy("salary").show
or
scala> df.select($"salary").orderBy($"salary").show
+------+
|salary|
+------+
|  1000|
|  1500|
|  2000|
|  2500|
|  2500|
|  3000|
|  3500|
|  4000|
|  5000|
|  7500|
+------+

hive> select salary from emp order by salary desc;
OK
7500
5000
4000
3500
3000
2500
2500
2000
1500
1000

import org.apache.spark.sql.functions._

scala> df.select(df("salary")).orderBy(desc("salary")).show
or
scala> df.select($"salary").orderBy($"salary".desc).show
+------+
|salary|
+------+
|  7500|
|  5000|
|  4000|
|  3500|
|  3000|
|  2500|
|  2500|
|  2000|
|  1500|
|  1000|
+------+



hive> select sum(salary) from emp;
OK
32500


scala> df.select(sum("salary") as "Sum").show
or
scala> df.select(sum($"salary") as "Sum").show

+-----+
|  Sum|
+-----+
|32500|
+-----+



hadoop@hadoop:~/Desktop/vow$ touch emp.txt
hadoop@hadoop:~/Desktop/vow$ atom emp.txt

id,name,gender,salary,deptid
100,Ravi,m,1000,10
101,Rani,f,2000,11
102,Suresh,m,3000,12
103,Rahul,m,1250,10
104,Rashee,f,3500,11
105,Priya,f,3600,12
106,Ayeesha,f,4000,10
107,Aruvi,f,2500,11
108,Arushi,f,2800,12
109,Vinay,m,3200,10
110,Kalai,f,2550,11
111,Shilpa,f,2600,12

hadoop@hadoop:~/Desktop/vow$ touch dept.txt
hadoop@hadoop:~/Desktop/vow$ atom dept.txt

deptid,deptname
10,Marketing
11,Sales
12,Production

// Both files start with a header row. Adding tblproperties("skip.header.line.count"="1") to the table definition makes Hive skip that first line when reading the data.

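// NOTE: a table named emp was already created earlier in this post; if you are still in the same database, drop it (and remove its old data file) before re-creating it with the new schema.
hive> create external table emp(id int, name varchar(50),gender char(1), salary int, deptid int) row format delimited fields terminated by ',' tblproperties("skip.header.line.count"="1");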


hive> load data local inpath "/home/hadoop/Desktop/vow/emp.txt" into table emp;





hive> create external table dept(deptid int, deptname varchar(50)) row format delimited fields terminated by ',' tblproperties("skip.header.line.count"="1");

hive> load data local inpath "/home/hadoop/Desktop/vow/dept.txt" into table dept;




scala> val empSchema = StructType(StructField("id",IntegerType,true)::StructField("name",StringType,true)::StructField("gender",StringType,true)::StructField("salary",IntegerType,true)::StructField("deptid",IntegerType,true)::Nil)
empSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(gender,StringType,true), StructField(salary,IntegerType,true), StructField(deptid,IntegerType,true))

scala> val deptSchema = StructType(StructField("deptid",IntegerType,true)::StructField("deptname",StringType,true)::Nil)
deptSchema: org.apache.spark.sql.types.StructType = StructType(StructField(deptid,IntegerType,true), StructField(deptname,StringType,true))


 val dfEmp  = spark.read.format("csv").option("header","true").schema(empSchema).load("/home/hadoop/Desktop/vow/emp.txt");

 val dfDept = spark.read.format("csv").option("header","true").schema(deptSchema).load("/home/hadoop/Desktop/vow/dept.txt");

 hive> select * from emp;
OK
100 Ravi m 1000 10
101 Rani f 2000 11
102 Suresh m 3000 12
103 Rahul m 1250 10
104 Rashee f 3500 11
105 Priya f 3600 12
106 Ayeesha f 4000 10
107 Aruvi f 2500 11
108 Arushi f 2800 12
109 Vinay m 3200 10
110 Kalai f 2550 11
111 Shilpa f 2600 12

 scala> dfEmp.show
 or
 scala> dfEmp.select("*").show

+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|100|   Ravi|     m|  1000|    10|
|101|   Rani|     f|  2000|    11|
|102| Suresh|     m|  3000|    12|
|103|  Rahul|     m|  1250|    10|
|104| Rashee|     f|  3500|    11|
|105|  Priya|     f|  3600|    12|
|106|Ayeesha|     f|  4000|    10|
|107|  Aruvi|     f|  2500|    11|
|108| Arushi|     f|  2800|    12|
|109|  Vinay|     m|  3200|    10|
|110|  Kalai|     f|  2550|    11|
|111| Shilpa|     f|  2600|    12|
+---+-------+------+------+------+

hive> select * from dept;
OK
10 Marketing
11 Sales
12 Production
Time taken: 0.238 seconds, Fetched: 3 row(s)

scala> dfDept.show
or
scala> dfDept.select("*").show

+------+----------+
|deptid|  deptname|
+------+----------+
|    10| Marketing|
|    11|     Sales|
|    12|Production|
+------+----------+


scala> dfEmp.select(max($"salary") as "MaxSal").show
+------+
|MaxSal|
+------+
|  4000|
+------+

hive> select max(salary) from emp;
4000


scala> dfEmp.select(min($"salary") as "MinSal").show
+------+
|MinSal|
+------+
|  1000|
+------+

hive> select min(salary) from emp;
1000


hive> select max(salary) as MaxSal, min(salary) as MinSal from emp;
4000 1000

scala> dfEmp.select(max("salary") as "MaxSal",min("salary") as "MinSal").show
+------+------+
|MaxSal|MinSal|
+------+------+
|  4000|  1000|
+------+------+


hive> select deptid,max(salary) from emp group by deptid order by deptid;
10 4000
11 3500
12 3600



scala> dfEmp.groupBy("deptid").agg(max("salary") as "maxSal").orderBy("deptid").show
+------+------+                                                               
|deptid|maxSal|
+------+------+
|    10|  4000|
|    11|  3500|
|    12|  3600|
+------+------+


hive> select deptid,count(name) from emp group by deptid order by deptid;
10 4
11 4
12 4

scala> dfEmp.groupBy("deptid").agg(count("name") as "nameCount").orderBy("deptid").show
+------+---------+                                                           
|deptid|nameCount|
+------+---------+
|    10|        4|
|    11|        4|
|    12|        4|
+------+---------+




 scala> dfEmp.select($"salary").orderBy("salary").show
 or
 scala> dfEmp.select(dfEmp("salary")).orderBy("salary").show

+------+
|salary|
+------+
|  1000|
|  1250|
|  2000|
|  2500|
|  2550|
|  2600|
|  2800|
|  3000|
|  3200|
|  3500|
|  3600|
|  4000|
+------+

hive> select salary from emp order by salary;
1000
1250
2000
2500
2550
2600
2800
3000
3200
3500
3600
4000


scala> dfEmp.select($"salary").orderBy(desc("salary")).show
or
scala> dfEmp.select("salary").orderBy(desc("salary")).show
+------+
|salary|
+------+
|  4000|
|  3600|
|  3500|
|  3200|
|  3000|
|  2800|
|  2600|
|  2550|
|  2500|
|  2000|
|  1250|
|  1000|
+------+

hive> select salary from emp order by salary desc;
4000
3600
3500
3200
3000
2800
2600
2550
2500
2000
1250
1000



hive> select gender,max(salary) from emp group by gender order by gender;
f 4000
m 3200

scala> dfEmp.groupBy("gender").agg(max("salary") as "maxSal").orderBy("gender").show
+------+------+                                                               
|gender|maxSal|
+------+------+
|     f|  4000|
|     m|  3200|
+------+------+


hive> select gender,sum(salary) from emp group by gender order by gender;
f 23550
m 8450

scala> dfEmp.groupBy("gender").agg(sum("salary") as "GenderSumSal").orderBy("gender").show
+------+------------+                                                         
|gender|GenderSumSal|
+------+------------+
|     f|       23550|
|     m|        8450|
+------+------------+





hive> select * from emp order by salary desc;
OK
106 Ayeesha f 4000 10
105 Priya f 3600 12
104 Rashee f 3500 11
109 Vinay m 3200 10
102 Suresh m 3000 12
108 Arushi f 2800 12
111 Shilpa f 2600 12
110 Kalai f 2550 11
107 Aruvi f 2500 11
101 Rani f 2000 11
103 Rahul m 1250 10
100 Ravi m 1000 10


scala> dfEmp.orderBy(desc("salary")).show
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
|104| Rashee|     f|  3500|    11|
|109|  Vinay|     m|  3200|    10|
|102| Suresh|     m|  3000|    12|
|108| Arushi|     f|  2800|    12|
|111| Shilpa|     f|  2600|    12|
|110|  Kalai|     f|  2550|    11|
|107|  Aruvi|     f|  2500|    11|
|101|   Rani|     f|  2000|    11|
|103|  Rahul|     m|  1250|    10|
|100|   Ravi|     m|  1000|    10|
+---+-------+------+------+------+




hive> select * from emp order by salary desc limit 2;
OK
106 Ayeesha f 4000 10
105 Priya f 3600 12


scala> dfEmp.orderBy(desc("salary")).show(2);
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
+---+-------+------+------+------+
only showing top 2 rows



// Employee with the highest salary
hive> select * from emp order by salary desc limit 1;
106 Ayeesha f 4000 10


scala> dfEmp.orderBy(desc("salary")).show(1);
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
+---+-------+------+------+------+
only showing top 1 row






//extract single value (scalar) from dataframe
scala> val x:Int = dfEmp.agg(max("salary")).head().getInt(0)
x: Int = 4000



scala> dfEmp.orderBy(desc("salary")).show
+---+-------+------+------+------+
| id|   name|gender|salary|deptid|
+---+-------+------+------+------+
|106|Ayeesha|     f|  4000|    10|
|105|  Priya|     f|  3600|    12|
|104| Rashee|     f|  3500|    11|
|109|  Vinay|     m|  3200|    10|
|102| Suresh|     m|  3000|    12|
|108| Arushi|     f|  2800|    12|
|111| Shilpa|     f|  2600|    12|
|110|  Kalai|     f|  2550|    11|
|107|  Aruvi|     f|  2500|    11|
|101|   Rani|     f|  2000|    11|
|103|  Rahul|     m|  1250|    10|
|100|   Ravi|     m|  1000|    10|
+---+-------+------+------+------+


scala> dfEmp.where($"salary" < dfEmp.agg(max("salary")).first().getInt(0)).orderBy(desc("salary")).show(1)
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row

// Employee with the second-highest salary
hive> select * from emp where salary not in (select max(salary) from emp ) order by salary desc limit 1;
105 Priya f 3600 12

hive> select * from (select * from emp order by salary desc limit 2) result order by salary limit 1;

105 Priya f 3600 12

scala> dfEmp.orderBy(desc("Salary")).limit(2).orderBy("salary").show(1);
+---+-----+------+------+------+
| id| name|gender|salary|deptid|
+---+-----+------+------+------+
|105|Priya|     f|  3600|    12|
+---+-----+------+------+------+
only showing top 1 row



scala> dfEmp.orderBy(desc("Salary")).take(2)
res87: Array[org.apache.spark.sql.Row] = Array([106,Ayeesha,f,4000,10], [105,Priya,f,3600,12])

scala> dfEmp.orderBy(desc("Salary")).take(2)(1);
res91: org.apache.spark.sql.Row = [105,Priya,f,3600,12]

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...