Sunday, 16 August 2020

RDD and DataFrame Examples in PySpark

a = spark.sparkContext.parallelize((1,2,3,4,5,6))
print(a.getNumPartitions())
4
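
The default partition count comes from the number of local cores, so the 4 above may differ on your machine. If you need a specific count, parallelize also takes a numSlices argument; for example:

b = spark.sparkContext.parallelize((1,2,3,4,5,6), numSlices=3)
print(b.getNumPartitions())
3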


from pyspark.sql.types import Row
from datetime import datetime

simple_data = sc.parallelize([[100, "Raja", 500], [101, "Kala", 550], [102, "Raji", 650]])

print(simple_data.count())
3


print(simple_data.first())
[100, 'Raja', 500]


print(simple_data.take(2))
[[100, 'Raja', 500], [101, 'Kala', 550]]


print(simple_data.collect())
[[100, 'Raja', 500], [101, 'Kala', 550], [102, 'Raji', 650]]
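
Since simple_data is just an RDD of Python lists, the usual RDD transformations work on it directly. As a small sketch, keeping only the rows whose wage (the third field) is above 500:

print(simple_data.filter(lambda row: row[2] > 500).collect())
[[101, 'Kala', 550], [102, 'Raji', 650]]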
 


df = simple_data.toDF(["RollNo","Name","Wages"])

df.printSchema()
root
 |-- RollNo: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Wages: long (nullable = true)


df.show()

+------+----+-----+
|RollNo|Name|Wages|
+------+----+-----+
|   100|Raja|  500|
|   101|Kala|  550|
|   102|Raji|  650|
+------+----+-----+
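
toDF infers the column types (long, string, long) from the data. If you would rather declare the types yourself, a minimal sketch with an explicit StructType (df2 is just an illustrative name):

from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("RollNo", LongType(), True),
    StructField("Name", StringType(), True),
    StructField("Wages", LongType(), True)
])
df2 = spark.createDataFrame(simple_data, schema)
df2.printSchema()

This should print the same schema as shown above.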


studentsRDD = sc.parallelize([
    Row(id=100, name="Ravi", score=50, Gender="M"),
    Row(id=101, name="Janani", score=66, Gender="F"),
    Row(id=102, name="Siva", score=63, Gender="M"),
    Row(id=103, name="Aishvarya", score=58, Gender="F"),
    Row(id=104, name="Vijay", score=52, Gender="M"),
    Row(id=105, name="Varadaraj", score=55, Gender="M"),
    Row(id=106, name="Parvathi", score=60, Gender="F"),
    Row(id=107, name="Aasqhique", score=62, Gender="M")
])


print(studentsRDD.take(3))

[Row(Gender='M', id=100, name='Ravi', score=50), Row(Gender='F', id=101, name='Janani', score=66), Row(Gender='M', id=102, name='Siva', score=63)]
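
Notice that the fields come back in alphabetical order (Gender, id, name, score) rather than the order they were typed: in this PySpark version, Row sorts keyword arguments by field name. Access by name is unaffected:

first = studentsRDD.first()
print(first.name, first.score)
Ravi 50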



studentsRDD.count()
8

studentsRDD.collect()
[Row(Gender='M', id=100, name='Ravi', score=50),
 Row(Gender='F', id=101, name='Janani', score=66),
 Row(Gender='M', id=102, name='Siva', score=63),
 Row(Gender='F', id=103, name='Aishvarya', score=58),
 Row(Gender='M', id=104, name='Vijay', score=52),
 Row(Gender='M', id=105, name='Varadaraj', score=55),
 Row(Gender='F', id=106, name='Parvathi', score=60),
 Row(Gender='M', id=107, name='Aasqhique', score=62)]
 
 



df = studentsRDD.toDF()

df.show()
+------+---+---------+-----+
|Gender| id|     name|score|
+------+---+---------+-----+
|     M|100|     Ravi|   50|
|     F|101|   Janani|   66|
|     M|102|     Siva|   63|
|     F|103|Aishvarya|   58|
|     M|104|    Vijay|   52|
|     M|105|Varadaraj|   55|
|     F|106| Parvathi|   60|
|     M|107|Aasqhique|   62|
+------+---+---------+-----+
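
A DataFrame can also be queried with plain SQL once it is registered as a temporary view; for example:

df.createOrReplaceTempView("students")
spark.sql("SELECT name, score FROM students WHERE score > 60").show()

+---------+-----+
|     name|score|
+---------+-----+
|   Janani|   66|
|     Siva|   63|
|Aasqhique|   62|
+---------+-----+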



# Display only Male candidates

from pyspark.sql import functions as F
df.select("id","name","score","Gender").where(F.col("Gender") == "M").show()

+---+---------+-----+------+
| id|     name|score|Gender|
+---+---------+-----+------+
|100|     Ravi|   50|     M|
|102|     Siva|   63|     M|
|104|    Vijay|   52|     M|
|105|Varadaraj|   55|     M|
|107|Aasqhique|   62|     M|
+---+---------+-----+------+
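
The same filter can be written in a couple of equivalent ways; both should produce the table above:

df.filter(df.Gender == "M").show()      # column attribute
df.where("Gender = 'M'").show()         # SQL expression string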

# Display only Female candidates
from pyspark.sql import functions as F
df.where(F.col("Gender") == "F").show()

+------+---+---------+-----+
|Gender| id|     name|score|
+------+---+---------+-----+
|     F|101|   Janani|   66|
|     F|103|Aishvarya|   58|
|     F|106| Parvathi|   60|
+------+---+---------+-----+

df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)
 
 
 
 
from pyspark.sql import functions as F
df.groupBy("Gender").agg(F.sum(F.col("score")).alias("Summ")).show()

+------+----+
|Gender|Summ|
+------+----+
|     F| 184|
|     M| 282|
+------+----+
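
agg accepts several aggregate expressions at once, so the same groupBy can compute counts and averages in a single pass. A small extension of the example above (F.avg returns a double):

from pyspark.sql import functions as F
df.groupBy("Gender").agg(
    F.count("*").alias("Cnt"),
    F.avg(F.col("score")).alias("AvgScore")
).show()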



cmplx_data = sc.parallelize([
    Row(
        col_list=['a', 'b', 'c'],
        col_dict={"India": "Delhi"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2020, 1, 1, 14, 1, 5)
    ),
    Row(
        col_list=['z', 'y', 'x', 'w'],
        col_dict={"Srilanka": "Colombo"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2019, 2, 1, 14, 1, 5)
    ),
    Row(
        col_list=['m', 'n', 'o', 'q', 'r'],
        col_dict={"Myanmar": "Naypyidaw"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2018, 3, 1, 14, 1, 5)
    ),
    Row(
        col_list=['s', 't', 'u', 'v', 'w', 'x', 'y'],
        col_dict={"Pakistan": "Islamabad"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2017, 4, 1, 14, 1, 5)
    ),
    Row(
        col_list=['h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p'],
        col_dict={"Bangaladesh": "Dhaka"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2016, 5, 1, 14, 1, 5)
    )
])


df = cmplx_data.toDF()

# Display the complex data types schema
df.printSchema()

root
 |-- col_dict: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- col_list: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- col_row: struct (nullable = true)
 |    |-- Gender: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- score: long (nullable = true)
 |-- col_time: timestamp (nullable = true)
 
df.show()
 
+--------------------+--------------------+--------------------+-------------------+
|            col_dict|            col_list|             col_row|           col_time|
+--------------------+--------------------+--------------------+-------------------+
|    [India -> Delhi]|           [a, b, c]|[M, Bangalore, 10...|2020-01-01 14:01:05|
|[Srilanka -> Colo...|        [z, y, x, w]|[M, Bangalore, 10...|2019-02-01 14:01:05|
|[Myanmar -> Naypy...|     [m, n, o, q, r]|[M, Bangalore, 10...|2018-03-01 14:01:05|
|[Pakistan -> Isla...|[s, t, u, v, w, x...|[M, Bangalore, 10...|2017-04-01 14:01:05|
|[Bangaladesh -> D...|[h, i, j, k, l, m...|[M, Bangalore, 10...|2016-05-01 14:01:05|
+--------------------+--------------------+--------------------+-------------------+
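
Complex columns can be taken apart with ordinary column expressions: arrays support indexing, struct fields are reachable with dot notation, and (assuming Spark 2.3 or later) map_keys exposes the keys of a map column. A sketch:

from pyspark.sql import functions as F
df.select(
    F.col("col_list")[0].alias("first_letter"),
    F.col("col_row.name").alias("name"),
    F.map_keys(F.col("col_dict")).alias("country")
).show()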


# Display the Year and Month of col_time
from pyspark.sql import functions as F
df.select(F.year(F.col("col_time")).alias("Year"), F.month(F.col("col_time")).alias("Month")).show()


+----+-----+
|Year|Month|
+----+-----+
|2020|    1|
|2019|    2|
|2018|    3|
|2017|    4|
|2016|    5|
+----+-----+
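
year and month are just two of the timestamp helpers in pyspark.sql.functions; dayofmonth, hour and date_format work the same way. For instance:

df.select(F.date_format(F.col("col_time"), "yyyy-MM-dd").alias("Day")).show()

+----------+
|       Day|
+----------+
|2020-01-01|
|2019-02-01|
|2018-03-01|
|2017-04-01|
|2016-05-01|
+----------+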
