How to save a DataFrame in Hive?
// copy the ratings.csv file into HDFS
hdfs dfs -copyFromLocal ratings.csv /user/
scala> val r1 = sc.textFile("hdfs://localhost:9000/user/ratings.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/ratings.csv MapPartitionsRDD[21] at textFile at <console>:24
scala> r1.first()
res19: String = userId,movieId,rating,timestamp
scala> r1.count
res9: Long = 27753445
scala> val header = r1.first()
header: String = userId,movieId,rating,timestamp
scala> val r2 = r1.filter (line => line != header)
r2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at filter at <console>:27
scala> r2.first()
res18: String = 1,307,3.5,1256677221
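// Aside: a common alternative to filtering on the header string is to drop only the
// first record of partition 0. This is just a sketch (r2Alt is not used below).
val r2Alt = r1.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}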
// parse each line into a (userId, movieId, rating, time) tuple
scala> val r3 = r2.map(x => x.split(",")).map { x =>
     |   val userid = x(0).toInt
     |   val movieid = x(1).toInt
     |   val rating = x(2).toFloat
     |   val time = x(3)
     |   (userid, movieid, rating, time)
     | }
scala> r3.count
res21: Long = 27753444
scala> val df = r3.toDF("UserID","MovieID","Rating","Time")
df: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]
scala> df.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows
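// Aside: the same DataFrame can be built more directly with Spark's CSV reader,
// which parses the header and infers column types. A minimal sketch (column names
// then come from the header row: userId, movieId, rating, timestamp):
val dfCsv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://localhost:9000/user/ratings.csv")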
// sc.startTime is the epoch time: the number of milliseconds elapsed since Jan 01, 1970 (UTC)
scala> sc.startTime
res1: Long = 1548827706613
scala> df.printSchema
root
|-- UserID: integer (nullable = false)
|-- MovieID: integer (nullable = false)
|-- Rating: float (nullable = false)
|-- Time: string (nullable = true)
// change the data type of "Time" from String to Long
scala> val df1 = df.withColumn("Time",col("Time").cast("Long"))
df1: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]
scala> df1.printSchema
root
|-- UserID: integer (nullable = false)
|-- MovieID: integer (nullable = false)
|-- Rating: float (nullable = false)
|-- Time: long (nullable = true)
scala> spark.sql("show functions").show(250)
scala> import java.util.Date
import java.util.Date
scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat
// user-defined function: format epoch milliseconds as a "yyyy-MM-dd hh:mm:ss" string
// (note: hh is the 12-hour clock; use HH for a 24-hour clock)
scala> def ephochToTimestamp(ephoMillis:Long):String = {
| val time:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
| time.format(ephoMillis)
| }
ephochToTimestamp: (ephoMillis: Long)String
// test the function
scala> ephochToTimestamp(1548827706613L)
res12: String = 2019-01-30 11:25:06
// register the function so it can also be called from Spark SQL
scala> spark.udf.register("convertEpoch",ephochToTimestamp _)
res14: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))
scala> val convertEpoch = udf(ephochToTimestamp _)
convertEpoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))
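// Sketch: once registered, convertEpoch can also be called from a SQL query.
// The temporary view name "ratings_tmp" is only illustrative.
df1.createOrReplaceTempView("ratings_tmp")
spark.sql("select UserID, MovieID, Rating, convertEpoch(Time) as Time from ratings_tmp").show(5)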
// Here the "Time" column still holds the raw epoch value as a Long
scala> df1.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows
// apply the UDF to the "Time" column
scala> val df2 = df1.withColumn("Time",convertEpoch(col("Time")))
scala> df2.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows
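// Note: the ratings timestamps are Unix epoch *seconds* (as in the MovieLens data),
// not milliseconds, which is why every converted row above lands in January 1970.
// If the real rating dates are wanted, the built-in from_unixtime function (which
// expects seconds) avoids both the UDF and the unit mismatch. A minimal sketch:
import org.apache.spark.sql.functions.from_unixtime
val dfSeconds = df1.withColumn("Time", from_unixtime(col("Time")).cast("timestamp"))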
spark.sql("show functions").show(250) // now u can see convertEpoch function present in that list
// After applying the UDF, the "Time" column is of String type
scala> df2.printSchema
root
|-- UserID: integer (nullable = false)
|-- MovieID: integer (nullable = false)
|-- Rating: float (nullable = false)
|-- Time: string (nullable = true)
// Now change the data type of "Time" from String to Timestamp
scala> val df3 = df2.withColumn("Time",col("Time").cast("Timestamp"))
df3: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]
scala> df3.printSchema
root
|-- UserID: integer (nullable = false)
|-- MovieID: integer (nullable = false)
|-- Rating: float (nullable = false)
|-- Time: timestamp (nullable = true)
scala> df3.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows
scala> val df4 = df3.select( col("UserID"),col("MovieID"),col("Rating"),year(col("Time")),month(col("Time")),hour(col("Time")) )
df4: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]
scala> df4.show(5)
+------+-------+------+----------+-----------+----------+
|UserID|MovieID|Rating|year(Time)|month(Time)|hour(Time)|
+------+-------+------+----------+-----------+----------+
|     1|    307|   3.5|      1970|          1|         6|
|     1|    481|   3.5|      1970|          1|         6|
|     1|   1091|   1.5|      1970|          1|         6|
|     1|   1257|   4.5|      1970|          1|         6|
|     1|   1449|   4.5|      1970|          1|         6|
+------+-------+------+----------+-----------+----------+
only showing top 5 rows
// the same select, now with alias names applied for Year, Month, and Hour
scala> val df4 = df3.select( col("UserID"),col("MovieID"),col("Rating"),year(col("Time")).alias("Year"),month(col("Time")).alias("Month"),hour(col("Time")).alias("Hour") )
df4: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]
scala> df4.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows
// Write the DataFrame to a Hive table
scala> df4.write.saveAsTable("default.ratingsTable")
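// Sketch: write mode, file format, and partition columns can also be set explicitly
// (the values below are illustrative). By default saveAsTable errors out if the
// table already exists and stores the data as Parquet.
df4.write
  .mode("overwrite")
  .format("parquet")
  .partitionBy("Year")
  .saveAsTable("default.ratingsTable")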
scala> spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
|     default|
+------------+
scala> spark.sql("use default")
res34: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").show()
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
| default|ratingstable|      false|
+--------+------------+-----------+
scala> val x = spark.sql("select * from ratingstable")
x: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]
scala> x.count
res36: Long = 27753444
scala> x.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows
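// Sketch: the saved table can now be queried like any other Hive table, for example
// counting and averaging the ratings per extracted Year (illustrative query):
spark.sql("""
  select Year, count(*) as num_ratings, avg(Rating) as avg_rating
  from ratingstable
  group by Year
  order by Year
""").show()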
// the same table is also visible from the Hive CLI
hive> show databases;
OK
default
Time taken: 0.746 seconds, Fetched: 1 row(s)
hive> use default;
OK
Time taken: 0.067 seconds
hive> show tables;
OK
ratingstable
Time taken: 0.07 seconds, Fetched: 1 row(s)
hive> select * from ratingstable limit 10;
OK
(row output omitted for brevity)
Time taken: 3.442 seconds
hive> describe formatted ratingstable;
OK
# col_name              data_type               comment
userid                  int
movieid                 int
rating                  float
year                    int
month                   int
hour                    int

# Detailed Table Information
Location:               file:/home/hadoop/spark-warehouse/ratingstable
...
# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:            org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        path                    hdfs://localhost:9000/user/hive/warehouse/ratingstable
        serialization.format    1
Time taken: 0.246 seconds, Fetched: 35 row(s)
// the table data is stored as Snappy-compressed Parquet files under the warehouse directory
hdfs dfs -ls /user/hive/warehouse/ratingstable
Found 7 items
-rw-r--r-- 3 hadoop supergroup 0 2019-01-30 13:42 /user/hive/warehouse/ratingstable/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 13775546 2019-01-30 13:39 /user/hive/warehouse/ratingstable/part-00000-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r-- 3 hadoop supergroup 13669265 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00001-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r-- 3 hadoop supergroup 13437742 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00002-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r-- 3 hadoop supergroup 13207254 2019-01-30 13:41 /user/hive/warehouse/ratingstable/part-00003-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r-- 3 hadoop supergroup 13113080 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00004-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r-- 3 hadoop supergroup 8639989 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00005-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
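// Sketch: since the warehouse files are plain Snappy-compressed Parquet, they can also
// be read back directly, bypassing the Hive metastore:
val fromParquet = spark.read.parquet("hdfs://localhost:9000/user/hive/warehouse/ratingstable")
fromParquet.printSchema()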