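Pair RDD Aggregations (the examples assume an existing SparkSession named `spark`; each code snippet is followed by its printed output):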
myList = [
    ('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30),
    ('b', 26), ('c', 10), ('a', 10), ('d', 10),
]
# Distribute the (key, value) pairs over 3 partitions.
r1 = spark.sparkContext.parallelize(myList, 3)
print(r1.collect())


def f1(x):
    """Yield the contents of one partition as a single list.

    ``mapPartitions`` calls this once per partition with an iterator over
    that partition's elements, so the resulting RDD holds one list per
    partition.
    """
    yield list(x)


# Pass f1 itself; wrapping it as ``lambda x: f1(x)`` adds nothing.
# ``r1.glom()`` is the built-in equivalent of this mapPartitions call.
r2 = r1.mapPartitions(f1)
print(r2.collect())
[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
# countByKey() is an action: for every key it reports how many pairs carry
# that key, returned to the driver as a defaultdict(int).
re1 = r1.countByKey()
print("{}".format(re1))
defaultdict(<class 'int'>, {'a': 3, 'b': 2, 'c': 2, 'd': 2})
# countByValue() counts whole elements, so each distinct (key, value) pair
# is tallied separately -- e.g. ('a', 10) occurs twice in the input.
re2 = r1.countByValue()
print("{}".format(re2))
defaultdict(<class 'int'>, {('a', 10): 2, ('b', 20): 1, ('c', 15): 1, ('d', 25): 1, ('a', 30): 1, ('b', 26): 1, ('c', 10): 1, ('d', 10): 1})
# sortByKey is a transformation (only keys are compared); collect() runs it.
re3 = r1.sortByKey(ascending=True)
sorted_pairs = re3.collect()
print(sorted_pairs)
[('a', 10), ('a', 30), ('a', 10), ('b', 20), ('b', 26), ('c', 15), ('c', 10), ('d', 25), ('d', 10)]
from operator import add

# Sum the values of each key; ``add`` is equivalent to ``lambda x, y: x + y``.
# The collected result is not necessarily in key order.
re4 = r1.reduceByKey(add)
print(re4.collect())
[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
# count() is an action that returns the number of elements in the RDD.
myList = list(range(1, 9))  # [1, 2, ..., 8]
r = spark.sparkContext.parallelize(myList, numSlices=3)
re1 = r.count()
print(re1)
8
from operator import add

myList = [
    ('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30),
    ('b', 26), ('c', 10), ('a', 10), ('d', 10),
]
r1 = spark.sparkContext.parallelize(myList, 3)
print(r1.collect())
# Per-key sum; reduceByKey's output is not guaranteed to be in key order,
# so sort by key before collecting to get a deterministic listing.
re4 = r1.reduceByKey(add)
print(re4.sortByKey().collect())
[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]
from operator import add

myList = [
    ('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30),
    ('b', 26), ('c', 10), ('a', 10), ('d', 10),
]
r1 = spark.sparkContext.parallelize(myList, 3)
print(r1.collect())


def f1(x):
    """Yield the contents of one partition as a single list.

    Used with ``mapPartitions`` to show how the pairs are split across
    partitions (``r1.glom()`` does the same thing).
    """
    yield list(x)


r2 = r1.mapPartitions(f1)
print(r2.collect())
# Per-key sum; the collected order is not key order ...
re1 = r1.reduceByKey(add)
print(re1.collect())
# ... so sort by key to get a deterministic listing.
re2 = re1.sortByKey().collect()
print(re2)
[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]
from operator import add

# foldByKey seeds each key with the zero value once *per partition* before
# merging the partial results. Because 2 is not the identity for addition
# (0 is), it is added once for every partition that contains the key:
# 'a' appears in all 3 partitions (50 + 3*2 = 56), while 'b', 'c' and 'd'
# each appear in 2 (e.g. 46 + 2*2 = 50). Use 0 for a plain per-key sum.
re3 = r1.foldByKey(2, add)
print(re3.sortByKey().collect())
[('a', 56), ('b', 50), ('c', 29), ('d', 39)]
No comments:
Post a Comment