Word count program using DataFrame and Dataset:
-------------------------------------
scala> val df = spark.read.format("text").load("e://vow//sample.txt")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.printSchema
root
|-- value: string (nullable = true)
scala> df.show
+--------------------+
| value|
+--------------------+
|spark is bigdata...|
|hadoop is bigdata...|
|spark and hadoop ...|
+--------------------+
scala> val ds = df.as[String]
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.show
+--------------------+
| value|
+--------------------+
|spark is bigdata...|
|hadoop is bigdata...|
|spark and hadoop ...|
+--------------------+
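Note: `df.as[String]` needs an implicit `Encoder[String]`. The Spark shell imports `spark.implicits._` automatically, which is why the conversion works above without any import. In a standalone application you would add it yourself, roughly:

// Not needed in spark-shell, which imports these automatically
import spark.implicits._
val ds = df.as[String]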
scala> val ds1 = ds.map(x => x.split(" "))
ds1: org.apache.spark.sql.Dataset[Array[String]] = [value: array<string>]
scala> ds1.show
+--------------------+
| value|
+--------------------+
|[spark, is, , big...|
|[hadoop, is, bigd...|
|[spark, and, hado...|
+--------------------+
scala> val ds2 = ds.flatMap(x => x.split(" "))
ds2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds2.show
+------------+
| value|
+------------+
| spark|
| is|
| |
| bigdata|
| technology|
| hadoop|
| is|
| bigdata|
| technology|
| spark|
| and|
| hadoop|
| are|
| bigdata|
|technologies|
+------------+
scala> val ds3 = ds2.map(x => (x,1))
ds3: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
scala> ds3.show
+------------+---+
| _1| _2|
+------------+---+
| spark| 1|
| is| 1|
| | 1|
| bigdata| 1|
| technology| 1|
| hadoop| 1|
| is| 1|
| bigdata| 1|
| technology| 1|
| spark| 1|
| and| 1|
| hadoop| 1|
| are| 1|
| bigdata| 1|
|technologies| 1|
+------------+---+
scala> ds3.groupBy("_1").agg(count("_2")).show
+------------+---------+
| _1|count(_2)|
+------------+---------+
| technology| 2|
| is| 2|
| spark| 2|
| and| 1|
| are| 1|
|technologies| 1|
| bigdata| 3|
| | 1|
| hadoop| 2|
+------------+---------+
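For reference, the same counts can be obtained more directly from ds2, without building the intermediate (word, 1) tuples. Two sketches not from the original session, one untyped and one typed:

ds2.groupBy("value").count().show       // untyped: DataFrame-style aggregation on the value column
ds2.groupByKey(identity).count().show   // typed: yields a Dataset[(String, Long)]

Both produce the same word counts as the groupBy/agg above, including the empty token unless it is filtered out first.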
-------------------------------------