Input file
CancerData10.csv:
-------------------
ID,Name,Age,Sex,State,Symptoms,Diagnosis,Cancer,CancerSc,Stage,Treatment,Survival
100001,David,45,M,Alamama,Red Itchy Patches,Biopsy,Malignant,Skin,1,Resection,Yes
100002,John,56,M,Alaska,Blood Couch,Pet Scan,Malignant,Throacic,2,Surgery,Yes
100003,Paul,65,M,Arizona,Red Itchy Patches,Biopsy,Malignant,Skin,3,Resection,No
100004,Mark,35,M,Arkansas,Blood Couch,Pet Scan,Malignant,Throacic,3,Surgery,No
100005,James,44,M,California,Red Itchy Patches,Biopsy,Malignant,Skin,2,Resection,Yes
100006,Andrew,53,M,Colarado,Red Itchy Patches,Biopsy,Malignant,Skin,2,Resection,Yes
100007,Scott,68,M,Conneticut,Blood Couch,Pet Scan,Malignant,Throacic,1,Surgery,Yes
100008,Steven,36,M,Delaware,Head Ache,CT Scan,Malignant,CNS,1,Surgery,Yes
100009,RoMert,54,M,Florida,Blood Couch,Pet Scan,Malignant,Throacic,1,Surgery,Yes
scala> import spark.implicits._
import spark.implicits._
scala> val df = spark.read.format("csv").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 10 more fields]
scala> df.show(3)
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
| _c0| _c1|_c2|_c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
|100001|David| 45| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| Yes|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| Yes|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
only showing top 3 rows
//Header information is missing (_c0, _c1 are autogenerated)
scala> df.show()
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
| _c0| _c1|_c2|_c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
|100001| David| 45| M| Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| Yes|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| Yes|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| No|
|100004| Mark| 35| M| Arkansas| Blood Couch| Pet Scan|Malignant|Throacic| 3| Surgery| No|
|100005| James| 44| M|California|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
|100008|Steven| 36| M| Delaware| Head Ache| CT Scan|Malignant| CNS| 1| Surgery| Yes|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
// With Header information
scala> val df = spark.read.format("csv").option("header","true").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [ID: string, Name: string ... 10 more fields]
scala> df.show(3)
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
|100001|David| 45| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| Yes|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| Yes|
|100003| Paul| 65| M|Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| No|
+------+-----+---+---+-------+-----------------+---------+---------+--------+-----+---------+--------+
only showing top 3 rows
// Schema is wrong. Because all the data types are strings here. thats not good.
scala> df.printSchema
root
|-- ID: string (nullable = true) // this is wrong
|-- Name: string (nullable = true)
|-- Age: string (nullable = true) // this is wrong
|-- Sex: string (nullable = true)
|-- State: string (nullable = true)
|-- Symptoms: string (nullable = true)
|-- Diagnosis: string (nullable = true)
|-- Cancer: string (nullable = true)
|-- CancerSc: string (nullable = true)
|-- Stage: string (nullable = true)
|-- Treatment: string (nullable = true)
|-- Survival: string (nullable = true)
// Added inferSchema - so each column will be in proper data type based on the data given in the file
scala> val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("hdfs://localhost:9000/user/CancerData10.csv")
df: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 10 more fields]
scala> df.printSchema
root
|-- ID: integer (nullable = true) // perfect
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true) // perfect
|-- Sex: string (nullable = true)
|-- State: string (nullable = true)
|-- Symptoms: string (nullable = true)
|-- Diagnosis: string (nullable = true)
|-- Cancer: string (nullable = true)
|-- CancerSc: string (nullable = true)
|-- Stage: integer (nullable = true)
|-- Treatment: string (nullable = true)
|-- Survival: string (nullable = true)
// to find number of rows in a dataframe
scala> df.count
res5: Long = 9
// Find the unique values of a particular column
scala> df.select("Stage").distinct.show
+-----+
|Stage|
+-----+
| 1|
| 3|
| 2|
+-----+
scala> df.where("Age > 50").show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| Yes|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| No|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
scala> df.where("Age > 50 and Age <= 55").show
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|100006|Andrew| 53| M|Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
scala> df.filter("Age > 50 and Age <= 55").show
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
|100006|Andrew| 53| M|Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes|
+------+------+---+---+--------+-----------------+---------+---------+--------+-----+---------+--------+
// get unique values of Survival
scala> df.select("Survival").distinct.show
+--------+
|Survival|
+--------+
| No|
| Yes|
+--------+
// Replace 'Yes' with 1 and 'No' with 0
scala> df.withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0)).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
|100001| David| 45| M| Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0|
|100004| Mark| 35| M| Arkansas| Blood Couch| Pet Scan|Malignant|Throacic| 3| Surgery| 0|
|100005| James| 44| M|California|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1|
|100008|Steven| 36| M| Delaware| Head Ache| CT Scan|Malignant| CNS| 1| Surgery| 1|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+
scala> df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <= 50, "Level-B").otherwise("Level-C")).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45| M| Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| Yes| Level-B|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| Yes| Level-C|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| No| Level-C|
|100004| Mark| 35| M| Arkansas| Blood Couch| Pet Scan|Malignant|Throacic| 3| Surgery| No| Level-A|
|100005| James| 44| M|California|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes| Level-B|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| Yes| Level-C|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes| Level-C|
|100008|Steven| 36| M| Delaware| Head Ache| CT Scan|Malignant| CNS| 1| Surgery| Yes| Level-A|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| Yes| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
scala> df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <= 50, "Level-B").otherwise("Level-C")).withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0)).show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45| M| Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C|
|100004| Mark| 35| M| Arkansas| Blood Couch| Pet Scan|Malignant|Throacic| 3| Surgery| 0| Level-A|
|100005| James| 44| M|California|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1| Level-B|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1| Level-C|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1| Level-C|
|100008|Steven| 36| M| Delaware| Head Ache| CT Scan|Malignant| CNS| 1| Surgery| 1| Level-A|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
scala> val dfTemp = df.withColumn("AgeGroup",when(col("Age") >= 30 and col("Age") <= 40, "Level-A").when (col("Age") > 40 and col("Age") <= 50, "Level-B").otherwise("Level-C"))
dfTemp: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]
scala> val dfResult = dfTemp.withColumn("Survival",when(col("Survival")==="Yes",1).otherwise (0))
dfResult: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]
scala> dfResult.show
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
| ID| Name|Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001| David| 45| M| Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C|
|100003| Paul| 65| M| Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C|
|100004| Mark| 35| M| Arkansas| Blood Couch| Pet Scan|Malignant|Throacic| 3| Surgery| 0| Level-A|
|100005| James| 44| M|California|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1| Level-B|
|100006|Andrew| 53| M| Colarado|Red Itchy Patches| Biopsy|Malignant| Skin| 2|Resection| 1| Level-C|
|100007| Scott| 68| M|Conneticut| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1| Level-C|
|100008|Steven| 36| M| Delaware| Head Ache| CT Scan|Malignant| CNS| 1| Surgery| 1| Level-A|
|100009|RoMert| 54| M| Florida| Blood Couch| Pet Scan|Malignant|Throacic| 1| Surgery| 1| Level-C|
+------+------+---+---+----------+-----------------+---------+---------+--------+-----+---------+--------+--------+
// Renaming a column
scala> val dfGender = dfResult.withColumnRenamed("Sex","Gender")
dfGender: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]
scala> dfGender.show(3)
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
| ID| Name|Age|Gender| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001|David| 45| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B|
|100002| John| 56| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C|
|100003| Paul| 65| M|Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C|
+------+-----+---+------+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
only showing top 3 rows
// Rename succeeded
scala> dfGender.printSchema
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Gender: string (nullable = true) // Column named 'sex' is renamed as 'Gender'
|-- State: string (nullable = true)
|-- Symptoms: string (nullable = true)
|-- Diagnosis: string (nullable = true)
|-- Cancer: string (nullable = true)
|-- CancerSc: string (nullable = true)
|-- Stage: integer (nullable = true)
|-- Treatment: string (nullable = true)
|-- Survival: integer (nullable = false)
|-- AgeGroup: string (nullable = false)
// Change the datatype (Integer to Float) of Age column
scala> val dfAgeFloat = dfResult.withColumn("Age",col("Age").cast("Float"))
dfAgeFloat: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 11 more fields]
scala> dfAgeFloat.printSchema
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Age: float (nullable = true) // Previously it was int column
|-- Sex: string (nullable = true)
|-- State: string (nullable = true)
|-- Symptoms: string (nullable = true)
|-- Diagnosis: string (nullable = true)
|-- Cancer: string (nullable = true)
|-- CancerSc: string (nullable = true)
|-- Stage: integer (nullable = true)
|-- Treatment: string (nullable = true)
|-- Survival: integer (nullable = false)
|-- AgeGroup: string (nullable = false)
scala> dfAgeFloat.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
| ID| Name| Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
|100001|David|45.0| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B|
|100002| John|56.0| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C|
|100003| Paul|65.0| M|Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+
only showing top 3 rows
// Create a new column as 'NewAge' which hold the value of Age + 10
scala> val dfAgeIncrement10 = dfAgeFloat.withColumn("NewAge",col("Age")+10)
dfAgeIncrement10: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 12 more fields]
scala> dfAgeIncrement10.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
| ID| Name| Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|NewAge|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
|100001|David|45.0| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B| 55.0|
|100002| John|56.0| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C| 66.0|
|100003| Paul|65.0| M|Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C| 75.0|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+
only showing top 3 rows
// Add a new column with Fixed value "Cancer Patients"
scala> val dfAddColumnWithFixedValue = dfAgeIncrement10.withColumn("Type",lit("Cancer Patients")) // lit : literal, "Type" is column name here
dfAddColumnWithFixedValue: org.apache.spark.sql.DataFrame = [ID: int, Name: string ... 13 more fields]
scala> dfAddColumnWithFixedValue.show(3)
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
| ID| Name| Age|Sex| State| Symptoms|Diagnosis| Cancer|CancerSc|Stage|Treatment|Survival|AgeGroup|NewAge| Type|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
|100001|David|45.0| M|Alamama|Red Itchy Patches| Biopsy|Malignant| Skin| 1|Resection| 1| Level-B| 55.0|Cancer Patients|
|100002| John|56.0| M| Alaska| Blood Couch| Pet Scan|Malignant|Throacic| 2| Surgery| 1| Level-C| 66.0|Cancer Patients|
|100003| Paul|65.0| M|Arizona|Red Itchy Patches| Biopsy|Malignant| Skin| 3|Resection| 0| Level-C| 75.0|Cancer Patients|
+------+-----+----+---+-------+-----------------+---------+---------+--------+-----+---------+--------+--------+------+---------------+
only showing top 3 rows
Monday, 28 January 2019
Subscribe to:
Comments (Atom)
Flume - Simple Demo
// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...
-
How to fetch Spark Application Id programmaticall while running the Spark Job? scala> spark.sparkContext.applicationId res124: String = l...
-
input data: ---------- customerID, itemID, amount 44,8602,37.19 35,5368,65.89 2,3391,40.64 47,6694,14.98 29,680,13.08 91,8900,24.59 ...
-
pattern matching is similar to switch statements in C#, Java no fall-through - at least one condition matched no breaks object PatternExa { ...