from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("aggregations").getOrCreate()
myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.getNumPartitions())
#answer : 3
def f1(x): yield list(x)   # collect an entire partition into one list
def f2(x): yield sum(x)    # sum an entire partition
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]
r3 = r1.mapPartitions(lambda x: f2(x))
print(r3.collect())
#answer : [6, 15, 45]
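PySpark also ships a built-in that does exactly what the f1 pattern does: glom() turns each partition into a list, which is handy for inspecting partition layouts:
# glom() is the built-in equivalent of mapPartitions + f1.
print(r1.glom().collect())
#answer : same partition layout as r2 above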
r4 = r1.coalesce(1)
print(r4.getNumPartitions())
#answer : 1
print(r4.reduce(lambda x,y : x+y))
#answer : 66
print(r1.reduce(lambda a,b: a-b))
#answer : 34
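Subtraction is not associative, so this reduce result depends on the partitioning: Spark reduces each partition first, then reduces those partial results in partition order. A quick plain-Python sketch of that two-level evaluation, using the partition layout shown for r2 above:
from functools import reduce
# Two-level reduce, mirroring Spark: within partitions first, then across them.
parts = [[1,2,3], [4,5,6], [7,8,9,10,11]]               # layout from r2 above
partials = [reduce(lambda a,b: a-b, p) for p in parts]  # [-4, -7, -31]
print(reduce(lambda a,b: a-b, partials))                # -4 - (-7) - (-31) = 34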
#find Min and Max of a given list
def findMax(x,y):
    if x > y:
        return x
    else:
        return y

def findMin(x,y):
    if x < y:
        return x
    else:
        return y
myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList)
a3 = r1.reduce(lambda x,y : findMax(x,y))
print(a3)
#answer : 11
a4 = r1.reduce(lambda x,y : findMin(x,y))
print(a4)
#answer : 1
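For completeness, RDDs also have built-in min() and max(), so the reduce-based versions above are mainly for illustration:
print(r1.max())
#answer : 11
print(r1.min())
#answer : 1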
Fold example:
myList = [5,10]
r1 = spark.sparkContext.parallelize(myList)
a1 = r1.reduce(lambda x,y : x+y)
print(a1)
#answer : 15
a3 = r1.fold(5,lambda x,y : x+y)   # one partition: (5+5+10) + 5
print(a3)
#answer : 25
a4 = r1.fold(0,lambda x,y : x+y)   # (0+5+10) + 0
print(a4)
#answer : 15
With a single partition (the default under master("local")), fold(5, +) evaluates as 5+5 => 10+10 => 20, and the zero value is applied once more at the final combine: 20+5 => 25.
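In general, fold applies the zero value once per partition plus once more at the final combine, so for addition the result is zv * (numPartitions + 1) + sum(data). A quick sanity check under that assumption:
# Expected fold('+') result: the zero value is counted once per partition
# and once more when the partition results are combined.
n = r1.getNumPartitions()
zv = 5
print(zv * (n + 1) + sum(myList))   # should match a3 above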
myList = ["Arun","Vijay","Kalai","Mani","Nila","Raji","Waran","Bill"]
r1 = spark.sparkContext.parallelize(myList,3)
def f1(x): yield list(x)
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer: [['Arun', 'Vijay'], ['Kalai', 'Mani'], ['Nila', 'Raji', 'Waran', 'Bill']]
myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)
def f1(x): yield list(x)
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]
def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(5000,lambda x,y:findMin(x,y))
print(rd3)
#answer : 5000
The zero value 5000 is folded into every partition, and it is smaller than every element, so fold reports 5000 instead of the real minimum 10000.
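The safe zero value for a minimum is an identity element that no real value can beat, such as float('inf'):
# float('inf') can never win a min comparison, so fold returns the
# true minimum no matter how the data is partitioned.
rd_min = r1.fold(float('inf'), lambda x,y: findMin(x,y))
print(rd_min)
#answer : 10000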
myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)
def f1(x): yield list(x)
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]
def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(42005,lambda x,y:findMin(x,y))
print(rd3)
#answer : 10000
Here the zero value 42005 is larger than every element, so fold does find the true minimum.
myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,4)
print(r1.collect())
#answer : [1, 2, 3, 4, 5, 6, 7, 8]
def f1(x): yield list(x)
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2], [3, 4], [5, 6], [7, 8]]
c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)
#answer : -42
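How -42 falls out: the seqOp (x+y) runs per partition starting from the zero value 2, giving 2+1+2=5, 2+3+4=9, 2+5+6=13 and 2+7+8=17; the combOp (x-y) then starts from 2 again: 2-5-9-13-17 = -42.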
myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
#answer : [1, 2, 3, 4, 5, 6, 7, 8]
def f1(x): yield list(x)
r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2], [3, 4], [5, 6, 7, 8]]
c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)
#answer : -40
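Same data, three partitions this time: the seqOp gives 2+1+2=5, 2+3+4=9 and 2+5+6+7+8=28, and the combOp gives 2-5-9-28 = -40. The zero value enters once per partition and once at the combine, which is why the answer changes with the partitioning.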
reduce(f1) : f1 is applied within each partition, then again across the partition results.
fold(zv,f1) : same as reduce, but the zero value zv seeds each partition's fold and the final combine.
aggregate(zv,f1,f2) : f1 (the seqOp) folds each partition starting from zv, and f2 (the combOp) merges the partition results, again starting from zv.
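A common practical use of aggregate is computing two things in one pass, for example a sum and a count to get an average. A minimal sketch (the names nums and sum_count are mine, not from the examples above):
nums = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8], 3)
sum_count = nums.aggregate(
    (0, 0),                                    # zero value: (running sum, running count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # seqOp: fold one element into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp: merge two partition accumulators
print(sum_count)
#answer : (36, 8)
print(sum_count[0] / sum_count[1])
#answer : 4.5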