Wednesday, 27 May 2020

Aggregate operations with RDDs in PySpark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("aggregations").getOrCreate()

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList,3)

print(r1.getNumPartitions())
#answer : 3

# generator that yields the whole partition as a single list
def f1(x): yield list(x)

# generator that yields the sum of a partition
def f2(x): yield sum(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]


r3 = r1.mapPartitions(lambda x: f2(x))
print(r3.collect())
#answer : [6, 15, 45]
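
The same per-partition view can also be produced with the built-in glom() transformation, which wraps every partition in a list; a minimal sketch reusing r1:

# glom() turns each partition into a list of its elements,
# equivalent to the f1 generator above
print(r1.glom().collect())
#answer : [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]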


r4 = r1.coalesce(1)
print(r4.getNumPartitions())
#answer : 1

r4.reduce(lambda x,y : x+y)
#answer : 66

r1.reduce(lambda a,b: a-b)
#answer : 34
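
Subtraction is neither associative nor commutative, so this result depends on how the list was partitioned: each partition is reduced first and the partition results are then reduced again. A rough local trace, assuming the partition layout shown above and left-to-right merging of the partition results:

from functools import reduce
# within each partition : 1-2-3 = -4, 4-5-6 = -7, 7-8-9-10-11 = -31
parts = [reduce(lambda a,b : a-b, p) for p in [[1,2,3], [4,5,6], [7,8,9,10,11]]]
# across the partition results : -4 - (-7) - (-31) = 34
print(reduce(lambda a,b : a-b, parts))
#answer : 34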




#find Min and Max of a given list

def findMax(x, y):
    if x > y:
        return x
    else:
        return y

def findMin(x, y):
    if x < y:
        return x
    else:
        return y

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList)
a3 = r1.reduce(lambda x,y : findMax(x,y))
print(a3)
#answer : 11

a4 = r1.reduce(lambda x,y : findMin(x,y))
print(a4)
#answer : 1
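
For comparison, the RDD API already provides min() and max() actions that return the same values without a custom reduce:

# built-in actions equivalent to the reduce versions above
print(r1.max())
#answer : 11
print(r1.min())
#answer : 1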


Fold example:

myList = [5,10]
r1 = spark.sparkContext.parallelize(myList)

a1 = r1.reduce(lambda x,y : x+y)
print(a1)
#answer : 15

a3 = r1.fold(5,lambda x,y : x+y) # zero value used inside the partition and again in the merge: (5+5+10) + 5
print(a3)
#answer : 25

a4 = r1.fold(0,lambda x,y : x+y) # (0+5+10) + 0
print(a4)
#answer : 15


Trace for a3 (one partition here): 5+5 => 10+10 => 20, then the merge adds the zero value once more: 20+5 => 25
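
If the same two numbers are split into two partitions, the zero value is folded into each partition and once more in the merge, so the total grows with the partition count. A minimal sketch (r5 is just an illustrative name):

# same data, but explicitly 2 partitions : [5] and [10]
r5 = spark.sparkContext.parallelize([5,10], 2)
print(r5.fold(5, lambda x,y : x+y)) # (5+5) and (5+10), merged as 5+10+15
#answer : 30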



myList = ["Arun","Vijay","Kalai","Mani","Nila","Raji","Waran","Bill"]
r1 = spark.sparkContext.parallelize(myList,3)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer: [['Arun', 'Vijay'], ['Kalai', 'Mani'], ['Nila', 'Raji', 'Waran', 'Bill']]



myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x, y):
    if x <= y:
        return x
    else:
        return y
rd3 = r1.fold(5000,lambda x,y:findMin(x,y))
print(rd3)

#answer : 5000 (the zero value 5000 is smaller than every salary, so it wins)

myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x, y):
    if x <= y:
        return x
    else:
        return y
rd3 = r1.fold(42005,lambda x,y:findMin(x,y))
print(rd3)

#answer : 10000 (42005 is larger than every salary, so the true minimum survives)
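
In general the zero value of a minimum fold must be at least as large as every possible element, otherwise it leaks into the answer exactly as 5000 did above; float('inf') is a safe choice. A minimal sketch reusing r1 and findMin:

# float('inf') is neutral for a minimum, so only real data can win
print(r1.fold(float('inf'), lambda x,y : findMin(x,y)))
#answer : 10000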


myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,4)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [1, 2, 3, 4, 5, 6, 7, 8]
#answer : [[1, 2], [3, 4], [5, 6], [7, 8]]


c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

#answer : -42





myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [1, 2, 3, 4, 5, 6, 7, 8]
#answer : [[1, 2], [3, 4], [5, 6, 7, 8]]

c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

#answer : -40
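
The zero value 2 is added once per partition by the first function (seqOp) and once more when the partition results are merged by the second function (combOp), which explains both answers, assuming the partition layouts printed above:

4 partitions : seqOp gives 2+1+2 = 5, 2+3+4 = 9, 2+5+6 = 13, 2+7+8 = 17; combOp gives 2-5-9-13-17 = -42
3 partitions : seqOp gives 2+1+2 = 5, 2+3+4 = 9, 2+5+6+7+8 = 28; combOp gives 2-5-9-28 = -40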



reduce(f1)
f1 is applied within each partition, then again on the partition results
fold(zv, f1)
f1 is applied within each partition starting from zv, then again on the partition results starting from zv (so zv is used number-of-partitions + 1 times)
aggregate(zv, f1, f2)
f1 (seqOp) is applied within each partition starting from zv, and f2 (combOp) merges the partition results, again starting from zv
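
A common reason to prefer aggregate is that the accumulator can have a different type from the elements, for example a (sum, count) pair used to compute an average. A minimal sketch (nums and sumCount are just illustrative names):

# accumulate a (sum, count) tuple, then divide to get the average
nums = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8], 3)
sumCount = nums.aggregate((0, 0),
                          lambda acc, v: (acc[0] + v, acc[1] + 1),   # seqOp : fold one element into (sum, count)
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp : merge two (sum, count) pairs
print(sumCount[0] / sumCount[1])
#answer : 4.5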

