a = spark.sparkContext.parallelize((1, 2, 3, 4, 5, 6))
print(a.getNumPartitions())
4
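The default partition count comes from the cluster's defaultParallelism, so it can differ from environment to environment. A minimal sketch, passing the optional numSlices argument to pin it down explicitly:
# Ask for exactly 3 partitions instead of the default
b = spark.sparkContext.parallelize((1, 2, 3, 4, 5, 6), 3)
print(b.getNumPartitions())
3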
from pyspark.sql.types import Row
from datetime import datetime
simple_data = sc.parallelize([[100, "Raja", 500], [101, "Kala", 550], [102, "Raji", 650]])
print(simple_data.count())
3
print(simple_data.first())
[100, 'Raja', 500]
print(simple_data.take(2))
[[100, 'Raja', 500], [101, 'Kala', 550]]
print(simple_data.collect())
[[100, 'Raja', 500], [101, 'Kala', 550], [102, 'Raji', 650]]
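Note that collect() pulls the entire RDD back to the driver, so on large data prefer take(n). Transformations such as map() are lazy and only run when an action like collect() is called; a small sketch on the same simple_data RDD:
# Add a flat 50 bonus to every wage (the third element of each record)
print(simple_data.map(lambda rec: rec[2] + 50).collect())
[550, 600, 700]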
df = simple_data.toDF(["RollNo","Name","Wages"])
df.printSchema()
root
|-- RollNo: long (nullable = true)
|-- Name: string (nullable = true)
|-- Wages: long (nullable = true)
df.show()
+------+----+-----+
|RollNo|Name|Wages|
+------+----+-----+
| 100|Raja| 500|
| 101|Kala| 550|
| 102|Raji| 650|
+------+----+-----+
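toDF() infers the column types, which is why the integers above come out as long. To control the types yourself, a sketch using an explicit StructType schema with spark.createDataFrame:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Explicit schema: integers instead of the inferred longs
schema = StructType([
StructField("RollNo", IntegerType(), True),
StructField("Name", StringType(), True),
StructField("Wages", IntegerType(), True)
])
df2 = spark.createDataFrame(simple_data, schema)
df2.printSchema()
root
|-- RollNo: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Wages: integer (nullable = true)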
studentsRDD = sc.parallelize([
Row(id=100,name="Ravi",score=50,Gender="M"),
Row(id=101,name="Janani",score=66,Gender="F"),
Row(id=102,name="Siva",score=63,Gender="M"),
Row(id=103,name="Aishvarya",score=58,Gender="F"),
Row(id=104,name="Vijay",score=52,Gender="M"),
Row(id=105,name="Varadaraj",score=55,Gender="M"),
Row(id=106,name="Parvathi",score=60,Gender="F"),
Row(id=107,name="Aasqhique",score=62,Gender="M")
])
print(studentsRDD.take(3))
[Row(Gender='M', id=100, name='Ravi', score=50), Row(Gender='F', id=101, name='Janani', score=66), Row(Gender='M', id=102, name='Siva', score=63)]
studentsRDD.count()
8
studentsRDD.collect()
[Row(Gender='M', id=100, name='Ravi', score=50),
Row(Gender='F', id=101, name='Janani', score=66),
Row(Gender='M', id=102, name='Siva', score=63),
Row(Gender='F', id=103, name='Aishvarya', score=58),
Row(Gender='M', id=104, name='Vijay', score=52),
Row(Gender='M', id=105, name='Varadaraj', score=55),
Row(Gender='F', id=106, name='Parvathi', score=60),
Row(Gender='M', id=107, name='Aasqhique', score=62)]
df = studentsRDD.toDF()
df.show()
+------+---+---------+-----+
|Gender| id| name|score|
+------+---+---------+-----+
| M|100| Ravi| 50|
| F|101| Janani| 66|
| M|102| Siva| 63|
| F|103|Aishvarya| 58|
| M|104| Vijay| 52|
| M|105|Varadaraj| 55|
| F|106| Parvathi| 60|
| M|107|Aasqhique| 62|
+------+---+---------+-----+
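Note: because these Row objects were built from keyword arguments, older Spark releases (before 3.0) sort the fields alphabetically, which is why Gender appears as the first column; a select, as in the next example, puts the columns in whatever order you want.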
# Display only Male candidates
from pyspark.sql import functions as F
df.select("id","name","score","Gender").where(F.col("Gender") == "M").show()
+---+---------+-----+------+
| id| name|score|Gender|
+---+---------+-----+------+
|100| Ravi| 50| M|
|102| Siva| 63| M|
|104| Vijay| 52| M|
|105|Varadaraj| 55| M|
|107|Aasqhique| 62| M|
+---+---------+-----+------+
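The same filter can be written with filter() (an alias of where()), or in plain SQL after registering the DataFrame as a temporary view. A sketch that prints the same table as above; the view name students is just an illustration:
# Register the DataFrame so it can be queried with SQL
df.createOrReplaceTempView("students")
spark.sql("SELECT id, name, score, Gender FROM students WHERE Gender = 'M'").show()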
# Display only Female candidates
from pyspark.sql import functions as F
df.where(F.col("Gender") == "F").show()
+------+---+---------+-----+
|Gender| id| name|score|
+------+---+---------+-----+
| F|101| Janani| 66|
| F|103|Aishvarya| 58|
| F|106| Parvathi| 60|
+------+---+---------+-----+
df.printSchema()
root
|-- Gender: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- score: long (nullable = true)
from pyspark.sql import functions as F
df.groupBy("Gender").agg(F.sum(F.col("score")).alias("Summ")).show()
+------+----+
|Gender|Summ|
+------+----+
| F| 184|
| M| 282|
+------+----+
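agg() accepts any number of aggregate expressions, so averages and counts can be computed in the same pass. A sketch; from the scores above, the averages work out to 184/3 ≈ 61.33 for F and 282/5 = 56.4 for M:
# Sum, average, and count per gender in a single aggregation
df.groupBy("Gender").agg(
F.sum("score").alias("Summ"),
F.avg("score").alias("Avg"),
F.count("score").alias("Cnt")
).show()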
cmplx_data = sc.parallelize([
    Row(
        col_list=['a', 'b', 'c'],
        col_dict={"India": "Delhi"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2020, 1, 1, 14, 1, 5)
    ),
    Row(
        col_list=['z', 'y', 'x', 'w'],
        col_dict={"Srilanka": "Colombo"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2019, 2, 1, 14, 1, 5)
    ),
    Row(
        col_list=['m', 'n', 'o', 'q', 'r'],
        col_dict={"Myanmar": "Naypyidaw"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2018, 3, 1, 14, 1, 5)
    ),
    Row(
        col_list=['s', 't', 'u', 'v', 'w', 'x', 'y'],
        col_dict={"Pakistan": "Islamabad"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2017, 4, 1, 14, 1, 5)
    ),
    Row(
        col_list=['h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p'],
        col_dict={"Bangladesh": "Dhaka"},
        col_row=Row(id=100, name="Ravi", score=33, city="Bangalore", Gender="M"),
        col_time=datetime(2016, 5, 1, 14, 1, 5)
    )
])
df = cmplx_data.toDF()
# Display the complex data types schema
df.printSchema()
root
|-- col_dict: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- col_list: array (nullable = true)
| |-- element: string (containsNull = true)
|-- col_row: struct (nullable = true)
| |-- Gender: string (nullable = true)
| |-- city: string (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
| |-- score: long (nullable = true)
|-- col_time: timestamp (nullable = true)
df.show()
+--------------------+--------------------+--------------------+-------------------+
| col_dict| col_list| col_row| col_time|
+--------------------+--------------------+--------------------+-------------------+
| [India -> Delhi]| [a, b, c]|[M, Bangalore, 10...|2020-01-01 14:01:05|
|[Srilanka -> Colo...| [z, y, x, w]|[M, Bangalore, 10...|2019-02-01 14:01:05|
|[Myanmar -> Naypy...| [m, n, o, q, r]|[M, Bangalore, 10...|2018-03-01 14:01:05|
|[Pakistan -> Isla...|[s, t, u, v, w, x...|[M, Bangalore, 10...|2017-04-01 14:01:05|
|[Bangladesh -> Dh...|[h, i, j, k, l, m...|[M, Bangalore, 10...|2016-05-01 14:01:05|
+--------------------+--------------------+--------------------+-------------------+
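The nested values are reachable with ordinary column expressions: dot notation for struct fields, an index for array elements, and map functions for the dictionary (map_keys requires Spark 2.3 or later). A sketch; each row yields the name Ravi from the struct, the first list element, and the single map key:
df.select(
F.col("col_row.name").alias("name"),      # struct field
F.col("col_list")[0].alias("first_elem"), # array element by index
F.map_keys(F.col("col_dict")).alias("country")  # keys of the map
).show()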
# Display the Year and Month of col_time
from pyspark.sql import functions as F
df.select(F.year(F.col("col_time")).alias("Year"), F.month(F.col("col_time")).alias("Month")).show()
+----+-----+
|Year|Month|
+----+-----+
|2020| 1|
|2019| 2|
|2018| 3|
|2017| 4|
|2016| 5|
+----+-----+
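pyspark.sql.functions covers the other timestamp parts as well, for example dayofmonth, hour, and date_format; a sketch (for the data above, Day is 1 and Hour is 14 on every row):
df.select(
F.dayofmonth("col_time").alias("Day"),
F.hour("col_time").alias("Hour"),
F.date_format("col_time", "yyyy-MM").alias("YearMonth")
).show()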