Thursday, 31 January 2019

Case Class and StructType, StructField Examples in Spark with Scala

Input


people.txt:
------------
Michael, 29
Andy, 30
Justin, 19

people.json:
-----------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}​


hadoop@hadoop:~/Desktop/vow$ hdfs dfs -copyFromLocal people.* /user/


hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/people*.*
-rw-r--r--   1 hadoop supergroup         76 2019-02-01 11:43 /user/people.json
-rw-r--r--   1 hadoop supergroup         32 2019-02-01 11:43 /user/people.txt

scala> val df = spark.read.json("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

-- OR --

scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
             

scala> df.select("age","name").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select(col("age"),col("name")).show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select(df("age"),df("name")).show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+




// Add 1 to Age
scala> df.select(df("name"),df("age")+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select($"name",$"age"+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select(col("name"),col("age")+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select($"age" > 21).show
+----------+
|(age > 21)|
+----------+
|      null|
|      true|
|     false|
+----------+

scala> df.select(col("age") > 21).show
+----------+
|(age > 21)|
+----------+
|      null|
|      true|
|     false|
+----------+

//Filter operations


scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.filter(col("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.filter(df("age") > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



// Group By
scala> df.groupBy("age").count().show
+----+-----+                                                                   
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
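The same aggregation can also be written with agg, which lets us alias the count column; a minimal sketch (not from the original session, using the same df as above):

// groupBy with agg and an aliased count column (sketch).
df.groupBy("age").agg(count("*").as("count")).show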




import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

scala> val peopleRDD = spark.sparkContext.textFile("hdfs://localhost:9000/user/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/people.txt MapPartitionsRDD[80] at textFile at <console>:33

scala> peopleRDD.collect.foreach(println)
Michael, 29
Andy, 30
Justin, 19


// schema definition using StructType and StructField
scala> val schema = StructType(StructField("Name",StringType)::StructField("Age",IntegerType)::Nil )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,IntegerType,true))
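An equivalent way to build the same schema, using the StructType builder style; a small sketch (not part of the original session):

// Same two-column schema built with StructType.add (sketch).
val schema2 = new StructType().add("Name", StringType).add("Age", IntegerType)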

// StructType not applied here; the rows come out as plain tuples
val rowRDD  = peopleRDD.map (x => {
       val fields = x.split(",")
       val Name = fields(0).trim()
       val Age  = fields(1).trim()
   (Name,Age)
       })

scala> rowRDD.collect.foreach(println)
(Michael,29)
(Andy,30)
(Justin,19)

scala> val rowRDD  = peopleRDD.map (x => {
     |        val fields = x.split(",")
     |        val Name = fields(0).trim()
     |        val Age  = fields(1).trim().toInt
     |      Row(Name,Age)
     |        })
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[84] at map at <console>:35

scala> rowRDD.collect.foreach(println)
[Michael,29]
[Andy,30]
[Justin,19]


// Creating Dataframe with schema (StructType and StructField here)
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [Name: string, Age: int]

scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]

scala> results.show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+



 scala> val ds = Seq(1,2,3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+


scala> ds.printSchema
root
 |-- value: integer (nullable = false)


scala> case class Person(name:String, age:Long)
defined class Person


scala> val ds = Seq(Person("Sankar",42),Person("Sudha",40),Person("Vijay",18),Person("Aish",13)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.printSchema
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = false)


scala> ds.show
+------+---+
|  name|age|
+------+---+
|Sankar| 42|
| Sudha| 40|
| Vijay| 18|
|  Aish| 13|
+------+---+

scala> val path = "hdfs://localhost:9000/user/people.json"
path: String = hdfs://localhost:9000/user/people.json


// Read JSON into a Dataset using the Person case class defined above
scala> val peopleDS = spark.read.format("json").load(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

// as [CaseClassName]
scala> val peopleDS = spark.read.format("json").load("hdfs://localhost:9000/user/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> peopleDS.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> val peopleFiltered = peopleDS.filter("age is not null")
peopleFiltered: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleFiltered.show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
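Because peopleDS is a typed Dataset[Person], ordinary Scala lambdas also work on it; a small sketch (not from the original session), run on peopleFiltered so the null-age row is already gone:

// Typed map over the filtered Dataset (sketch).
peopleFiltered.map(p => s"${p.name} is ${p.age} years old").show(false)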

How to Access MySQL database in Spark SQL?

// Here we are going to create a database and a table, and add some sample records in MySQL
sudo mysql
[sudo] password for hadoop:

mysql> create database myown;
Query OK, 1 row affected (0.00 sec)

mysql> use myown;
Database changed

mysql> create table hadoopcomponents (name varchar(20),purpose varchar(20));
Query OK, 0 rows affected (0.44 sec)

mysql> insert into hadoopcomponents values ("hdfs","storage");

mysql> insert into hadoopcomponents values("sqoop","ingestion");
Query OK, 1 row affected (0.00 sec)

mysql> insert into hadoopcomponents values("flume","ingestion");
Query OK, 1 row affected (0.02 sec)

mysql> insert into hadoopcomponents values("kafka","ingestion");
Query OK, 1 row affected (0.00 sec)

mysql> insert into hadoopcomponents values("pig","processing");
Query OK, 1 row affected (0.03 sec)

mysql> insert into hadoopcomponents values("hive","processing");
Query OK, 1 row affected (0.00 sec)

mysql> insert into hadoopcomponents values("spark","processing");
Query OK, 1 row affected (0.02 sec)

mysql> insert into hadoopcomponents values("hive","datawarehousing");
Query OK, 1 row affected (0.08 sec)

mysql> insert into hadoopcomponents values("hbase","nosqldatabase");
Query OK, 1 row affected (0.01 sec)

mysql> select * from hadoopcomponents;
+-----------+-----------------+
| name      | purpose         |
+-----------+-----------------+
| hdfs      | storage         |
| mapreduce | processing      |
| sqoop     | ingestion       |
| flume     | ingestion       |
| kafka     | ingestion       |
| pig       | processing      |
| hive      | processing      |
| spark     | processing      |
| hive      | datawarehousing |
| hbase     | nosqldatabase   |
+-----------+-----------------+
10 rows in set (0.00 sec)

// Here we are going to connect to MySQL from Spark

scala> val dfHadoopComponentsfromMySQL = spark.read.format("jdbc").
      option("driver","com.mysql.jdbc.Driver").
      option("url","jdbc:mysql://localhost:3306").
      option("dbtable","myown.hadoopcomponents").
      option("user","hadoop").
      option("password","hadoop").
      load()

scala> dfHadoopComponentsfromMySQL.printSchema
root
 |-- name: string (nullable = true)
 |-- purpose: string (nullable = true)



scala> dfHadoopComponentsfromMySQL.show
Thu Jan 31 13:53:22 IST 2019 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
+---------+---------------+
|     name|        purpose|
+---------+---------------+
|     hdfs|        storage|
|mapreduce|     processing|
|    sqoop|      ingestion|
|    flume|      ingestion|
|    kafka|      ingestion|
|      pig|     processing|
|     hive|     processing|
|    spark|     processing|
|     hive|datawarehousing|
|    hbase|  nosqldatabase|
+---------+---------------+

// Querying Database using Spark Dataframe syntax
scala> dfHadoopComponentsfromMySQL.select("name").show()
+---------+
|     name|
+---------+
|     hdfs|
|mapreduce|
|    sqoop|
|    flume|
|    kafka|
|      pig|
|     hive|
|    spark|
|     hive|
|    hbase|
+---------+

scala> dfHadoopComponentsfromMySQL.select("*").groupBy("purpose").count().show()
+---------------+-----+                                                       
|        purpose|count|
+---------------+-----+
|      ingestion|    3|
|datawarehousing|    1|
|  nosqldatabase|    1|
|     processing|    4|
|        storage|    1|
+---------------+-----+

scala> dfHadoopComponentsfromMySQL.filter("purpose='ingestion'").show
+-----+---------+
| name|  purpose|
+-----+---------+
|sqoop|ingestion|
|flume|ingestion|
|kafka|ingestion|
+-----+---------+

scala> dfHadoopComponentsfromMySQL.filter("purpose='processing'").show
+---------+----------+
|     name|   purpose|
+---------+----------+
|mapreduce|processing|
|      pig|processing|
|     hive|processing|
|    spark|processing|
+---------+----------+

scala> dfHadoopComponentsfromMySQL.select("name").filter("purpose='processing'").show
+---------+
|     name|
+---------+
|mapreduce|
|      pig|
|     hive|
|    spark|
+---------+


// Here we create a temp view
scala> dfHadoopComponentsfromMySQL.createOrReplaceTempView("components")

// From here on we can use plain SQL in Spark SQL
scala> spark.sql("select * from components").show()

+---------+---------------+
|     name|        purpose|
+---------+---------------+
|     hdfs|        storage|
|mapreduce|     processing|
|    sqoop|      ingestion|
|    flume|      ingestion|
|    kafka|      ingestion|
|      pig|     processing|
|     hive|     processing|
|    spark|     processing|
|     hive|datawarehousing|
|    hbase|  nosqldatabase|
+---------+---------------+

scala> spark.sql("select * from components where purpose ='ingestion'").show()
+-----+---------+
| name|  purpose|
+-----+---------+
|sqoop|ingestion|
|flume|ingestion|
|kafka|ingestion|
+-----+---------+

scala> spark.sql("select * from components where purpose in('ingestion','processing')").show()
+---------+----------+
|     name|   purpose|
+---------+----------+
|mapreduce|processing|
|    sqoop| ingestion|
|    flume| ingestion|
|    kafka| ingestion|
|      pig|processing|
|     hive|processing|
|    spark|processing|
+---------+----------+

scala> spark.sql("select purpose,count(purpose) from components group by purpose").show()
+---------------+--------------+                                               
|        purpose|count(purpose)|
+---------------+--------------+
|      ingestion|             3|
|datawarehousing|             1|
|  nosqldatabase|             1|
|     processing|             4|
|        storage|             1|
+---------------+--------------+

scala> spark.sql("select purpose,count(purpose) as count from components group by purpose").show()
+---------------+-----+                                                       
|        purpose|count|
+---------------+-----+
|      ingestion|    3|
|datawarehousing|    1|
|  nosqldatabase|    1|
|     processing|    4|
|        storage|    1|
+---------------+-----+

scala> spark.sql("select purpose,count(purpose) as count from components group by purpose order by purpose").show()
+---------------+-----+                                                       
|        purpose|count|
+---------------+-----+
|datawarehousing|    1|
|      ingestion|    3|
|  nosqldatabase|    1|
|     processing|    4|
|        storage|    1|
+---------------+-----+

scala> spark.sql("select purpose,count(purpose) as count from components group by purpose order by purpose desc").show()
+---------------+-----+                                                       
|        purpose|count|
+---------------+-----+
|        storage|    1|
|     processing|    4|
|  nosqldatabase|    1|
|      ingestion|    3|
|datawarehousing|    1|
+---------------+-----+
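Results can also be written back to MySQL through the same JDBC data source; a hedged sketch (the target table myown.components_summary is an assumption, not something created in the original post):

// Sketch only: write an aggregated DataFrame back to MySQL over JDBC.
val summary = spark.sql("select purpose, count(purpose) as count from components group by purpose")
summary.write.format("jdbc").
  option("driver","com.mysql.jdbc.Driver").
  option("url","jdbc:mysql://localhost:3306").
  option("dbtable","myown.components_summary").
  option("user","hadoop").
  option("password","hadoop").
  mode("append").
  save()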

RDD, DataFrame and Dataset in Spark with Scala

Converting between RDD, DataFrame and Dataset (they are interoperable); a compact round-trip sketch follows this list:

RDD -> DataFrame      ====> .toDF
RDD -> Dataset        ====> .toDS

DataFrame -> Dataset  ====> df.as[Type]
DataFrame -> RDD      ====> df.rdd

Dataset -> RDD        ====> ds.rdd
Dataset -> DataFrame  ====> ds.toDF
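A compact round-trip sketch of these conversions (not from the original session; assumes spark.implicits._ is in scope, as it is in spark-shell):

// Hypothetical Pair case class, used only for this sketch.
case class Pair(a: Int, b: Int)
val pairRDD = sc.makeRDD(Seq(Pair(1, 2), Pair(3, 4)))

val dfPair    = pairRDD.toDF      // RDD -> DataFrame
val dsPair    = pairRDD.toDS      // RDD -> Dataset
val dsFromDf  = dfPair.as[Pair]   // DataFrame -> Dataset
val rddFromDf = dfPair.rdd        // DataFrame -> RDD (RDD[Row])
val rddFromDs = dsPair.rdd        // Dataset -> RDD (RDD[Pair])
val dfFromDs  = dsPair.toDF       // Dataset -> DataFrame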


//RDDs in Spark
scala> val r1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[6] at textFile at <console>:24

scala> r1.take(5).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance

val r2 = r1.map (x => x.split(","))
r2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[22] at map at <console>:25


scala> val r3 = r2.map (x => {
       val sno = x(0)
       val moviename = x(1)
       val genre = x(2)
       (sno,moviename,genre)
       }
       )
 
scala> r3.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)


// RDD to Dataframe
scala> val dfX = r3.toDF
dfX: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]

scala> dfX.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)


scala> dfX.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows


// Making DataSet from RDD
scala> val dsX = r3.toDS
dsX: org.apache.spark.sql.Dataset[(String, String, String)] = [_1: string, _2: string ... 1 more field]

scala> dsX.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)


scala> dsX.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows

// Making RDD from DataFrame
scala> val rddX = dfX.rdd
rddX: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[35] at rdd at <console>:25

scala> rddX.take(3).foreach(println)
[movieId,title,genres]
[1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy]
[2,Jumanji (1995),Adventure|Children|Fantasy]

//Making RDD from DataSet
scala> val rddXX = dsX.rdd
rddXX: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[37] at rdd at <console>:25

scala> rddXX.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
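The tuple RDD can also be given real column names while converting, instead of the auto-generated _1/_2/_3; a short sketch (not from the original session):

// Name the columns during the conversion (sketch).
val dfMovies = r3.toDF("movieId", "title", "genres")
dfMovies.printSchema
dfMovies.show(3)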









// DataFrame from JSON input
scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/employee.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows

scala> case class Person(name:String,age:Long)
defined class Person

// Making a Dataset from the DataFrame using the case class named Person
scala> val dsPerson = df.as[Person]
dsPerson: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> dsPerson.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows


scala> dsPerson.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

// Converting the Dataset to a DataFrame
scala> val tmpdf = dsPerson.toDF
tmpdf: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> tmpdf.show(3)
+---+------+
|age|  name|
+---+------+
| 28|  John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows


scala> tmpdf.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)






// DataFrame from a text file as input, so every line becomes a row
scala> val df1 = spark.read.format("text").load("hdfs://localhost:9000/user/employee.txt")
df1: org.apache.spark.sql.DataFrame = [value: string]

scala> df1.show(3)
+---------+
|    value|
+---------+
|  John,28|
|Andrew,36|
|Clarke,22|
+---------+
only showing top 3 rows

// Converting the DataFrame into a Dataset
scala> val ds1 = df1.as[String]
ds1: org.apache.spark.sql.Dataset[String] = [value: string]


scala> ds1.show(3)
+---------+
|    value|
+---------+
|  John,28|
|Andrew,36|
|Clarke,22|
+---------+

scala> val ds2 = ds1.map (x => x.split(","))
ds2: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]

// Array of Strings
scala> ds2.show(5)
+-------------+
|        value|
+-------------+
|   [John, 28]|
| [Andrew, 36]|
| [Clarke, 22]|
|  [Kevin, 42]|
|[Richard, 51]|
+-------------+

scala> case class Person(name:String,age:Long)
defined class Person

scala> val ds3 = ds2.map( x => {
       val name = x(0)
       val age = x(1)
       (name, age)
       })
ds3: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]

// Dataset of (String, String) tuples, but a proper schema is still missing (see the typed sketch below)
scala> ds3.show(5)
+-------+---+
|     _1| _2|
+-------+---+
|   John| 28|
| Andrew| 36|
| Clarke| 22|
|  Kevin| 42|
|Richard| 51|
+-------+---+

scala> ds3.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
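To actually attach a schema here, the split fields can be mapped into the Person case class defined above; a sketch (assumes every line of employee.txt has the form "name,age", as shown earlier):

// Build a typed Dataset[Person] from the split fields (sketch).
val dsPeople = ds2.map(x => Person(x(0).trim, x(1).trim.toLong))
dsPeople.printSchema
dsPeople.show(5)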



hadoop@hadoop:~$ cat > samplefile.txt
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies
^C
hadoop@hadoop:~$ hdfs dfs -copyFromLocal samplefile.txt /user/

scala>  val r1 = sc.textFile("hdfs://localhost:9000/user/samplefile.txt")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/samplefile.txt MapPartitionsRDD[8] at textFile at <console>:24

scala> r1.collect.foreach(println)
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies


scala> val d1 = spark.read.format("text").load("hdfs://localhost:9000/user/samplefile.txt")
d1: org.apache.spark.sql.DataFrame = [value: string]


scala> d1.show
+--------------------+
|               value|
+--------------------+
|spark is a big da...|
|hadoop is a big d...|
|spark and hadoop ...|
+--------------------+


Handling XML, JSON, CSV files in Spark with Scala

// Here we have 3 different files .csv, .json, .xml (But same content in different formats)
hadoop@hadoop:~/Desktop/vow$ hdfs dfs -ls /user/countries/
Found 3 items
-rw-r--r--   1 hadoop supergroup      13643 2019-01-31 12:48 /user/countries/country-capitals.csv
-rw-r--r--   1 hadoop supergroup      40247 2019-01-31 12:48 /user/countries/country-capitals.json
-rw-r--r--   1 hadoop supergroup      76081 2019-01-31 12:48 /user/countries/country-capitals.xml

country-capitals.xml:
---------------------
<?xml version="1.0" encoding="UTF-8"?>
<Countries>
 <Country>
      <CapitalLatitude>64.15</CapitalLatitude>
      <CapitalLongitude>-21.950000</CapitalLongitude>
      <CapitalName>Reykjavik</CapitalName>
      <ContinentName>Europe</ContinentName>
      <CountryCode>IS</CountryCode>
      <CountryName>Iceland</CountryName>
   </Country>
   <Country>
      <CapitalLatitude>28.6</CapitalLatitude>
      <CapitalLongitude>77.200000</CapitalLongitude>
      <CapitalName>New Delhi</CapitalName>
      <ContinentName>Asia</ContinentName>
      <CountryCode>IN</CountryCode>
      <CountryName>India</CountryName>
   </Country>
</Countries>

country-capitals.json:
----------------------
[
    {
      "CountryName":"Iceland",
      "CapitalName":"Reykjavik",
      "CapitalLatitude":"64.15",
      "CapitalLongitude":"-21.950000",
      "CountryCode":"IS",
      "ContinentName":"Europe"
   },
   {
      "CountryName":"India",
      "CapitalName":"New Delhi",
      "CapitalLatitude":"28.6",
      "CapitalLongitude":"77.200000",
      "CountryCode":"IN",
      "ContinentName":"Asia"
   }
 ]

country-capitals.csv:
----------------------
CountryName,CapitalName,CapitalLatitude,CapitalLongitude,CountryCode,ContinentName
Iceland,Reykjavik,64.15,-21.950000,IS,Europe
India,New Delhi,28.6,77.200000,IN,Asia




Browse: https://mvnrepository.com
Search for spark-xml, spark-csv and spark-avro there, and pick the artifacts built for Scala 2.11 (the shell here runs Scala 2.11.12).

Here are the dependency declarations for csv, xml and avro:
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.5.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/com.databricks/spark-avro -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version>
</dependency>


Build a comma-separated string of the form groupId:artifactId:version,groupId:artifactId:version,...

Run spark-shell with the --packages option:

spark-shell --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.11:0.5.0,com.databricks:spark-avro_2.11:4.0.0

// Read XML and make DataFrame
scala> val dfCountriesXML = spark.read.format("xml").option("rowTag","Country").load("hdfs://localhost:9000/user/countries/country-capitals.xml")
dfCountriesXML: org.apache.spark.sql.DataFrame = [CapitalLatitude: string, CapitalLongitude: double ... 4 more fields]

scala> dfCountriesXML.show(5)
+------------------+----------------+------------------+-------------+-----------+--------------------+
|   CapitalLatitude|CapitalLongitude|       CapitalName|ContinentName|CountryCode|         CountryName|
+------------------+----------------+------------------+-------------+-----------+--------------------+
|              9.55|           44.05|          Hargeisa|       Africa|       NULL|          Somaliland|
|        -54.283333|           -36.5| King Edward Point|   Antarctica|         GS|South Georgia and...|
|            -49.35|       70.216667| Port-aux-Français|   Antarctica|         TF|French Southern a...|
|31.766666666666666|       35.233333|         Jerusalem|         Asia|         PS|           Palestine|
|         60.116667|            19.9|         Mariehamn|       Europe|         AX|       Aland Islands|
+------------------+----------------+------------------+-------------+-----------+--------------------+
only showing top 5 rows


// Making a DataFrame - reading CSV with header and inferred schema
scala> val dfCountriesCSV = spark.read.format("csv").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/user/countries/country-capitals.csv")


scala> dfCountriesCSV.show(3)
+--------------------+-----------------+---------------+----------------+-----------+-------------+
|         CountryName|      CapitalName|CapitalLatitude|CapitalLongitude|CountryCode|ContinentName|
+--------------------+-----------------+---------------+----------------+-----------+-------------+
|          Somaliland|         Hargeisa|           9.55|           44.05|       NULL|       Africa|
|South Georgia and...|King Edward Point|     -54.283333|           -36.5|         GS|   Antarctica|
|French Southern a...|Port-aux-Français|         -49.35|       70.216667|         TF|   Antarctica|
+--------------------+-----------------+---------------+----------------+-----------+-------------+
only showing top 3 rows


scala> dfCountriesCSV.printSchema
root
 |-- CountryName: string (nullable = true)
 |-- CapitalName: string (nullable = true)
 |-- CapitalLatitude: string (nullable = true)
 |-- CapitalLongitude: double (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- ContinentName: string (nullable = true)

// Making a DataFrame - reading the JSON file (the "header" option is a CSV option and is ignored here)
 scala> val dfCountriesJSON = spark.read.format("json").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/user/countries/country-capitals.json")
dfCountriesJSON: org.apache.spark.sql.DataFrame = [CapitalLatitude: string, CapitalLongitude: string ... 4 more fields]

scala> dfCountriesJSON.show(3)
+---------------+----------------+------------------+-------------+-----------+--------------------+
|CapitalLatitude|CapitalLongitude|       CapitalName|ContinentName|CountryCode|         CountryName|
+---------------+----------------+------------------+-------------+-----------+--------------------+
|           9.55|       44.050000|          Hargeisa|       Africa|       NULL|          Somaliland|
|     -54.283333|      -36.500000| King Edward Point|   Antarctica|         GS|South Georgia and...|
|         -49.35|       70.216667| Port-aux-Français|   Antarctica|         TF|French Southern a...|
+---------------+----------------+------------------+-------------+-----------+--------------------+
only showing top 3 rows


scala> dfCountriesJSON.printSchema
root
 |-- CapitalLatitude: string (nullable = true)
 |-- CapitalLongitude: string (nullable = true)
 |-- CapitalName: string (nullable = true)
 |-- ContinentName: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- CountryName: string (nullable = true)
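The spark-avro package loaded above can be used for output in the same way; a hedged sketch that writes one of these DataFrames out as Avro (the output path is an assumption):

// Sketch only: write the CSV-based DataFrame back out in Avro format.
// The format name matches the com.databricks:spark-avro_2.11:4.0.0 package loaded with --packages;
// the output directory is a hypothetical example path.
dfCountriesCSV.write.format("com.databricks.spark.avro").save("hdfs://localhost:9000/user/countries/country-capitals-avro")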

Wednesday, 30 January 2019

Hive and Spark Integration

Integrating Hive with Spark
// Start Hive, create a new database (School) and a new table (Student), and add 3 records

hive> show databases;
OK
default
Time taken: 0.832 seconds, Fetched: 1 row(s)
hive> create database School ;
OK
Time taken: 0.343 seconds
hive> use School;
OK
Time taken: 0.045 seconds
hive> create table Student(id int, name varchar(50));
OK
Time taken: 0.685 seconds
hive> insert into Student (id,name) values(101,'Sankar');
insert into Student (id,name) values(102,"Zee");
insert into Student (id,name) values(103,"Maha");

hive> select * from Student;
OK
101 Sankar
102 Zee
103 Maha
Time taken: 0.261 seconds, Fetched: 3 row(s)


// Start Spark and do the following to access the Hive database (School) and the Hive table (Student)
scala> spark.sql("use School")
scala> spark.sql("select * from Student").show()
+---+------+                                                                   
| id|  name|
+---+------+
|101|Sankar|
|102|   Zee|
|103|  Maha|
+---+------+
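The same Hive table can also be reached through the DataFrame API; a small sketch (assumes the Spark session was built with Hive support, as in the session above):

// DataFrame access to the Hive table without writing SQL (sketch).
val students = spark.table("School.Student")
students.filter(col("id") > 101).show()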

Datanode is not running - Datanode is missing - how to fix?

// The DataNode is not running. Do the following to bring it back up.

jps
3317 NameNode
3785 SecondaryNameNode
5292 RunJar
21276 SparkSubmit
4350 NodeManager
5711 Jps
4063 ResourceManager
stop-all.sh

hadoop@hadoop:/usr/local/hadoop/etc/hadoop$ sudo gedit core-site.xml
Look for this property (the value is the location we need to recreate):
<property>
<name>hadoop.tmp.dir</name>
<value>/app/hadoop/tmp</value>
<description>A base for other temporary directories.</description>
</property>

// delete
sudo rm -Rf /app/hadoop/tmp

//recreate
sudo mkdir /app/hadoop/tmp
sudo chown -R hadoop:hadoop /app/hadoop/tmp
sudo chmod 777 /app/hadoop/tmp

// format the namenode
hdfs namenode -format

start-all.sh

jps
3317 NameNode
3785 SecondaryNameNode
5292 RunJar
21276 SparkSubmit
4350 NodeManager
3519 DataNode
5711 Jps
4063 ResourceManager

How to fix metastore issue in Hive and MySQL?

Run mysql:
sudo mysql
mysql> drop database metastore;

Then re-initialize the metastore schema:
$ schematool -dbType mysql -initSchema
$ schematool -dbType mysql -info

How to work with User Defined Functions (UDFs) in Spark with Scala?

How to work with User Defined Functions (UDFs) in Spark with Scala?
How to save a DataFrame in Hive?

hdfs dfs -copyFromLocal ratings.csv /user/

scala> val r1 = sc.textFile("hdfs://localhost:9000/user/ratings.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/ratings.csv MapPartitionsRDD[21] at textFile at <console>:24

scala> r1.first()
res19: String = userId,movieId,rating,timestamp

scala> r1.count
res9: Long = 27753445 

scala> val header = r1.first()
header: String = userId,movieId,rating,timestamp

scala> val r2 = r1.filter (line => line != header)
r2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at filter at <console>:27

scala> r2.first()
res18: String = 1,307,3.5,1256677221


val r3 = r2.map(x => x.split(",")).map {x =>
      val userid = x(0).toInt
      val movieid = x(1).toInt
      val rating = x(2).toFloat
      val time = x(3)
      (userid,movieid,rating,time)
      }

scala> r3.count
res21: Long = 27753444

scala> val df = r3.toDF("UserID","MovieID","Rating","Time")
df: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows

// Epoch time: milliseconds since Jan 01, 1970 (here, the SparkContext start time)
 scala> sc.startTime
res1: Long = 1548827706613


scala> df.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: string (nullable = true)

// change the data type of "Time" from String into Long
scala> val df1 = df.withColumn("Time",col("Time").cast("Long"))
df1: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df1.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: long (nullable = true)


scala> spark.sql("show functions").show(250)

scala> import java.util.Date
import java.util.Date

scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat


// user defined function
scala> def ephochToTimestamp(ephoMillis:Long):String = {
     |   val time:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
     |   time.format(ephoMillis)
     | }
ephochToTimestamp: (ephoMillis: Long)String

// test the function
scala> ephochToTimestamp(1548827706613L)
res12: String = 2019-01-30 11:25:06

//register the function
scala> spark.udf.register("convertEpoch",ephochToTimestamp _)
res14: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> val convertEpoch = udf(ephochToTimestamp _)
convertEpoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))
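Registering the function with spark.udf.register makes it callable from SQL as well; a sketch (the temp view name ratings_raw is an assumption, created here only for illustration):

// Call the registered UDF from SQL (sketch).
df1.createOrReplaceTempView("ratings_raw")
spark.sql("select UserID, MovieID, Rating, convertEpoch(Time) as Time from ratings_raw").show(5)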


// Here the "Time" column is of Long data type
scala> df1.show(5)
+------+-------+------+----------+
|UserID|MovieID|Rating|      Time|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows

// applying function
scala> val df2 = df1.withColumn("Time",convertEpoch(col("Time")))
scala> df2.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows
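The 1970 dates above appear because the timestamps in ratings.csv are Unix seconds, while SimpleDateFormat.format treats the number as milliseconds (and the "hh" pattern is 12-hour). A hedged sketch of a seconds-aware variant, not part of the original session:

// Hypothetical variant: treat the input as Unix seconds and use HH (24-hour).
def epochSecondsToTimestamp(epochSeconds: Long): String = {
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  fmt.format(epochSeconds * 1000L)   // seconds -> milliseconds
}
val convertEpochSeconds = udf(epochSecondsToTimestamp _)
// df1.withColumn("Time", convertEpochSeconds(col("Time"))).show(5)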

 spark.sql("show functions").show(250) // now u can see convertEpoch function present in that list


// Now the last column "Time" is back to String data type (the UDF returns a String)

scala> df2.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: string (nullable = true)


// Now changing the datatype of "Time" from 'String' to 'TimeStamp'
scala> val df3 = df2.withColumn("Time",col("Time").cast("Timestamp"))
df3: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 2 more fields]

scala> df3.printSchema
root
 |-- UserID: integer (nullable = false)
 |-- MovieID: integer (nullable = false)
 |-- Rating: float (nullable = false)
 |-- Time: timestamp (nullable = true)


scala> df3.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|               Time|
+------+-------+------+-------------------+
|     1|    307|   3.5|1970-01-15 06:34:37|
|     1|    481|   3.5|1970-01-15 06:34:37|
|     1|   1091|   1.5|1970-01-15 06:34:37|
|     1|   1257|   4.5|1970-01-15 06:34:37|
|     1|   1449|   4.5|1970-01-15 06:34:37|
+------+-------+------+-------------------+
only showing top 5 rows

scala> val df4 = df3.select( col("UserID"),col("MovieID"),col("Rating"),year(col("Time")),month(col("Time")),hour(col("Time")) )
df4: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]

scala> df4.show(5)
+------+-------+------+----------+-----------+----------+
|UserID|MovieID|Rating|year(Time)|month(Time)|hour(Time)|
+------+-------+------+----------+-----------+----------+
|     1|    307|   3.5|      1970|          1|         6|
|     1|    481|   3.5|      1970|          1|         6|
|     1|   1091|   1.5|      1970|          1|         6|
|     1|   1257|   4.5|      1970|          1|         6|
|     1|   1449|   4.5|      1970|          1|         6|
+------+-------+------+----------+-----------+----------+
only showing top 5 rows

// alias names applied for Year, Month, Hour (the select above repeated with aliases)
scala> val df4 = df3.select( col("UserID"),col("MovieID"),col("Rating"),year(col("Time")) as "Year",month(col("Time")) as "Month",hour(col("Time")) as "Hour" )

scala> df4.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows

// Write the DataFrame content into Hive
scala> df4.write.saveAsTable("default.ratingsTable")

scala> spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
|     default|
+------------+

scala> spark.sql("use default")
res34: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show()
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
| default|ratingstable|      false|
+--------+------------+-----------+

scala> val x = spark.sql("select * from ratingstable")
x: org.apache.spark.sql.DataFrame = [UserID: int, MovieID: int ... 4 more fields]

scala> x.count
res36: Long = 27753444

scala> x.show(5)
+------+-------+------+----+-----+----+
|UserID|MovieID|Rating|Year|Month|Hour|
+------+-------+------+----+-----+----+
|     1|    307|   3.5|1970|    1|   6|
|     1|    481|   3.5|1970|    1|   6|
|     1|   1091|   1.5|1970|    1|   6|
|     1|   1257|   4.5|1970|    1|   6|
|     1|   1449|   4.5|1970|    1|   6|
+------+-------+------+----+-----+----+
only showing top 5 rows


hive> show databases;
OK
default
Time taken: 0.746 seconds, Fetched: 1 row(s)
hive> use default;
OK
Time taken: 0.067 seconds
hive> show tables;
OK
ratingstable
Time taken: 0.07 seconds, Fetched: 1 row(s)
hive> select * from ratingstable limit 10;
OK
Time taken: 3.442 seconds
hive> select * from ratingstable
    > ;
OK
Time taken: 0.321 seconds
hive> describe ratingstable;
OK
userid              int                                     
movieid              int                                     
rating              float                                   
year                int                                     
month                int                                     
hour                int   

Location:            file:/home/hadoop/spark-warehouse/ratingstable

# Storage Information
SerDe Library:      org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:        org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:        org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:          No                 
Num Buckets:        -1                 
Bucket Columns:      []                 
Sort Columns:        []                 
Storage Desc Params:
path                hdfs://localhost:9000/user/hive/warehouse/ratingstable
serialization.format 1                 
Time taken: 0.246 seconds, Fetched: 35 row(s)
hive>


hdfs dfs -ls /user/hive/warehouse/ratingstable

Found 7 items
-rw-r--r--   3 hadoop supergroup          0 2019-01-30 13:42 /user/hive/warehouse/ratingstable/_SUCCESS
-rw-r--r--   3 hadoop supergroup   13775546 2019-01-30 13:39 /user/hive/warehouse/ratingstable/part-00000-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13669265 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00001-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13437742 2019-01-30 13:40 /user/hive/warehouse/ratingstable/part-00002-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13207254 2019-01-30 13:41 /user/hive/warehouse/ratingstable/part-00003-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup   13113080 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00004-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet
-rw-r--r--   3 hadoop supergroup    8639989 2019-01-30 13:42 /user/hive/warehouse/ratingstable/part-00005-45f04917-8bb5-4b55-b6a3-986fe9401326-c000.snappy.parquet

Tuesday, 29 January 2019

Samples with and without using a case class

Here are samples that build DataFrames with and without using a case class.
Input
movies.csv:
------------
hdfs dfs -head /user/movies.csv

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama
15,Cutthroat Island (1995),Action|Adventure|Romance
16,Casino (1995),Crime|Drama
17,Sense and Sensibility (1995),Drama|Romance
18,Four Rooms (1995),Comedy
19,Ace Ventura: When Nature Calls (1995),Comedy
20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller
21,Get Shorty (1995),Comedy|Crime|Thriller
22,Copycat (1995),Crime|Drama|Horror|Mystery|Thriller


scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[43] at textFile at <console>:30

scala> d1.take(10).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


scala> val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:31

// Here we are not using a case class (the rows come out as plain tuples)
val d3 = d2.map { x =>
       val movieID = x(0)
       val title = x(1)
       val genre = x(2)
       (movieID,title,genre)
       }

scala> d3.take(5).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
(3,Grumpier Old Men (1995),Comedy|Romance)
(4,Waiting to Exhale (1995),Comedy|Drama|Romance)

// Dataframe without proper header
scala> val df1 = d3.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]

scala> df1.show(3)
+-------+----------------+--------------------+
|     _1|              _2|                  _3|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows

// Dataframe with Header
scala> val df1 = d3.toDF("MovieID","Title","Genre")
df1: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]

scala> df1.show(3)
+-------+----------------+--------------------+
|MovieID|           Title|               Genre|
+-------+----------------+--------------------+
|movieId|           title|              genres|
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows


// Here we are going to make use of Case class

scala> case class Movies(MovieID:String, Title:String, Genre:String)
defined class Movies

scala> val d1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
d1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[62] at textFile at <console>:30

scala>  val d2 = d1.map (x => x.split(","))
d2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[63] at map at <console>:31

// Here we are applying the case class
val d3 = d2.map { x =>
       val movieID = x(0)
       val title = x(1)
       val genre = x(2)
       Movies(movieID,title,genre)
       }
 
scala> d3.take(5).foreach(println)
Movies(movieId,title,genres)
Movies(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
Movies(2,Jumanji (1995),Adventure|Children|Fantasy)
Movies(3,Grumpier Old Men (1995),Comedy|Romance)
Movies(4,Waiting to Exhale (1995),Comedy|Drama|Romance)


scala> val df = d3.toDF()
df: org.apache.spark.sql.DataFrame = [MovieID: string, Title: string ... 1 more field]

// the field names came automatically because we used the case class
scala> df.show(5)
+-------+--------------------+--------------------+
|MovieID|               Title|               Genre|
+-------+--------------------+--------------------+
|movieId|               title|              genres|
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
+-------+--------------------+--------------------+
only showing top 5 rows


scala> case class Movies (MovieID:Int, Title:String, Genre:String)
defined class Movies

scala> val a1 = Movies(1,"Gladiator","Action")
a1: Movies = Movies(1,Gladiator,Action)

scala> val a2 = Movies(2,"Batman","Thriller")
a2: Movies = Movies(2,Batman,Thriller)

scala> val a3 = Movies(3,"Titanic","Romance")
a3: Movies = Movies(3,Titanic,Romance)

scala> val myList = List(a1,a2,a3)
myList: List[Movies] = List(Movies(1,Gladiator,Action), Movies(2,Batman,Thriller), Movies(3,Titanic,Romance))

scala> val myRDD = sc.makeRDD(myList)
myRDD: org.apache.spark.rdd.RDD[Movies] = ParallelCollectionRDD[72] at makeRDD at <console>:32

scala> myRDD.foreach(println)
Movies(1,Gladiator,Action)
Movies(2,Batman,Thriller)
Movies(3,Titanic,Romance)

scala> val df = myRDD.toDF() // with Case class
df: org.apache.spark.sql.DataFrame = [MovieID: int, Title: string ... 1 more field]

scala> df.show()
+-------+---------+--------+
|MovieID|    Title|   Genre|
+-------+---------+--------+
|      1|Gladiator|  Action|
|      2|   Batman|Thriller|
|      3|  Titanic| Romance|
+-------+---------+--------+


scala> df.printSchema
root
 |-- MovieID: integer (nullable = false)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)


// Here we applied a schema, using a case class, to an RDD that doesn't have one

[RDD without schema] + [case class] => DataFrame with schema
// We provide the schema via the case class; an equivalent sketch using createDataFrame follows
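The same schema inference can be done without toDF by handing the case-class RDD straight to the session; a sketch using the myRDD defined above (not from the original session):

// createDataFrame also infers the schema from the case class via reflection (sketch).
val dfFromCaseClassRDD = spark.createDataFrame(myRDD)
dfFromCaseClassRDD.printSchema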



// here we have an Array of Tuples
scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[78] at makeRDD at <console>:30

scala> rdd1.foreach(println)
(1,2)
(2,3)
(3,4)
(4,5)
(5,6)
(6,7)

// Creating a dataframe without applying our own custom schema
scala> val df = rdd1.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

// here the schema is auto generated [without Case class]
scala> df.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: integer (nullable = false)


scala> df.show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
|  5|  6|
|  6|  7|
+---+---+

scala> val rdd1 = sc.makeRDD(Array( (1,2),(2,3),(3,4),(4,5),(5,6),(6,7) ))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[83] at makeRDD at <console>:30

scala> case class ourSchema(id1:Int, id2:Int)
defined class ourSchema


// Here schema applied with the help of case class object
scala> val rdd2 = rdd1.map (x => ourSchema(x._1, x._2))
rdd2: org.apache.spark.rdd.RDD[ourSchema] = MapPartitionsRDD[84] at map at <console>:33

scala> rdd2.foreach(println)
ourSchema(1,2)
ourSchema(2,3)
ourSchema(3,4)
ourSchema(4,5)
ourSchema(5,6)
ourSchema(6,7)

scala> val df = rdd2.toDF()
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> df.show()
+---+---+
|id1|id2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
|  5|  6|
|  6|  7|
+---+---+


scala> val df1 = df.toDF("Left","Right")
df1: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df1.show()
+----+-----+
|Left|Right|
+----+-----+
|   1|    2|
|   2|    3|
|   3|    4|
|   4|    5|
|   5|    6|
|   6|    7|
+----+-----+

scala> val df2 = df1.select("Left","Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
|   1|    2|
|   2|    3|
|   3|    4|
|   4|    5|
|   5|    6|
|   6|    7|
+----+-----+

// Here the column headers are the raw expressions, which is not what we want
scala> val df2 = df1.select(col("Left")+100,col("Right")+10)
df2: org.apache.spark.sql.DataFrame = [(Left + 100): int, (Right + 10): int]

scala> df2.show
+------------+------------+
|(Left + 100)|(Right + 10)|
+------------+------------+
|         101|          12|
|         102|          13|
|         103|          14|
|         104|          15|
|         105|          16|
|         106|          17|
+------------+------------+

// Here we properly managed headers using alias names
scala>  val df2 = df1.select(col("Left")+100 as "Left",col("Right")+10 as "Right")
df2: org.apache.spark.sql.DataFrame = [Left: int, Right: int]

scala> df2.show
+----+-----+
|Left|Right|
+----+-----+
| 101|   12|
| 102|   13|
| 103|   14|
| 104|   15|
| 105|   16|
| 106|   17|
+----+-----+

scala> val df3 = df2.select("Left","Right").groupBy("Left").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Left: int, count(1): bigint]

scala> df3.show
+----+--------+
|Left|count(1)|
+----+--------+
| 101|       1|
| 103|       1|
| 102|       1|
| 105|       1|
| 106|       1|
| 104|       1|
+----+--------+


scala> val df3 = df2.select("Left","Right").groupBy("Right").agg(count("*"))
df3: org.apache.spark.sql.DataFrame = [Right: int, count(1): bigint]

scala> df3.show
+-----+--------+
|Right|count(1)|
+-----+--------+
|   12|       1|
|   13|       1|
|   16|       1|
|   15|       1|
|   17|       1|
|   14|       1|
+-----+--------+

How to do groupBy Aggregation in Spark with Scala

Input
file2.csv:
--------------
hdfs dfs -cat /user/file2.csv
s1,d1
s1,d2
s1,d2
s1,d3
s2,d1
s2,d3
s2,d1
s3,d2
s3,d1
s1,d1
s2,d1
s3,d1
s1,d1
s2,d2
s3,d3


// autogenerated column headers
scala> val df = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]

scala> df.show
+---+---+
|_c0|_c1|
+---+---+
| s1| d1|
| s1| d2|
| s1| d2|
| s1| d3|
| s2| d1|
| s2| d3|
| s2| d1|
| s3| d2|
| s3| d1|
| s1| d1|
| s2| d1|
| s3| d1|
| s1| d1|
| s2| d2|
| s3| d3|
+---+---+

// groupBy aggregation goes here
scala> df.groupBy("_c0","_c1").agg(count("*")).show
+---+---+--------+                                                             
|_c0|_c1|count(1)|
+---+---+--------+
| s3| d2|       1|
| s2| d2|       1|
| s1| d2|       2|
| s1| d1|       3|
| s3| d1|       2|
| s2| d1|       3|
| s3| d3|       1|
| s1| d3|       1|
| s2| d3|       1|
+---+---+--------+

// In order to make our own schema we need to import the following
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

// Making Schema
scala> val sch = StructType(StructField("School",StringType)::StructField("Department",StringType)::Nil)
sch: org.apache.spark.sql.types.StructType = StructType(StructField(School,StringType,true), StructField(Department,StringType,true))

// applying schema
scala> val df = spark.read.format("csv").option("inferSchema","true").schema(sch).load("hdfs://localhost:9000/user/file2.csv")
df: org.apache.spark.sql.DataFrame = [School: string, Department: string]

scala> df.printSchema
root
 |-- School: string (nullable = true)
 |-- Department: string (nullable = true)

scala> df.show
+------+----------+
|School|Department|
+------+----------+
|    s1|        d1|
|    s1|        d2|
|    s1|        d2|
|    s1|        d3|
|    s2|        d1|
|    s2|        d3|
|    s2|        d1|
|    s3|        d2|
|    s3|        d1|
|    s1|        d1|
|    s2|        d1|
|    s3|        d1|
|    s1|        d1|
|    s2|        d2|
|    s3|        d3|
+------+----------+

// groupBy Aggregation operation
//added alias name for 3rd column
scala> df.groupBy("School","Department").agg(count("*") as "Count").show
+------+----------+-----+
|School|Department|Count|
+------+----------+-----+
|    s3|        d2|    1|
|    s2|        d2|    1|
|    s1|        d2|    2|
|    s1|        d1|    3|
|    s3|        d1|    2|
|    s2|        d1|    3|
|    s3|        d3|    1|
|    s1|        d3|    1|
|    s2|        d3|    1|
+------+----------+-----+
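Other aggregate functions can be mixed into the same agg call; a small sketch on the same DataFrame (not from the original session):

// Multiple aggregates per group (sketch).
df.groupBy("School").agg(count("*").as("Rows"), countDistinct("Department").as("Departments")).show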

How to make our own schema for a DataFrame?

How to work with StructType and StructField in Spark with Scala?
How to make our own schema for a DataFrame?
Input
file1.csv:
-----------
hdfs dfs -cat hdfs://localhost:9000/user/file1.csv
1,2,3.4
2,3,4.5
3,4.5,6.7
4,5,7.8

// we accidentally added header=true, so it shows 1,2,3.4 as the headers
scala> val df1 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [1: int, 2: double ... 1 more field]

scala> df1.printSchema
root
 |-- 1: integer (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3.4: double (nullable = true)

// this is the auto-generated schema with header=true, but the headers are just the numbers 1, 2, 3.4, which is wrong
scala> df1.show()
+---+---+---+
|  1|  2|3.4|
+---+---+---+
|  2|3.0|4.5|
|  3|4.5|6.7|
|  4|5.0|7.8|
+---+---+---+

// we removed header=true, so Spark added _c0, _c1, _c2 as auto-generated headers.
// But how do we use our own column names? The answer is given below.
scala> val df1 = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [_c0: int, _c1: double ... 1 more field]

scala> df1.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|2.0|3.4|
|  2|3.0|4.5|
|  3|4.5|6.7|
|  4|5.0|7.8|
+---+---+---+

// Instead of autogenerated structure, we are going to make our own structure for a dataframe
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._


// Here we are making our own schema
scala> val structure = StructType(StructField("id1",FloatType)::StructField("id2",FloatType)::StructField("id3",FloatType)::Nil)
structure: org.apache.spark.sql.types.StructType = StructType(StructField(id1,FloatType,true), StructField(id2,FloatType,true), StructField(id3,FloatType,true))

scala> val df1 = spark.read.format("csv").option("inferSchema","true").schema(structure).load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [id1: float, id2: float ... 1 more field]

scala> df1.printSchema
root
 |-- id1: float (nullable = true)
 |-- id2: float (nullable = true)
 |-- id3: float (nullable = true)

scala> df1.show
+---+---+---+
|id1|id2|id3|
+---+---+---+
|1.0|2.0|3.4|
|2.0|3.0|4.5|
|3.0|4.5|6.7|
|4.0|5.0|7.8|
+---+---+---+
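StructField also accepts an explicit nullable flag; a sketch of the same three-column schema with nullability spelled out (not part of the original session):

// The same schema with the nullable flag written explicitly (sketch).
val structureStrict = StructType(
  StructField("id1", FloatType, nullable = false) ::
  StructField("id2", FloatType, nullable = false) ::
  StructField("id3", FloatType, nullable = false) :: Nil)
val dfStrict = spark.read.format("csv").schema(structureStrict).load("hdfs://localhost:9000/user/file1.csv")
dfStrict.printSchema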

Monday, 28 January 2019

Dataframe example with Cancer Data csv file

Input file
CancerData10.csv:
-------------------
ID,Name,Age,Sex,State,Symptoms,Diagnosis,Cancer,CancerSc,Stage,Treatment,Survival
100001,David,45,M,Alamama,Red Itchy Patches,Biopsy,Malignant,Skin,1,Resection,Yes
100002,John,56,M,Alaska,Blood Couch,Pet Scan,Malignant,Throacic,2,Surgery,Yes
100003,Paul,65,M,Arizona,Red Itchy Patches,Biopsy,Malignant,Skin,3,Resection,No
100004,Mark,35,M,Arkansas,Blood Couch,Pet Scan,Malignant,Throacic,3,Surgery,No
100005,James,44,M,California,Red Itchy Patches,Biopsy,Malignant,Skin,2,Resection,Yes
100006,Andrew,53,M,Colarado,Red Itchy Patches,Biopsy,Malignant,Skin,2,Resection,Yes
100007,Scott,68,M,Conneticut,Blood Couch,Pet Scan,Malignant,Throacic,1,Surgery,Yes
100008,Steven,36,M,Delaware,Head Ache,CT Scan,Malignant,CNS,1,Surgery,Yes
100009,RoMert,54,M,Florida,Blood Couch,Pet Scan,Malignant,Throacic,1,Surgery,Yes

scala> import spark.implicits._
import spark.implicits._

scala> val df = spark.read.format("csv").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 10 more fields]

scala> df.show(3)
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
|   _c0|  _c1|_c2|_c3|    _c4|              _c5|      _c6|      _c7|     _c8|  _c9|     _c10|    _c11|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID| Name|Age|Sex|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
|100001|David| 45|  M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|     Yes|
|100002| John| 56|  M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|     Yes|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
only showing top 3 rows



//Header information is missing (_c0, _c1 are autogenerated)
scala> df.show()
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|   _c0|   _c1|_c2|_c3|       _c4|              _c5|      _c6|      _c7|     _c8|  _c9|     _c10|    _c11|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
|100001| David| 45|  M|   Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|     Yes|
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|     Yes|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|      No|
|100004|  Mark| 35|  M|  Arkansas|      Blood Couch| Pet Scan|Malignant|Throacic|    3|  Surgery|      No|
|100005| James| 44|  M|California|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
|100008|Steven| 36|  M|  Delaware|        Head Ache|  CT Scan|Malignant|     CNS|    1|  Surgery|     Yes|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+

// With Header information
scala> val df = spark.read.format("csv").option("header","true").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [ID: string, Name: string ... 10 more fields]

scala> df.show(3)
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID| Name|Age|Sex|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
|100001|David| 45|  M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|     Yes|
|100002| John| 56|  M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|     Yes|
|100003| Paul| 65|  M|Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|      No|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
only showing top 3 rows

// The schema is not right: every column has been read as a string, which is not what we want.
scala> df.printSchema
root
 |-- ID: string (nullable = true)  // wrong - should be an integer
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true) // wrong - should be an integer
 |-- Sex: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Symptoms: string (nullable = true)
 |-- Diagnosis: string (nullable = true)
 |-- Cancer: string (nullable = true)
 |-- CancerSc: string (nullable = true)
 |-- Stage: string (nullable = true)
 |-- Treatment: string (nullable = true)
 |-- Survival: string (nullable = true)

// Adding inferSchema makes Spark assign each column a proper data type based on the data in the file
scala> val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 10 more fields]

scala> df.printSchema
root
 |-- ID: integer (nullable = true) // perfect
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true) // perfect
 |-- Sex: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Symptoms: string (nullable = true)
 |-- Diagnosis: string (nullable = true)
 |-- Cancer: string (nullable = true)
 |-- CancerSc: string (nullable = true)
 |-- Stage: integer (nullable = true)
 |-- Treatment: string (nullable = true)
 |-- Survival: string (nullable = true)
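
As an alternative to inferSchema, the schema can also be declared explicitly with StructType, as shown above for file1.csv. A sketch (not run here); cancerSchema and dfTyped are just illustrative names, and the column types follow the data shown in the file:

import org.apache.spark.sql.types._

val cancerSchema = StructType(
  StructField("ID", IntegerType) :: StructField("Name", StringType) ::
  StructField("Age", IntegerType) :: StructField("Sex", StringType) ::
  StructField("State", StringType) :: StructField("Symptoms", StringType) ::
  StructField("Diagnosis", StringType) :: StructField("Cancer", StringType) ::
  StructField("CancerSc", StringType) :: StructField("Stage", IntegerType) ::
  StructField("Treatment", StringType) :: StructField("Survival", StringType) :: Nil)

val dfTyped = spark.read.format("csv")
  .option("header", "true")          // still needed so the header row is not read as data
  .schema(cancerSchema)
  .load("hdfs://localhost:9000/user/CancerData10.csv")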

// To find the number of rows in a dataframe
scala> df.count
res5: Long = 9

// Find the unique values of a particular column
scala> df.select("Stage").distinct.show
+-----+                                                                       
|Stage|
+-----+
|    1|
|    3|
|    2|
+-----+
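
Related to distinct: to see how many patients fall into each stage, rather than just the distinct stage values, groupBy with count can be used; a small sketch, not run in this session:

// Number of rows per stage
df.groupBy("Stage").count().show

// Just the number of distinct stages
df.select("Stage").distinct.count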

scala> df.where("Age > 50").show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|     Yes|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|      No|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+


scala> df.where("Age > 50 and Age <= 55").show
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID|  Name|Age|Sex|   State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|100006|Andrew| 53|  M|Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes|
|100009|RoMert| 54|  M| Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+


scala> df.filter("Age > 50 and Age <= 55").show
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID|  Name|Age|Sex|   State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|100006|Andrew| 53|  M|Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes|
|100009|RoMert| 54|  M| Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
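
The same condition can also be written with Column expressions instead of a SQL string, or with between, which is inclusive on both ends; a sketch assuming the same df:

df.filter(col("Age") > 50 && col("Age") <= 55).show
df.filter($"Age" > 50 && $"Age" <= 55).show

// between(51, 55) is equivalent here because Age is an integer column
df.filter(col("Age").between(51, 55)).show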

// Get the unique values of the Survival column
scala> df.select("Survival").distinct.show
+--------+                                                                   
|Survival|
+--------+
|      No|
|     Yes|
+--------+

// Replace 'Yes' with 1 and 'No' with 0
scala> df.withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0)).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|100001| David| 45|  M|   Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1|
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0|
|100004|  Mark| 35|  M|  Arkansas|      Blood Couch| Pet Scan|Malignant|Throacic|    3|  Surgery|       0|
|100005| James| 44|  M|California|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1|
|100008|Steven| 36|  M|  Delaware|        Head Ache|  CT Scan|Malignant|     CNS|    1|  Surgery|       1|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
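
The same Yes/No to 1/0 mapping can also be expressed as a SQL CASE expression through the expr function; a sketch, not run in this session:

import org.apache.spark.sql.functions.expr

df.withColumn("Survival", expr("CASE WHEN Survival = 'Yes' THEN 1 ELSE 0 END")).show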

scala> df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <=  50, "Level-B").otherwise("Level-C")).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45|  M|   Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|     Yes| Level-B|
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|     Yes| Level-C|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|      No| Level-C|
|100004|  Mark| 35|  M|  Arkansas|      Blood Couch| Pet Scan|Malignant|Throacic|    3|  Surgery|      No| Level-A|
|100005| James| 44|  M|California|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes| Level-B|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|     Yes| Level-C|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes| Level-C|
|100008|Steven| 36|  M|  Delaware|        Head Ache|  CT Scan|Malignant|     CNS|    1|  Surgery|     Yes| Level-A|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|     Yes| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+


scala> df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <=  50, "Level-B").otherwise("Level-C")).withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0)).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45|  M|   Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|
|100004|  Mark| 35|  M|  Arkansas|      Blood Couch| Pet Scan|Malignant|Throacic|    3|  Surgery|       0| Level-A|
|100005| James| 44|  M|California|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1| Level-B|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1| Level-C|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1| Level-C|
|100008|Steven| 36|  M|  Delaware|        Head Ache|  CT Scan|Malignant|     CNS|    1|  Surgery|       1| Level-A|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+

scala> val dfTemp = df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <=  50, "Level-B").otherwise("Level-C"))
dfTemp: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]

scala> val dfResult = dfTemp.withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0))
dfResult: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]

scala> dfResult.show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|    ID|  Name|Age|Sex|     State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45|  M|   Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|
|100002|  John| 56|  M|    Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|
|100003|  Paul| 65|  M|   Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|
|100004|  Mark| 35|  M|  Arkansas|      Blood Couch| Pet Scan|Malignant|Throacic|    3|  Surgery|       0| Level-A|
|100005| James| 44|  M|California|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1| Level-B|
|100006|Andrew| 53|  M|  Colarado|Red Itchy Patches|   Biopsy|Malignant|    Skin|    2|Resection|       1| Level-C|
|100007| Scott| 68|  M|Conneticut|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1| Level-C|
|100008|Steven| 36|  M|  Delaware|        Head Ache|  CT Scan|Malignant|     CNS|    1|  Surgery|       1| Level-A|
|100009|RoMert| 54|  M|   Florida|      Blood Couch| Pet Scan|Malignant|Throacic|    1|  Surgery|       1| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
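
The same derived column can also be computed with Spark SQL after registering the dataframe as a temporary view; a sketch (not run here), where "cancer" and dfSql are just illustrative names:

df.createOrReplaceTempView("cancer")

val dfSql = spark.sql("""
  SELECT *,
         CASE WHEN Age >= 30 AND Age <= 40 THEN 'Level-A'
              WHEN Age >  40 AND Age <= 50 THEN 'Level-B'
              ELSE 'Level-C'
         END AS AgeGroup
  FROM cancer""")

dfSql.show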

// Renaming a column
scala> val dfGender = dfResult.withColumnRenamed("Sex","Gender")
dfGender: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]

scala> dfGender.show(3)
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|    ID| Name|Age|Gender|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001|David| 45|     M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|
|100002| John| 56|     M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|
|100003| Paul| 65|     M|Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
only showing top 3 rows

// Rename succeeded
scala> dfGender.printSchema
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true) // the column named 'Sex' has been renamed to 'Gender'
 |-- State: string (nullable = true)
 |-- Symptoms: string (nullable = true)
 |-- Diagnosis: string (nullable = true)
 |-- Cancer: string (nullable = true)
 |-- CancerSc: string (nullable = true)
 |-- Stage: integer (nullable = true)
 |-- Treatment: string (nullable = true)
 |-- Survival: integer (nullable = false)
 |-- AgeGroup: string (nullable = false)
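
withColumnRenamed renames one column per call; to rename several columns, the calls can simply be chained. A sketch, not run in this session ("CancerSite" is just an illustrative new name):

val dfRenamed = dfResult
  .withColumnRenamed("Sex", "Gender")
  .withColumnRenamed("CancerSc", "CancerSite")   // illustrative second rename

dfRenamed.printSchema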


// Change the data type of the Age column from integer to float
scala> val dfAgeFloat = dfResult.withColumn("Age",col("Age").cast("Float"))
dfAgeFloat: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]

scala> dfAgeFloat.printSchema
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: float (nullable = true)  // previously this was an integer column
 |-- Sex: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Symptoms: string (nullable = true)
 |-- Diagnosis: string (nullable = true)
 |-- Cancer: string (nullable = true)
 |-- CancerSc: string (nullable = true)
 |-- Stage: integer (nullable = true)
 |-- Treatment: string (nullable = true)
 |-- Survival: integer (nullable = false)
 |-- AgeGroup: string (nullable = false)


scala> dfAgeFloat.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|    ID| Name| Age|Sex|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001|David|45.0|  M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|
|100002| John|56.0|  M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|
|100003| Paul|65.0|  M|Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
only showing top 3 rows
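
The same cast can be written SQL-style with selectExpr; a sketch that keeps only a few columns for brevity, assuming dfResult from above:

dfResult.selectExpr("ID", "Name", "cast(Age as float) as Age", "Sex").show(3)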

// Create a new column 'NewAge' which holds the value of Age + 10
scala> val dfAgeIncrement10 = dfAgeFloat.withColumn("NewAge",col("Age")+10)
dfAgeIncrement10: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 12 more fields]

scala> dfAgeIncrement10.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
|    ID| Name| Age|Sex|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|NewAge|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
|100001|David|45.0|  M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|  55.0|
|100002| John|56.0|  M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|  66.0|
|100003| Paul|65.0|  M|Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|  75.0|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
only showing top 3 rows
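
The reverse operation is also useful: if a derived column such as NewAge is no longer needed, it can be removed with drop; a sketch, not run in this session:

val dfWithoutNewAge = dfAgeIncrement10.drop("NewAge")
dfWithoutNewAge.printSchema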


// Add a new column with the fixed value "Cancer Patients"
scala> val dfAddColumnWithFixedValue = dfAgeIncrement10.withColumn("Type",lit("Cancer Patients"))  // lit = literal; "Type" is the new column name
dfAddColumnWithFixedValue: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 13 more fields]

scala> dfAddColumnWithFixedValue.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
|    ID| Name| Age|Sex|  State|         Symptoms|Diagnosis|   Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|NewAge|           Type|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
|100001|David|45.0|  M|Alamama|Red Itchy Patches|   Biopsy|Malignant|    Skin|    1|Resection|       1| Level-B|  55.0|Cancer Patients|
|100002| John|56.0|  M| Alaska|      Blood Couch| Pet Scan|Malignant|Throacic|    2|  Surgery|       1| Level-C|  66.0|Cancer Patients|
|100003| Paul|65.0|  M|Arizona|Red Itchy Patches|   Biopsy|Malignant|    Skin|    3|Resection|       0| Level-C|  75.0|Cancer Patients|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
only showing top 3 rows
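
Finally, a transformed dataframe can be written back to HDFS as CSV; a sketch (not run here), where the output directory is just an example path:

dfAddColumnWithFixedValue.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")                                   // replace the output directory if it already exists
  .save("hdfs://localhost:9000/user/CancerDataOut")    // example output path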

Flume - Simple Demo

// Create a folder in HDFS: $ hdfs dfs -mkdir /user/flumeExa
// Create a shell script which generates : Hadoop in real world <n>...