Wednesday, 27 May 2020

Join operation in RDD - PySpark
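In PySpark, rdd1.join(rdd2) works on pair RDDs of (key, value) tuples and performs an inner join on the key: for every key present in both RDDs it emits (key, (value_from_rdd1, value_from_rdd2)). The short examples below show what happens with unique keys and with duplicate keys.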

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

# Pair RDDs with duplicate keys: 'a' and 'b' occur twice in r1,
# 'a' and 'c' occur twice in r2, and 'e' occurs only in r2.
r1 = spark.sparkContext.parallelize([('a',10),('b',5),('c',15),('d',12),('a',10),('b',30)])
r2 = spark.sparkContext.parallelize([('a',50),('b',15),('c',10),('d',15),('e',12),('c',10),('a',30)])

# join() performs an inner join on the key.
re1 = r1.join(r2)
print(re1.collect())

[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('b', (5, 15)), ('b', (30, 15)), ('c', (15, 10)), ('c', (15, 10)), ('d', (12, 15))] 
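
'a' occurs twice in r1 and twice in r2, so the join emits 2 x 2 = 4 pairs for it, while 'e' is dropped because it exists only in r2 (join() is an inner join). As a side note (a minimal sketch of mine, not part of the original example), the outer-join variants keep such unmatched keys and pad the missing side with None:

# Sketch only, assuming r1 and r2 from the example above.
# rightOuterJoin keeps every key of r2; keys missing from r1 get None on the left side.
print(r1.rightOuterJoin(r2).lookup('e'))   # [(None, 12)]
# fullOuterJoin keeps the keys of both RDDs.
print(r1.fullOuterJoin(r2).lookup('e'))    # [(None, 12)]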







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

# Unique keys on both sides.
r1 = spark.sparkContext.parallelize([('a',1),('b',5),('c',4)])
r2 = spark.sparkContext.parallelize([('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('b', (5, 7)), ('c', (4, 7))]
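
Here every key occurs exactly once in each RDD, so the join is one-to-one: a single (key, (value_from_r1, value_from_r2)) pair per key.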







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

# Duplicate keys only in r1; note that one of the 'a' values is the string '6'.
r1 = spark.sparkContext.parallelize([('a',1),('a','6'),('b',5),('b',8),('c',4),('c',9)])
r2 = spark.sparkContext.parallelize([('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('a', ('6', 2)), ('b', (5, 7)), ('b', (8, 7)), ('c', (4, 7)), ('c', (9, 7))]
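
With duplicates only on the r1 side, each r1 value is paired once with the single matching r2 value. The value types do not have to match: the string '6' joins just like the integers, as the ('6', 2) pair shows.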






from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

# Each key occurs three times in r1 and once in r2.
r1 = spark.sparkContext.parallelize([('a',1),('a',2),('a',3),('b',5),('b',8),('b',9),('c',4),('c',9),('c',3)])
r2 = spark.sparkContext.parallelize([('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())


[('a', (1, 2)), ('a', (2, 2)), ('a', (3, 2)), 
('b', (5, 7)), ('b', (8, 7)), ('b', (9, 7)), 
('c', (4, 7)), ('c', (9, 7)), ('c', (3, 7))]
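
In general, an inner join emits one record per combination of matching values, so the number of output records for a key is the product of its occurrence counts in the two RDDs. A quick way to check this (a sketch of mine, reusing r1, r2 and re1 from the example above):

# Each key occurs 3 times in r1 and once in r2, so we expect 3 * 1 = 3 records per key, 9 in total.
c1 = r1.countByKey()
c2 = r2.countByKey()
expected = sum(c1[k] * c2[k] for k in c1 if k in c2)
print(expected, re1.count())   # 9 9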
