Wednesday, 30 January 2019

How to work with User Defined Functions (UDFs) in Spark with Scala?

How to save a DataFrame to Hive?

// copy the MovieLens ratings file from the local filesystem into HDFS
hdfs dfs -copyFromLocal ratings.csv /user/

scala> val r1 = sc.textFile("hdfs://localhost:9000/user/ratings.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/ratings.csv MapPartitionsRDD[21] at textFile at <console>:24

scala> r1.first()
res19: String = userId,movieId,rating,timestamp

scala> r1.count
res9: Long = 27753445 

scala> val header = r1.first()
header: String = userId,movieId,rating,timestamp

// drop the header row
scala> val r2 = r1.filter(line => line != header)
r2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at filter at <console>:27

scala> r2.first()
res18: String = 1,307,3.5,1256677221
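The header dance above is only needed with the RDD API. For reference, the DataFrame reader can strip the header and infer column types in one call (a sketch, assuming the same HDFS path; dfCsv is a hypothetical name):

// DataFrame alternative: skip the header and infer column types automatically
val dfCsv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://localhost:9000/user/ratings.csv")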


// parse each CSV line into a typed tuple: (userId, movieId, rating, timestamp)
val r3 = r2.map(_.split(",")).map { x =>
  val userid  = x(0).toInt
  val movieid = x(1).toInt
  val rating  = x(2).toFloat
  val time    = x(3)
  (userid, movieid, rating, time)
}
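Real-world CSVs often contain malformed rows; a defensive variant of the same parse would drop bad lines instead of crashing the job (a sketch using scala.util.Try; r3Safe is a hypothetical name):

// skip rows whose fields fail to parse instead of throwing at runtime
import scala.util.Try
val r3Safe = r2.map(_.split(",")).flatMap { x =>
  Try((x(0).toInt, x(1).toInt, x(2).toFloat, x(3))).toOption
}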

scala> r3.count
res21: Long = 27753444

scala> val df = r3.toDF("UserID","MovieID","Rating","Time")
df: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows

// sc.startTime is the number of milliseconds since Jan 01, 1970 (the Unix epoch)
scala> sc.startTime
res1: Long = 1548827706613
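To sanity-check an epoch-millisecond value without writing a formatter, java.time can render it directly (the printed instant is in UTC; the formatted strings later in this post use the machine's local timezone):

// interpret an epoch-millisecond value as an instant in time (UTC)
import java.time.Instant
Instant.ofEpochMilli(1548827706613L)  // 2019-01-30T05:55:06.613Z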


scala> df.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: string (nullable = true)

// change the data type of "Time" from String to Long
scala> val df1 = df.withColumn("Time",col("Time").cast("Long"))
df1: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df1.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: long (nullable = true)


scala> spark.sql("show functions").show(250)

scala> import java.util.Date
import java.util.Date

scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat


// user defined function: format epoch milliseconds as a timestamp string
scala> def epochToTimestamp(epochMillis: Long): String = {
     |   // "HH" gives a 24-hour clock; "hh" would render 13:00 as 01:00
     |   val time: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     |   time.format(epochMillis)
     | }
epochToTimestamp: (epochMillis: Long)String

// test the function
scala> epochToTimestamp(1548827706613L)
res12: String = 2019-01-30 11:25:06

// register the function for use in SQL queries
scala> spark.udf.register("convertEpoch",epochToTimestamp _)
res14: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

// wrap it for the DataFrame API
scala> val convertEpoch = udf(epochToTimestamp _)
convertEpoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))
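The registered name "convertEpoch" is what SQL sees, while the Scala val of the same name serves the DataFrame API. A quick sketch of calling the registered UDF from SQL (the literal is just an illustrative epoch-millisecond value):

// call the registered UDF from a SQL query
spark.sql("SELECT convertEpoch(1548827706613) AS ts").show()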


// Here "Time" is still of Long data type
scala> df1.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows

// applying the function. Note: the MovieLens timestamps are Unix *seconds*,
// but the formatter expects milliseconds, so every date below collapses to
// mid-January 1970.
scala> val df2 = df1.withColumn("Time",convertEpoch(col("Time")))
scala> df2.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows
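The January-1970 dates above are that seconds-vs-milliseconds mismatch in action. A corrected sketch (df2fixed is a hypothetical name; same convertEpoch UDF, timestamps scaled to milliseconds first):

// MovieLens timestamps are Unix seconds; scale to milliseconds before formatting
val df2fixed = df1.withColumn("Time", convertEpoch(col("Time") * 1000))
df2fixed.show(5)  // 1256677221 -> roughly 2009-10-27, local timezone applies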

 spark.sql("show functions").show(250) // now u can see convertEpoch function present in that list


// The "Time" column is now of String data type, since the UDF returns a String

scala> df2.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: string (nullable = true)


// Now change the data type of "Time" from String to Timestamp
scala> val df3 = df2.withColumn("Time",col("Time").cast("Timestamp"))
df3: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df3.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: timestamp (nullable = true)


scala> df3.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows
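As an aside, Spark ships a built-in from_unixtime function that takes epoch seconds directly, so the whole UDF round-trip could be replaced by one line (a sketch against df1, which still has the Long "Time" column; df3Alt is a hypothetical name):

// built-in alternative: from_unixtime expects seconds and returns a string,
// which can then be cast to a proper timestamp
val df3Alt = df1.withColumn("Time", from_unixtime(col("Time")).cast("timestamp"))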

scala> val df4 = df3.select( col("UserID"),col("MovieID"),col("Rating"),year(col("Time")),month(col("Time")),hour(col("Time")) )
df4: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]

scala> df4.show(5)
+------+-------+------+----------+-----------+----------+
|UserID|MovieID|Rating|year(Time)|month(Time)|hour(Time)|
+------+-------+------+----------+-----------+----------+
|     1|    307|   3.5|      1970|          1|         6|
|     1|    481|   3.5|      1970|          1|         6|
|     1|   1091|   1.5|      1970|          1|         6|
|     1|   1257|   4.5|      1970|          1|         6|
|     1|   1449|   4.5|      1970|          1|         6|
+------+-------+------+----------+-----------+----------+
only showing top 5 rows

// the same projection, with alias names Year, Month and Hour applied
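The aliased select itself isn't shown in the transcript; it presumably looked something like this sketch re-creating df4:

val df4 = df3.select(
  col("UserID"), col("MovieID"), col("Rating"),
  year(col("Time")).alias("Year"),
  month(col("Time")).alias("Month"),
  hour(col("Time")).alias("Hour"))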
scala> df4.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows

// Write the DataFrame content to a Hive table
scala> df4.write.saveAsTable("default.ratingsTable")
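Note that saveAsTable fails if the table already exists, so on re-runs a save mode helps (a sketch; Parquet is already the default format):

// overwrite an existing table instead of failing
df4.write.mode("overwrite").format("parquet").saveAsTable("default.ratingsTable")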

scala> spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
|     default|
+------------+

scala> spark.sql("use default")
res34: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show()
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
| default|ratingstable|      false|
+--------+------------+-----------+

scala> val x = spark.sql("select * from ratingstable")
x: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]

scala> x.count
res36: Long = 27753444

scala> x.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows


hive> show databases;
OK
default
Time taken: 0.746 seconds, Fetched: 1 row(s)
hive> use default;
OK
Time taken: 0.067 seconds
hive> show tables;
OK
ratingstable
Time taken: 0.07 seconds, Fetched: 1 row(s)
hive> select * from ratingstable limit 10;
OK
Time taken: 3.442 seconds
hive> select * from ratingstable
    > ;
OK
Time taken: 0.321 seconds
hive> describe ratingstable;
OK
userid               int
movieid              int
rating               float
year                 int
month                int
hour                 int

Location:            file:/home/hadoop/spark-warehouse/ratingstable

# Storage Information
SerDe Library:      org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:        org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:        org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:          No                 
Num Buckets:        -1                 
Bucket Columns:      []                 
Sort Columns:        []                 
Storage Desc Params:
path                hdfs://localhost:9000/user/hive/warehouse/ratingstable
serialization.format 1                 
Time taken: 0.246 seconds, Fetched: 35 row(s)
hive>


hdfs dfs -ls /user/hive/warehouse/ratingstable

Found 7 items
-rw-r--r--   3 hadoop supergroup          0 2019-01-30 13:42 /user/hive/warehouse/ratingstable/_SUCCESS
-rw-r--r--   3 hadoop supergroup   13775546 2019-01-30 13:39 /user/hive/warehouse/ratingstable/part-00000-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13669265 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00001-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13437742 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00002-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13207254 2019-01-30 13:41 /user/hive/warehouse/ratingstable/part-00003-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13113080 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00004-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup    8639989 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00005-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
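Since the table was stored as snappy-compressed Parquet (one part file per partition), it can also be read back straight from the files, bypassing the metastore (a sketch; path as listed by Hive above):

// read the Parquet part files back directly and verify the row count
val back = spark.read.parquet("hdfs://localhost:9000/user/hive/warehouse/ratingstable")
back.count  // should equal 27753444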
