Wednesday, 27 May 2020

Aggregation operations with Pair RDDs in PySpark

Pair RDD Aggregations:

A pair RDD is simply an RDD of (key, value) tuples. The snippets below build one from a small Python list, spread it across three partitions, and walk through the common per-key aggregation operations.

myList = [('a',10), ('b',20), ('c',15), ('d',25), ('a',30), ('b',26), ('c',10), ('a',10), ('d',10)]
r1 = spark.sparkContext.parallelize(myList, 3)   # pair RDD split across 3 partitions
print(r1.collect())

# Generator that emits each partition's records as a single list,
# making the partition layout visible in the output.
def f1(iterator):
    yield list(iterator)

r2 = r1.mapPartitions(f1)   # f1 runs once per partition
print(r2.collect())

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
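
As an aside, Spark's built-in glom() transformation does the same job as the f1 generator above, turning each partition into a list; a minimal equivalent:

print(r1.glom().collect())   # same partition-per-list view as r2 above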


re1 = r1.countByKey()   # action: counts records per key, returned to the driver
print(re1)


defaultdict(<class 'int'>, {'a': 3, 'b': 2, 'c': 2, 'd': 2})
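
Because countByKey() is an action, the result is a collections.defaultdict living on the driver rather than an RDD, so it can be used like any dictionary; a small sketch of my own:

print(dict(re1))   # plain-dict view: {'a': 3, 'b': 2, 'c': 2, 'd': 2}
print(re1['a'])    # 3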


re2 = r1.countByValue()   # counts each whole (key, value) tuple, hence ('a', 10): 2
print(re2)

defaultdict(<class 'int'>, {('a', 10): 2, ('b', 20): 1, ('c', 15): 1, ('d', 25): 1, ('a', 30): 1, ('b', 26): 1, ('c', 10): 1, ('d', 10): 1})
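
Note that countByValue() counts whole (key, value) tuples, which is why ('a', 10) shows up with a count of 2. To tally just the values, one variation (my own, not in the original post) is to strip the keys first:

print(r1.values().countByValue())   # 10 occurs four times, every other value once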


re3 = r1.sortByKey()   # transformation: returns a new RDD, sorted by key
print(re3.collect())

[('a', 10), ('a', 30), ('a', 10), ('b', 20), ('b', 26), ('c', 15), ('c', 10), ('d', 25), ('d', 10)]
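
sortByKey() also accepts an ascending flag, so a descending sort is one argument away:

print(r1.sortByKey(ascending=False).collect())   # keys now run from 'd' down to 'a'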


re4 = r1.reduceByKey(lambda x, y: x + y)   # sums the values for each key
print(re4.collect())

[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
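
The output is not sorted: reduceByKey() hash-partitions keys during the shuffle, which is why a later example chains sortByKey(). The summing lambda can also be replaced with the standard library's operator.add, a purely stylistic variant:

from operator import add
print(r1.reduceByKey(add).collect())   # same per-key sums as the lambda version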




myList = [1, 2, 3, 4, 5, 6, 7, 8]
r = spark.sparkContext.parallelize(myList, 3)   # plain (non-pair) RDD
re1 = r.count()   # action: number of elements in the RDD
print(re1)

8
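
count() works on any RDD, pair or otherwise. For this plain RDD of numbers, a couple of related single-value actions (my additions, same data) round out the picture:

print(r.sum())                        # 36
print(r.reduce(lambda x, y: x + y))   # 36, the same total via reduce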


myList = [('a',10), ('b',20), ('c',15), ('d',25), ('a',30), ('b',26), ('c',10), ('a',10), ('d',10)]
r1 = spark.sparkContext.parallelize(myList, 3)
print(r1.collect())
re4 = r1.reduceByKey(lambda x, y: x + y)   # per-key sums
print(re4.sortByKey().collect())           # sorted by key before collecting

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]
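
For comparison (my addition, not in the original post), the same totals can be computed with groupByKey() followed by mapValues(), though reduceByKey() is usually preferred because it combines values within each partition before the shuffle:

print(r1.groupByKey().mapValues(sum).sortByKey().collect())
# [('a', 50), ('b', 46), ('c', 25), ('d', 35)]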


Putting the pieces together in one run:

myList = [('a',10), ('b',20), ('c',15), ('d',25), ('a',30), ('b',26), ('c',10), ('a',10), ('d',10)]
r1 = spark.sparkContext.parallelize(myList, 3)
print(r1.collect())

def f1(iterator):
    yield list(iterator)

r2 = r1.mapPartitions(f1)   # one list per partition
print(r2.collect())

re1 = r1.reduceByKey(lambda x, y: x + y)
print(re1.collect())        # shuffle order, unsorted

re2 = re1.sortByKey().collect()
print(re2)                  # sorted by key

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]


re3 = r1.foldByKey(2, lambda x, y: x + y)   # like reduceByKey, but seeded with zero value 2
print(re3.sortByKey().collect())

[('a', 56), ('b', 50), ('c', 29), ('d', 39)]
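
The totals are higher than the reduceByKey() results because foldByKey() applies its zero value (2 here) once per partition in which a key appears, not once per key: 'a' lives in all three partitions, so it comes out as 50 + 3*2 = 56, while 'b', 'c' and 'd' each sit in two partitions and pick up 2*2 = 4 extra. With a zero value of 0, foldByKey() collapses back to the reduceByKey() behaviour; a quick check:

print(r1.foldByKey(0, lambda x, y: x + y).sortByKey().collect())
# [('a', 50), ('b', 46), ('c', 25), ('d', 35)]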


