Friday, 1 February 2019

Experimenting with ORC files - Parsing Yahoo Stocks using Spark with Scala

This walkthrough copies a Yahoo stock-price CSV into HDFS, parses it into a typed RDD of case-class rows in the Spark shell, registers the result as a DataFrame, and finally writes it out and reads it back as an ORC file.

Input
yahoo_stocks.csv:
------------------
Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49
2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66
2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45
2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78
2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73



hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal yahoo_stocks.csv /user/

hadoop@hadoop:~/Desktop/vow$ hdfs dfs -head /user/yahoo_stocks.csv

Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49


scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> import org.apache.spark.sql.types
import org.apache.spark.sql.types

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import spark.implicits._
import spark.implicits._

scala> spark.sql("CREATE TABLE yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
2019-02-01 20:59:04 WARN  HiveMetaStore:1383 - Location: hdfs://localhost:9000/user/hive/warehouse/yahoo_orc_table specified for non-external table:yahoo_orc_table
res39: org.apache.spark.sql.DataFrame = []
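Note: this Hive-managed ORC table is created here but never loaded in the rest of the session; the rows end up in a standalone ORC file instead (see the write step near the end). For completeness, a minimal sketch of how the table could be populated from the dfStockPrice DataFrame built below, assuming the SparkSession was started with Hive support enabled (insertInto matches columns by position, not by name):

    dfStockPrice.write.mode("append").insertInto("yahoo_orc_table")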

scala> val stocks = sc.textFile("hdfs://localhost:9000/user/yahoo_stocks.csv")
stocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/yahoo_stocks.csv MapPartitionsRDD[109] at textFile at <console>:42

scala> stocks.take(5).foreach(println)
Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70

scala> val header = stocks.first
header: String = Date,Open,High,Low,Close,Volume,Adj Close

scala> val rddStocksWithoutHeader = stocks.filter(x => x != header)
rddStocksWithoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[110] at filter at <console>:45

scala> rddStocksWithoutHeader.take(5).foreach(println)
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
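Aside: since Spark 2.x the built-in CSV reader can drop the header and infer column types in one call, which would replace the manual RDD cleanup above; a minimal sketch using the same path:

    val dfDirect = spark.read
      .option("header", "true")      // skip the header row
      .option("inferSchema", "true") // infer float/int column types
      .csv("hdfs://localhost:9000/user/yahoo_stocks.csv")

The RDD route is kept below because it shows how each raw line maps onto a typed case class.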



scala> case class YahooStockPrice(date: String, open: Float, high: Float, low: Float, close: Float, volume: Integer, adjcClose: Float)
defined class YahooStockPrice


scala> val stockpricewithSchema = rddStocksWithoutHeader.map(x => {
     |   val fields = x.split(",")
     |   val date = fields(0)
     |   val open = fields(1).trim.toFloat
     |   val high = fields(2).trim.toFloat
     |   val low = fields(3).trim.toFloat
     |   val close = fields(4).trim.toFloat
     |   val volume = fields(5).trim.toInt
     |   val adjClose = fields(6).trim.toFloat
     |   YahooStockPrice(date, open, high, low, close, volume, adjClose)
     | })
stockpricewithSchema: org.apache.spark.rdd.RDD[YahooStockPrice] = MapPartitionsRDD[111] at map at <console>:45

scala> stockpricewithSchema.take(5).foreach(println)
YahooStockPrice(2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34)
YahooStockPrice(2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36)
YahooStockPrice(2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52)
YahooStockPrice(2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7)
YahooStockPrice(2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98)
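One caveat: split plus toFloat/toInt will throw and abort the job on the first malformed line. A defensive variant, assuming unparseable rows should simply be dropped (illustrative sketch, not part of the original session):

    import scala.util.Try

    val safePrices = rddStocksWithoutHeader.flatMap { line =>
      val f = line.split(",")
      Try(YahooStockPrice(f(0), f(1).trim.toFloat, f(2).trim.toFloat,
        f(3).trim.toFloat, f(4).trim.toFloat, f(5).trim.toInt,
        f(6).trim.toFloat)).toOption  // None for rows that fail to parse
    }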

scala> val dfStockPrice = stockpricewithSchema.toDF
dfStockPrice: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

scala> dfStockPrice.printSchema
root
 |-- date: string (nullable = true)
 |-- open: float (nullable = false)
 |-- high: float (nullable = false)
 |-- low: float (nullable = false)
 |-- close: float (nullable = false)
 |-- volume: integer (nullable = true)
 |-- adjcClose: float (nullable = false)
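The nullability pattern follows from the case class: Scala's primitive Float fields can never hold null, so Spark marks them non-nullable, while date (a String) and volume (declared as java.lang.Integer, a boxed reference type) can be null and are flagged accordingly.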


scala> dfStockPrice.show(5)
+----------+-----+-----+-----+-----+--------+---------+
|      date| open| high|  low|close|  volume|adjcClose|
+----------+-----+-----+-----+-----+--------+---------+
|2015-04-28|44.34|44.57|43.94|44.34| 7188300|    44.34|
|2015-04-27|44.65| 45.1|44.25|44.36|10840900|    44.36|
|2015-04-24|43.73|44.71|43.69|44.52|11267500|    44.52|
|2015-04-23|43.92|44.06|43.58| 43.7|14274900|     43.7|
|2015-04-22|44.58|44.85|43.67|43.98|32241200|    43.98|
+----------+-----+-----+-----+-----+--------+---------+
only showing top 5 rows

scala> dfStockPrice.createOrReplaceTempView("ystemp")

scala> val results = spark.sql("SELECT * FROM ystemp LIMIT 10")
results: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

scala> results.show
+----------+-----+-----+-----+-----+--------+---------+
|      date| open| high|  low|close|  volume|adjcClose|
+----------+-----+-----+-----+-----+--------+---------+
|2015-04-28|44.34|44.57|43.94|44.34| 7188300|    44.34|
|2015-04-27|44.65| 45.1|44.25|44.36|10840900|    44.36|
|2015-04-24|43.73|44.71|43.69|44.52|11267500|    44.52|
|2015-04-23|43.92|44.06|43.58| 43.7|14274900|     43.7|
|2015-04-22|44.58|44.85|43.67|43.98|32241200|    43.98|
|2015-04-21|45.15|45.18|44.45|44.49|16103700|    44.49|
|2015-04-20|44.73|44.91|44.41|44.66|10052900|    44.66|
|2015-04-17| 45.3|45.44|44.25|44.45|13305700|    44.45|
|2015-04-16|45.82|46.13|45.53|45.78|13800300|    45.78|
|2015-04-15|45.46|45.83|45.23|45.73|15033500|    45.73|
+----------+-----+-----+-----+-----+--------+---------+


scala> results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)
Stock Entry: [2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
Stock Entry: [2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]
Stock Entry: [2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]
Stock Entry: [2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7]
Stock Entry: [2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98]
Stock Entry: [2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49]
Stock Entry: [2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66]
Stock Entry: [2015-04-17,45.3,45.44,44.25,44.45,13305700,44.45]
Stock Entry: [2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78]
Stock Entry: [2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73]
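Rather than relying on Row.toString, individual fields can be pulled out of each Row by name; an illustrative variant of the same loop:

    results.map(r => s"${r.getAs[String]("date")} closed at ${r.getAs[Float]("close")}")
           .collect()
           .foreach(println)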



scala> val results = spark.sql("SELECT * FROM ystemp")
results: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

scala> results.write.format("orc").save("yahoo_stocks_orc")

scala> val yahoo_stocks_orc = spark.read.format("orc").load("yahoo_stocks_orc")
yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")

scala> spark.sql("SELECT * from orcTest").collect.take(3).foreach(println)
[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
[2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]
[2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]
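Because ORC is columnar, queries that touch only a few columns can benefit from column pruning (and predicate pushdown on the filter); one more illustrative query against the same view:

    spark.sql("SELECT date, close FROM orcTest WHERE close > 44.5").show()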
