Input
yahoo_stocks.csv:
------------------
Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49
2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66
2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45
2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78
2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal yahoo_stocks.csv /user/
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -head /user/yahoo_stocks.csv
Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> import org.apache.spark.sql.types
import org.apache.spark.sql.types
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import spark.implicits._
import spark.implicits._
scala> spark.sql("CREATE TABLE yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
2019-02-01 20:59:04 WARN HiveMetaStore:1383 - Location: hdfs://localhost:9000/user/hive/warehouse/yahoo_orc_table specified for non-external table:yahoo_orc_table
res39: org.apache.spark.sql.DataFrame = []
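As a quick sanity check (a sketch, not part of the captured session), the new table's layout can be confirmed against the metastore before any data is loaded:

    // Verify the column names and types registered for the ORC table
    spark.sql("DESCRIBE yahoo_orc_table").show()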
scala> val stocks = sc.textFile("hdfs://localhost:9000/user/yahoo_stocks.csv")
stocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/yahoo_stocks.csv MapPartitionsRDD[109] at textFile at <console>:42
scala> stocks.take(5).foreach(println)
Date,Open,High,Low,Close,Volume,Adj Close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
scala> val header = stocks.first
header: String = Date,Open,High,Low,Close,Volume,Adj Close
scala> val rddStocksWithoutHeader = stocks.filter(x => x != header)
rddStocksWithoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[110] at filter at <console>:45
scala> rddStocksWithoutHeader.take(5).foreach(println)
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
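For larger files, a variant worth knowing (not from the session above) drops the header without comparing every record against the header string; it removes only the first line of the first partition:

    // Skip the header by dropping the first element of partition 0 only
    val noHeader = stocks.mapPartitionsWithIndex { (idx, iter) =>
      if (idx == 0) iter.drop(1) else iter
    }

This saves one string comparison per row, though for a file this small the difference is negligible.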
scala> case class YahooStockPrice(date: String, open: Float, high: Float, low: Float, close: Float, volume: Integer, adjClose: Float)
defined class YahooStockPrice
scala> val stockPriceWithSchema = rddStocksWithoutHeader.map(x => {
| val fields = x.split(",")
| val date = fields(0)
| val open = fields(1).trim.toFloat
| val high = fields(2).trim.toFloat
| val low = fields(3).trim.toFloat
| val close = fields(4).trim.toFloat
| val volume = fields(5).trim.toInt
| val adjClose = fields(6).trim.toFloat
| YahooStockPrice(date, open, high, low, close, volume, adjClose)
| })
stockPriceWithSchema: org.apache.spark.rdd.RDD[YahooStockPrice] = MapPartitionsRDD[111] at map at <console>:45
scala> stockPriceWithSchema.take(5).foreach(println)
YahooStockPrice(2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34)
YahooStockPrice(2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36)
YahooStockPrice(2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52)
YahooStockPrice(2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7)
YahooStockPrice(2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98)
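The map above throws if any row contains a malformed number. A more defensive sketch (hypothetical, not run in this session) wraps the parse in scala.util.Try so that bad rows are dropped instead of failing the job:

    import scala.util.Try

    // flatMap + Try(...).toOption keeps only the rows that parse cleanly
    val parsedSafe = rddStocksWithoutHeader.flatMap { line =>
      val f = line.split(",")
      Try(YahooStockPrice(f(0), f(1).trim.toFloat, f(2).trim.toFloat,
        f(3).trim.toFloat, f(4).trim.toFloat, f(5).trim.toInt,
        f(6).trim.toFloat)).toOption
    }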
scala> val dfStockPrice = stockPriceWithSchema.toDF
dfStockPrice: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
scala> dfStockPrice.printSchema
root
|-- date: string (nullable = true)
|-- open: float (nullable = false)
|-- high: float (nullable = false)
|-- low: float (nullable = false)
|-- close: float (nullable = false)
|-- volume: integer (nullable = true)
|-- adjClose: float (nullable = false)
scala> dfStockPrice.show(5)
+----------+-----+-----+-----+-----+--------+--------+
|      date| open| high|  low|close|  volume|adjClose|
+----------+-----+-----+-----+-----+--------+--------+
|2015-04-28|44.34|44.57|43.94|44.34| 7188300|   44.34|
|2015-04-27|44.65| 45.1|44.25|44.36|10840900|   44.36|
|2015-04-24|43.73|44.71|43.69|44.52|11267500|   44.52|
|2015-04-23|43.92|44.06|43.58| 43.7|14274900|    43.7|
|2015-04-22|44.58|44.85|43.67|43.98|32241200|   43.98|
+----------+-----+-----+-----+-----+--------+--------+
only showing top 5 rows
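Since Spark 2.x the whole RDD-plus-case-class detour can also be skipped; a shorter alternative (a sketch, using the same HDFS path as above) lets the built-in CSV reader handle the header and the types:

    // header=true consumes the first line; inferSchema costs one extra
    // pass over the file to guess the column types
    val dfDirect = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://localhost:9000/user/yahoo_stocks.csv")

The inferred column names then match the CSV header (Date, Open, ..., Adj Close) rather than the case class fields.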
scala> dfStockPrice.createOrReplaceTempView("ystemp")
scala> val results = spark.sql("SELECT * FROM ystemp limit 10")
results: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
scala> results.show
+----------+-----+-----+-----+-----+--------+--------+
|      date| open| high|  low|close|  volume|adjClose|
+----------+-----+-----+-----+-----+--------+--------+
|2015-04-28|44.34|44.57|43.94|44.34| 7188300|   44.34|
|2015-04-27|44.65| 45.1|44.25|44.36|10840900|   44.36|
|2015-04-24|43.73|44.71|43.69|44.52|11267500|   44.52|
|2015-04-23|43.92|44.06|43.58| 43.7|14274900|    43.7|
|2015-04-22|44.58|44.85|43.67|43.98|32241200|   43.98|
|2015-04-21|45.15|45.18|44.45|44.49|16103700|   44.49|
|2015-04-20|44.73|44.91|44.41|44.66|10052900|   44.66|
|2015-04-17| 45.3|45.44|44.25|44.45|13305700|   44.45|
|2015-04-16|45.82|46.13|45.53|45.78|13800300|   45.78|
|2015-04-15|45.46|45.83|45.23|45.73|15033500|   45.73|
+----------+-----+-----+-----+-----+--------+--------+
scala> results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)
Stock Entry: [2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
Stock Entry: [2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]
Stock Entry: [2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]
Stock Entry: [2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7]
Stock Entry: [2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98]
Stock Entry: [2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49]
Stock Entry: [2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66]
Stock Entry: [2015-04-17,45.3,45.44,44.25,44.45,13305700,44.45]
Stock Entry: [2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78]
Stock Entry: [2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73]
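With the view registered, any SQL works against it; for example (illustrative only, not captured session output), a simple aggregate over the ten trading days:

    // Price range and average daily volume across the view
    spark.sql(
      "SELECT MIN(low) AS min_low, MAX(high) AS max_high, " +
      "AVG(volume) AS avg_volume FROM ystemp").show()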
scala> val results = spark.sql("SELECT * FROM ystemp")
results: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
scala> results.write.format("orc").save("yahoo_stocks_orc")
scala> val yahoo_stocks_orc = spark.read.format("orc").load("yahoo_stocks_orc")
yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]
scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")
scala> spark.sql("SELECT * from orcTest").collect.take(3).foreach(println)
[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]
[2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]
[2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]
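A final round-trip check (not part of the captured session) confirms that nothing was lost between the temp view and the ORC copy:

    // Row counts must agree after the write/read cycle
    assert(yahoo_stocks_orc.count() == results.count())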