Converting between RDD, DataFrame, and Dataset (interoperable)
RDD -> DataFrame ====> .toDF
RDD -> Dataset ====> .toDS
DataFrame -> Dataset ====> df.as[Type]
DataFrame -> RDD ====> df.rdd
Dataset -> RDD ====> ds.rdd
Dataset -> DataFrame ====> ds.toDF
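These conversions (.toDF, .toDS, .as[Type]) rely on Spark's implicit encoders. The spark-shell imports them automatically, but in a standalone application they have to be brought in from the SparkSession explicitly. A minimal sketch, assuming a SparkSession named spark:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("conversions").getOrCreate()
import spark.implicits._    // brings toDF, toDS and the encoders needed by as[Type] into scope
val sc = spark.sparkContext // the same SparkContext the shell exposes as sc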
//RDDs in Spark
scala> val r1 = sc.textFile("hdfs://localhost:9000/user/movies.csv")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/movies.csv MapPartitionsRDD[6] at textFile at <console>:24
scala> r1.take(5).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
scala> val r2 = r1.map (x => x.split(","))
r2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[22] at map at <console>:25
scala> val r3 = r2.map (x => {
val sno = x(0)
val moviename = x(1)
val genre = x(2)
(sno,moviename,genre)
}
)
scala> r3.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
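Note that the CSV header comes through as an ordinary record (the first tuple above is (movieId,title,genres)), because textFile treats every line alike. One way to drop it, sketched here and not part of the original session, is to filter it out before building the tuples:
// drop the header record before creating the tuple RDD (a sketch)
val r3Clean = r2.filter(x => x(0) != "movieId").map(x => (x(0), x(1), x(2)))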
// RDD to DataFrame
scala> val dfX = r3.toDF
dfX: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
scala> dfX.printSchema
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
scala> dfX.show(3)
+-------+----------------+--------------------+
| _1| _2| _3|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
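The default column names are _1, _2, _3 because toDF was called without arguments. toDF also accepts explicit column names; a sketch (the original transcript keeps the defaults):
// name the columns while converting the RDD to a DataFrame
val dfNamed = r3.toDF("movieId", "title", "genres")
dfNamed.printSchema   // columns are now movieId, title, genres instead of _1, _2, _3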
// Making a Dataset from the RDD
scala> val dsX = r3.toDS
dsX: org.apache.spark.sql.Dataset[(String, String, String)] = [_1: string, _2: string ... 1 more field]
scala> dsX.printSchema
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
scala> dsX.show(3)
+-------+----------------+--------------------+
| _1| _2| _3|
+-------+----------------+--------------------+
|movieId| title| genres|
| 1|Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 3 rows
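Because dsX is a Dataset[(String, String, String)], transformations can work directly on the tuple fields instead of Column expressions. A small sketch, not from the original session:
// typed transformation: pull out only the title field from each tuple
val titles = dsX.map(t => t._2)   // Dataset[String]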
// Making an RDD from the DataFrame
scala> val rddX = dfX.rdd
rddX: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[35] at rdd at <console>:25
scala> rddX.take(3).foreach(println)
[movieId,title,genres]
[1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy]
[2,Jumanji (1995),Adventure|Children|Fantasy]
// Making an RDD from the Dataset
scala> val rddXX = dsX.rdd
rddXX: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[37] at rdd at <console>:25
scala> rddXX.take(3).foreach(println)
(movieId,title,genres)
(1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,Jumanji (1995),Adventure|Children|Fantasy)
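Note the element types of the two RDDs: dfX.rdd yields an RDD[Row], whose fields are read positionally (or by name via getAs), while dsX.rdd keeps the tuple type. A sketch of reading the title from each:
// Row fields are accessed by position; tuple fields by _1/_2/_3
val titlesFromRows   = rddX.map(row => row.getString(1))
val titlesFromTuples = rddXX.map(t => t._2)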
// DataFrame from JSON input
scala> val df = spark.read.format("json").load("hdfs://localhost:9000/user/employee.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show(3)
+---+------+
|age| name|
+---+------+
| 28| John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows
scala> case class Person(name:String,age:Long)
defined class Person
// Making a Dataset from the DataFrame using the case class Person
scala> val dsPerson = df.as[Person]
dsPerson: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> dsPerson.show(3)
+---+------+
|age| name|
+---+------+
| 28| John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows
scala> dsPerson.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
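df.as[Person] works here because the case class field names (name, age) match the JSON column names, even though the column order differs. With the typed Dataset in hand, operations can be written against the case class fields directly; a sketch (the threshold 25 is just an example):
// typed filter on the case class field instead of a Column expression
val adults = dsPerson.filter(p => p.age > 25)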
// Converting the Dataset to a DataFrame
scala> val tmpdf = dsPerson.toDF
tmpdf: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> tmpdf.show(3)
+---+------+
|age| name|
+---+------+
| 28| John|
| 28|Andrew|
| 22|Clarke|
+---+------+
only showing top 3 rows
scala> tmpdf.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// DataFrame from a text file as input, so every line becomes a row
scala> val df1 = spark.read.format("text").load("hdfs://localhost:9000/user/employee.txt")
df1: org.apache.spark.sql.DataFrame = [value: string]
scala> df1.show(3)
+---------+
| value|
+---------+
| John,28|
|Andrew,36|
|Clarke,22|
+---------+
only showing top 3 rows
// Converting the DataFrame into a Dataset[String]
scala> val ds1 = df1.as[String]
ds1: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds1.show(3)
+---------+
| value|
+---------+
| John,28|
|Andrew,36|
|Clarke,22|
+---------+
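As an aside, spark.read.textFile returns a Dataset[String] directly, so the load-then-as[String] step can be collapsed into one call. A sketch against the same file:
// equivalent to spark.read.format("text").load(...).as[String]
val ds1Direct = spark.read.textFile("hdfs://localhost:9000/user/employee.txt")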
scala> val ds2 = ds1.map (x => x.split(","))
ds2: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]
// Each element is now an Array of Strings
scala> ds2.show(5)
+-------------+
| value|
+-------------+
| [John, 28]|
| [Andrew, 36]|
| [Clarke, 22]|
| [Kevin, 42]|
|[Richard, 51]|
+-------------+
scala> case class Person(name:String,age:Long)
defined class Person
scala> val ds3 = ds2.map( x => {
val name = x(0)
val age = x(1)
(name, age)
})
ds3: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
// Dataset of (String, String); the named schema is still missing (see the sketch after the printSchema output below)
scala> ds3.show(5)
+-------+---+
| _1| _2|
+-------+---+
| John| 28|
| Andrew| 36|
| Clarke| 22|
| Kevin| 42|
|Richard| 51|
+-------+---+
scala> ds3.printSchema
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
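To restore a named schema, the split Dataset ds2 can be mapped into the Person case class defined above, converting age to Long on the way. A sketch, not captured from the session:
// mapping into the case class brings back column names and types
val dsPersons = ds2.map(x => Person(x(0), x(1).trim.toLong))
dsPersons.printSchema   // now shows name: string and age: long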
hadoop@hadoop:~$ cat > samplefile.txt
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies
^C
hadoop@hadoop:~$ hdfs dfs -copyFromLocal samplefile.txt /user/
scala> val r1 = sc.textFile("hdfs://localhost:9000/user/samplefile.txt")
r1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/samplefile.txt MapPartitionsRDD[8] at textFile at <console>:24
scala> r1.collect.foreach(println)
spark is a big data technology
hadoop is a big data technology
spark and hadoop are big data technologies
scala> val d1 = spark.read.format("text").load("hdfs://localhost:9000/user/samplefile.txt")
d1: org.apache.spark.sql.DataFrame = [value: string]
scala> d1.show
+--------------------+
| value|
+--------------------+
|spark is a big da...|
|hadoop is a big d...|
|spark and hadoop ...|
+--------------------+
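show truncates long strings to 20 characters by default, which is why the lines above end in "..."; passing false for truncate prints the full values. A sketch:
// print the value column without the 20-character truncation
d1.show(false)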