Monday, 10 August 2020

Spark RDD programming using Scala - Spark Core Examples

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val resultRDD = rdd.map ( x => x * x )
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> resultRDD.collect()
res0: Array[Int] = Array(1, 4, 9, 16, 25)

// Note: foreach(println) runs on the executors, so the printed order is not guaranteed
scala> resultRDD.foreach(println)
9
4
1
16
25

// We have a file named employee.txt at D:\Ex\employee.txt with the following content:

John,33,US
Jim,44,UK
Nav,30,India

// Reading the file content and create RDD
scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <console>:24

// find the length of each line
scala> val resultRDD = rdd.map (x => (x, x.length))
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:25

scala> resultRDD.foreach(println)
(John,33,US,10)
(Nav,30,India,12)
(Jim,44,UK,9)

scala> resultRDD.collect()
res3: Array[(String, Int)] = Array((John,33,US,10), (Jim,44,UK,9), (Nav,30,India,12))





scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd.flatMap(x => x.split(",")).collect()
res4: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)

scala> val resultRDD =  rdd.flatMap(x => x.split(","))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:25

scala> resultRDD.foreach(println)
Nav
30
India
John
33
US
Jim
44
UK

scala> resultRDD.collect()
res6: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)
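
// For comparison: map on the same split keeps one array per line instead of flattening.
// A minimal sketch (same spark-shell session, same rdd as above):
val arraysRDD = rdd.map(x => x.split(","))      // RDD[Array[String]] with 3 elements
arraysRDD.collect()                             // Array(Array(John, 33, US), Array(Jim, 44, UK), Array(Nav, 30, India))
rdd.flatMap(x => x.split(",")).count()          // 9 - flatMap flattens the arrays into a single RDD[String]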











scala> val rdd = sc.makeRDD (List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24

scala> val resultRDD = rdd.filter( x => x%2 == 0)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at filter at <console>:25

scala> resultRDD.collect()
res7: Array[Int] = Array(2, 4)

scala> resultRDD.foreach(println)
4
2


scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[10] at textFile at <console>:24

scala> val filtered = rdd.filter ( line => line.split(",")(1).toInt < 40)
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25

scala> filtered.foreach(println)
Nav,30,India
John,33,US
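
// Caveat: line.split(",")(1).toInt throws on a malformed or empty line.
// A more defensive version (just a sketch, assuming the same employee.txt):
val safeFiltered = rdd
  .map(line => line.split(","))
  .filter(w => w.length == 3 && w(1).forall(_.isDigit))   // drop badly formed rows
  .filter(w => w(1).toInt < 40)
  .map(w => w.mkString(","))
safeFiltered.foreach(println)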



scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,3,6,7))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24


// intersection
scala> val resultRDD = rdd1.intersection(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:27

scala> resultRDD.collect()
res12: Array[Int] = Array(1, 2, 3)

scala> resultRDD.foreach(println)
2
3
1
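
// A related set operation (not shown above) is subtract - elements of rdd1 that are not in rdd2.
// Small sketch on the same rdd1 and rdd2:
rdd1.subtract(rdd2).collect()    // Array(4, 5) - order may vary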


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(3,5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24

// union
scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[23] at union at <console>:27

scala> resultRDD.collect()
res14: Array[Int] = Array(1, 2, 3, 4, 5, 3, 5)

// Find the Distinct - Unique Values
scala> resultRDD.distinct().collect()
res21: Array[Int] = Array(1, 2, 3, 4, 5)



// Find the distinct 
scala> val rdd1 = sc.parallelize(List(1,1,2,3,3,2,2,1,4,5,5,4,4,5,5,3,2,1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:2

scala> val resultRDD = rdd1.distinct()
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at distinct at <console>:25

scala> resultRDD.collect()
res22: Array[Int] = Array(4, 1, 5, 2, 3)




// Find the union
scala> val rdd1 = sc.makeRDD(List("Raja","Ramesh","Siva"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at makeRDD at <console>:24

scala> val rdd2 = sc.parallelize(List("Arun","Viswa","Kalai"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[34] at union at <console>:27

scala> resultRDD.foreach(println)
Raja
Siva
Ramesh
Kalai
Viswa
Arun

birth.txt:
-----------
Dec,Arun
March,Kalai
April,Nila
April,John
Dec,Silva
January,Priya
Dec,Ashwin
January,Ohm
March,Sastha
January,Ashwath
April,Agila
April,Alex
January,Sridevi
Dec,Sridhar
March,Neelaveni


scala> val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24

case class birth (month:String, name:String)

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}


scala> pairRDD.collect()
res27: Array[(String, String)] = Array((Dec,Arun), (March,Kalai), (April,Nila), (April,John), (Dec,Silva), (

scala> pairRDD.foreach(println)
(Dec,Arun)
(March,Kalai)
(April,Nila)
(April,John)
(January,Ashwath)
(Dec,Silva)
(April,Agila)
(January,Priya)
(April,Alex)
(Dec,Ashwin)
(January,Sridevi)
(January,Ohm)
(Dec,Sridhar)
(March,Sastha)
(March,Neelaveni)

scala> val groupedRDD = pairRDD.groupByKey()
groupedRDD: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[38] at groupByKey at <console

scala> groupedRDD.collect()
res29: Array[(String, Iterable[String])] = Array((April,CompactBuffer(Nila, John, Agila, Alex)), (January,Co

scala> groupedRDD.foreach(println)
(March,CompactBuffer(Kalai, Sastha, Neelaveni))
(April,CompactBuffer(Nila, John, Agila, Alex))
(January,CompactBuffer(Priya, Ohm, Ashwath, Sridevi))
(Dec,CompactBuffer(Arun, Silva, Ashwin, Sridhar))
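
// If only a count per month is needed, the groups don't have to be materialised as CompactBuffers.
// Two quick sketches on the RDDs above:
groupedRDD.mapValues(_.size).collect()   // Array((April,4), (March,3), (January,4), (Dec,4)) - order may vary
pairRDD.countByKey()                     // Map(April -> 4, March -> 3, January -> 4, Dec -> 4), returned to the driver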



 


// SortByKey Example

val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
 
case class birth (month:String, name:String)

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}
val sortedRDD = pairRDD.sortByKey()
 
scala> sortedRDD.collect()
res7: Array[(String, String)] = Array((April,Nila), (April,John), (April,Agila), (April,Alex), (Dec,Arun), (Dec,Silva), (Dec,Ashwin), (Dec,Sridhar), (January,Priya), (January,Ohm)


scala> sortedRDD.foreach(println)
(April,Nila)
(January,Priya)
(January,Ohm)
(April,John)
(January,Ashwath)
(April,Agila)
(January,Sridevi)
(April,Alex)
(March,Kalai)
(Dec,Arun)
(March,Sastha)
(Dec,Silva)
(March,Neelaveni)
(Dec,Ashwin)
(Dec,Sridhar)


// Lineage of the groupedRDD from the groupByKey example above
scala> groupedRDD.toDebugString
res31: String =
(2) ShuffledRDD[38] at groupByKey at <console>:25 []
 +-(2) MapPartitionsRDD[37] at map at <console>:25 []
    |  D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24 []
    |  D:\Ex\birth.txt HadoopRDD[35] at textFile at <console>:24 []

// ReduceByKey Example
 
veggies.txt:
------------
Tomato,10
Brinjal,20
Tomato,20
Brinjal,33
Carrot,23
Potato,20
Carrot,50
Potato,23
Brinjal,22
Potato,20
Carrot,23


scala> val rdd1 = sc.textFile("D:\\Ex\\veggies.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\veggies.txt MapPartitionsRDD[40] at textFile at <console>:24

scala> case class veggies (name:String, price:Int)
defined class veggies

scala> val pairRDD = rdd1.map{ line =>
       val w = line.split(",")
       val name = w(0)
       val price = w(1).toInt
       (name,price)
       }
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[41] at map at <console>:25

scala> val reducedRDD = pairRDD.reduceByKey( (x,y) => (x + y))
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at reduceByKey at <console>:2

// OR equivalently, using placeholder syntax:

scala> val reducedRDD = pairRDD.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at reduceByKey at <console>:2


scala> reducedRDD.foreach(println)
(Brinjal,75)
(Potato,63)
(Tomato,30)
(Carrot,96)


scala>  reducedRDD.sortByKey().collect().foreach(println)
(Brinjal,75)
(Carrot,96)
(Potato,63)
(Tomato,30)
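
// reduceByKey gives the total per vegetable; to get an average you need the sum and the count per key.
// A sketch using aggregateByKey on the same pairRDD (the (sum, count) accumulator is my own choice, not from the post):
val avgRDD = pairRDD
  .aggregateByKey((0, 0))(
    (acc, price) => (acc._1 + price, acc._2 + 1),   // fold one value into a partition-local accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))           // merge accumulators from different partitions
  .mapValues { case (sum, count) => sum.toDouble / count }

avgRDD.collect().foreach(println)
// (Brinjal,25.0), (Potato,21.0), (Tomato,15.0), (Carrot,32.0)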

// Reduce Vs Sum
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24

// sum() is an action on numeric RDDs; it totals the elements and returns a Double
scala> val summation = rdd1.sum()
summation: Double = 55.0

// reduce() combines elements within each partition first, then merges the partial results; it keeps the element type (Int)
scala> val summ = rdd1.reduce(_+_)
summ: Int = 55
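
// Two closely related actions give the same total; a small sketch on the same rdd1:
rdd1.fold(0)(_ + _)                        // 55 - like reduce, but with an explicit zero element
rdd1.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),    // (runningSum, runningCount)
  (a, b) => (a._1 + b._1, a._2 + b._2))    // (55,10) - result type can differ from the element type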






students_roll.txt:
------------------
10,Ravi
11,Rahul
9,Lisa
7,Radha
5,Binal
6,Dev
8,Raj
1,Sri
3,Kalai
2,Arul
4,Priya


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[38] at textFile at <c

scala> case class student(rollno:Int, name:String)
defined class student

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val rollno = w(0).toInt
  val name = w(1)
  (rollno, name)
}
 
 
 scala> pairRDD.collect()
res21: Array[(Int, String)] = Array((10,Ravi), (11,Rahul), (9,Lisa), (7,Radha), (5,Binal), (6,Dev), (8,


// Ascending Order
scala> pairRDD.sortByKey(true).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)

// Descending Order
scala> pairRDD.sortByKey(false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)

// SortBy Example
scala> pairRDD.sortBy(x => x._2).collect().foreach(println)
(2,Arul)
(5,Binal)
(6,Dev)
(3,Kalai)
(9,Lisa)
(4,Priya)
(7,Radha)
(11,Rahul)
(8,Raj)
(10,Ravi)
(1,Sri)

scala> pairRDD.sortBy(x => x._1).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)


scala> pairRDD.sortBy(x => x._1,false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)
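
// The student case class defined above is never actually used. A small sketch of parsing
// each line into it and sorting with sortBy (same file and session assumed):
val studentRDD = rdd1.map { line =>
  val w = line.split(",")
  student(w(0).toInt, w(1))
}
studentRDD.sortBy(s => s.rollno).collect().foreach(println)
// student(1,Sri), student(2,Arul), ... , student(11,Rahul)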






scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

// with no partition count specified, the default parallelism decides (4 on this machine)
scala> rdd1.partitions.size
res0: Int = 4

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd2.partitions.size
res1: Int = 2

scala> rdd1.getNumPartitions
res2: Int = 4

scala> rdd2.getNumPartitions
res3: Int = 2


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[3] at textFi

scala> rdd1.getNumPartitions
res4: Int = 2


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt",3)
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[7] at textFi

scala> rdd1.getNumPartitions
res6: Int = 3

scala> rdd1.partitions.size
res7: Int = 3

scala> rdd1.partitions.length
res8: Int = 3


scala> val resultRDD = rdd1.mapPartitions(x => List(x.toList).iterator)
resultRDD: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[8] at mapPartitions at <console>:25

scala> resultRDD.collect
res9: Array[List[String]] = Array(List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha), List(5,Binal, 6,Dev, 8,Raj, 1,Sri), List(3,Kalai


scala> resultRDD.collect().foreach(println)
List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha)
List(5,Binal, 6,Dev, 8,Raj, 1,Sri)
List(3,Kalai, 2,Arul, 4,Priya)
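
// To see which partition each record ended up in, mapPartitionsWithIndex exposes the
// partition index alongside the iterator. A minimal sketch on the same 3-partition rdd1:
val indexedRDD = rdd1.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(line => (idx, line))              // tag every line with its partition index
}
indexedRDD.collect().foreach(println)
// (0,10,Ravi), (0,11,Rahul), ... , (2,4,Priya)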



Coalesce Example:
// Coalesce reduces the number of partitions without a full shuffle; by default it cannot increase them (see the repartition example below)

scala> val rdd1 = sc.parallelize(List(1,1,2,2,3,4,5,6,7,8,9,10),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd1.partitions.size
res11: Int = 3

scala> rdd1.getNumPartitions
res12: Int = 3

scala> val coalescedRDD = rdd1.coalesce(1)
coalescedRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[10] at coalesce at <console>:25

scala> coalescedRDD.getNumPartitions
res13: Int = 1


Repartition Example:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <conso

scala> rdd1.getNumPartitions
res17: Int = 2

// Increasing the number of partitions
scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at repartition at <console>:2

scala> rdd2.getNumPartitions
res20: Int = 5


// Decreasing the number of partitions
scala> val rdd2 = rdd1.repartition(1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at <console>:2

scala> rdd2.partitions.size
res21: Int = 1



// Coalesce (without shuffle) cannot increase the number of partitions; the count stays at 2
scala> val rdd3 = rdd1.coalesce(10)
rdd3: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[29] at coalesce at <console>:25

scala> rdd3.getNumPartitions
res22: Int = 2
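
// Coalesce does take a second shuffle flag; with shuffle = true it behaves like repartition
// and can increase the partition count. Quick sketch:
val rdd4 = rdd1.coalesce(10, shuffle = true)
rdd4.getNumPartitions                        // 10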




val rdd1 = sc.parallelize(Array(("tamil","tamilnenjam@gmail.com"),("vijay","vijaybalajitheboss@gmail.com"),("aish","aishvaryasudha@gmail.com"),("raja","raja@gmail.com")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[30] at parallelize at <console>:24


val rdd2 = sc.parallelize(Array(("maha","mahalakshmi@gmail.com"),("tamil","tamilnenjam2017@gmail.com"),("vijay","vijaybalaji2001@gmail.com"),("aish","aishvaryasara@gmail.com")))

scala> val joinedRDD = rdd1.join(rdd2)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[37] at join at <console>:27

scala>  joinedRDD.collect.foreach(println)
(aish,(aishvaryasudha@gmail.com,aishvaryasara@gmail.com))
(tamil,(tamilnenjam@gmail.com,tamilnenjam2017@gmail.com))
(vijay,(vijaybalajitheboss@gmail.com,vijaybalaji2001@gmail.com))



scala> rdd1.leftOuterJoin(rdd2).collect().foreach(println)
(aish,(aishvaryasudha@gmail.com,Some(aishvaryasara@gmail.com)))
(raja,(raja@gmail.com,None))  -- key present only in the left RDD
(tamil,(tamilnenjam@gmail.com,Some(tamilnenjam2017@gmail.com)))
(vijay,(vijaybalajitheboss@gmail.com,Some(vijaybalaji2001@gmail.com)))

scala>  rdd1.rightOuterJoin(rdd2).collect().foreach(println)
(aish,(Some(aishvaryasudha@gmail.com),aishvaryasara@gmail.com))
(maha,(None,mahalakshmi@gmail.com)) -- key present only in the right RDD
(tamil,(Some(tamilnenjam@gmail.com),tamilnenjam2017@gmail.com))
(vijay,(Some(vijaybalajitheboss@gmail.com),vijaybalaji2001@gmail.com))
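
// For completeness: fullOuterJoin keeps keys from either side, wrapping both values in Option.
// Sketch on the same rdd1 and rdd2:
rdd1.fullOuterJoin(rdd2).collect().foreach(println)
// (raja,(Some(raja@gmail.com),None))         -- key only in the left RDD
// (maha,(None,Some(mahalakshmi@gmail.com)))  -- key only in the right RDD
// matched keys appear as (Some(...),Some(...))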



scala> rdd1.union(rdd2).collect().foreach(println)
(tamil,tamilnenjam@gmail.com)
(vijay,vijaybalajitheboss@gmail.com)
(aish,aishvaryasudha@gmail.com)
(raja,raja@gmail.com)
(maha,mahalakshmi@gmail.com)
(tamil,tamilnenjam2017@gmail.com)
(vijay,vijaybalaji2001@gmail.com)
(aish,aishvaryasara@gmail.com)



scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at <console>:24

scala> val mappedRDD = rdd1.map( x => x*2)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at map at <console>:25

scala> mappedRDD.take(2)
res28: Array[Int] = Array(2, 4)

scala> mappedRDD.first
res29: Int = 2


scala> rdd1.count
res37: Long = 5


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[72] at parallelize at <console>:24

scala> rdd1.top(3)
res39: Array[Int] = Array(10, 9, 8)

scala> rdd1.take(3)
res40: Array[Int] = Array(1, 2, 3)
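
// takeOrdered is the counterpart of top: it returns the n smallest elements,
// or the top n by a supplied ordering. Quick sketch:
rdd1.takeOrdered(3)                          // Array(1, 2, 3)
rdd1.takeOrdered(3)(Ordering[Int].reverse)   // Array(10, 9, 8) - same as top(3)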


// countByValue example 
scala> val rdd1 = sc.makeRDD(List(1,1,2,2,2,3,3,4,4,4,4,4,5,5,6,6,6,6,6,6,3,3,2,2,1,1,2,2,3,4,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[74] at makeRDD at <console>:24

scala> rdd1.countByValue()
res41: scala.collection.Map[Int,Long] = Map(5 -> 3, 1 -> 4, 6 -> 6, 2 -> 7, 3 -> 5, 4 -> 7)
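
// countByValue brings the whole Map back to the driver. The same counts can stay distributed
// as an RDD using map + reduceByKey (a pattern shown earlier). Small sketch:
val countsRDD = rdd1.map(x => (x, 1L)).reduceByKey(_ + _)
countsRDD.collect()   // Array((4,7), (1,4), (5,3), (6,6), (2,7), (3,5)) - order may vary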

// Reduce Examples
scala> rdd1.reduce(_+_)
res44: Int = 112

scala> rdd1.reduce(_*_)
res45: Int = 2013265920

// Note: subtraction is neither associative nor commutative, so this result depends on partitioning and is not deterministic
scala> rdd1.reduce(_-_)
res46: Int = 44
