Tuesday, 29 January 2019

How to make our own schema for a DataFrame?

How to use StructType and StructField to build your own DataFrame schema: examples in Spark with Scala
Input
file1.csv:
-----------
hdfs dfs -cat hdfs://localhost:9000/user/file1.csv
1,2,3.4
2,3,4.5
3,4.5,6.7
4,5,7.8

// We mistakenly set header=true, so Spark treats the first data row (1,2,3.4) as the header.
scala> val df1 = spark.read.format("csv").option("inferSchema","true").option("header","true").load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [1: int, 2: double ... 1 more field]

scala> df1.printSchema
root
 |-- 1: integer (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3.4: double (nullable = true)

// The schema was auto-inferred with header=true, but the column names are just the numbers 1, 2, 3.4 taken from the first row. That is wrong: we lost a row of data and the headers are meaningless.
scala> df1.show()
+---+---+---+
|  1|  2|3.4|
+---+---+---+
|  2|3.0|4.5|
|  3|4.5|6.7|
|  4|5.0|7.8|
+---+---+---+
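
// As a quick sanity check, we can inspect the column names Spark picked up;
// here they are the values from the first data row:
df1.columns.foreach(println)   // prints 1, 2 and 3.4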

// With the header option removed, Spark auto-generates the column names _c0, _c1, _c2.
// But how do we supply our own column names and types? The answer is given below.
scala> val df1 = spark.read.format("csv").option("inferSchema","true").load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [_c0: int, _c1: double ... 1 more field]

scala> df1.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|2.0|3.4|
|  2|3.0|4.5|
|  3|4.5|6.7|
|  4|5.0|7.8|
+---+---+---+
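
// If we only need proper names and the inferred types are fine, a lighter
// option is toDF, which simply renames the auto-generated columns
// (a sketch using the df1 above):
val renamed = df1.toDF("id1", "id2", "id3")
// renamed.printSchema would show id1: integer, id2: double, id3: double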

// Instead of the auto-generated schema, we are now going to define our own schema for the DataFrame.
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._


// Here we build our own schema: three nullable FloatType columns.
// (Once an explicit schema is passed to the reader, the inferSchema option is redundant; Spark uses the supplied schema as-is.)
scala> val structure = StructType(StructField("id1",FloatType)::StructField("id2",FloatType)::StructField("id3",FloatType)::Nil)
structure: org.apache.spark.sql.types.StructType = StructType(StructField(id1,FloatType,true), StructField(id2,FloatType,true), StructField(id3,FloatType,true))
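
// The same schema can be built step by step with StructType's add method,
// an equivalent alternative to the :: Nil list style:
val structure2 = new StructType()
  .add("id1", FloatType)
  .add("id2", FloatType)
  .add("id3", FloatType)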

scala> val df1 = spark.read.format("csv").option("inferSchema","true").schema(structure).load("hdfs://localhost:9000/user/file1.csv")
df1: org.apache.spark.sql.DataFrame = [id1: float, id2: float ... 1 more field]

scala> df1.printSchema
root
 |-- id1: float (nullable = true)
 |-- id2: float (nullable = true)
 |-- id3: float (nullable = true)

scala> df1.show
+---+---+---+
|id1|id2|id3|
+---+---+---+
|1.0|2.0|3.4|
|2.0|3.0|4.5|
|3.0|4.5|6.7|
|4.0|5.0|7.8|
+---+---+---+
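
// Since Spark 2.3, schema() also accepts a DDL-formatted string, so the same
// schema can be declared without building a StructType by hand
// (a sketch, assuming Spark 2.3 or later):
val df2 = spark.read.format("csv")
  .schema("id1 FLOAT, id2 FLOAT, id3 FLOAT")
  .load("hdfs://localhost:9000/user/file1.csv")
// df2.printSchema shows the same three nullable float columns as above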
