scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val resultRDD = rdd.map ( x => x * x )
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> resultRDD.collect()
res0: Array[Int] = Array(1, 4, 9, 16, 25)
scala> resultRDD.foreach(println)
9
4
1
16
25
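// Note: foreach(println) runs on the executors, so the print order follows the
// partitioning, not the element order. To print in order on the driver
// (fine for small results), collect first. A quick sketch:
resultRDD.collect().foreach(println)   // prints 1, 4, 9, 16, 25 in order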
// We have a file named employee.txt with the following content:
D:\Ex\employee.txt
John,33,US
Jim,44,UK
Nav,30,India
// Reading the file content and creating an RDD
scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <console>:24
// find the length of each line
scala> val resultRDD = rdd.map (x => (x, x.length))
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:25
scala> resultRDD.foreach(println)
(John,33,US,10)
(Nav,30,India,12)
(Jim,44,UK,9)
scala> resultRDD.collect()
res3: Array[(String, Int)] = Array((John,33,US,10), (Jim,44,UK,9), (Nav,30,India,12))
scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd.flatMap(x => x.split(",")).collect()
res4: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)
scala> val resultRDD = rdd.flatMap(x => x.split(","))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:25
scala> resultRDD.foreach(println)
Nav
30
India
John
33
US
Jim
44
UK
scala> resultRDD.collect()
res6: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)
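// A side-by-side sketch of map vs flatMap on the same file: map keeps one output
// element per input line (an Array[String] each), while flatMap flattens the
// arrays into individual tokens.
val mapped = rdd.map(line => line.split(","))      // RDD[Array[String]]
val flat   = rdd.flatMap(line => line.split(","))  // RDD[String]
mapped.collect()  // Array(Array(John, 33, US), Array(Jim, 44, UK), Array(Nav, 30, India))
flat.collect()    // Array(John, 33, US, Jim, 44, UK, Nav, 30, India)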
scala> val rdd = sc.makeRDD (List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24
scala> val resultRDD = rdd.filter( x => x%2 == 0)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at filter at <console>:25
scala> resultRDD.collect()
res7: Array[Int] = Array(2, 4)
scala> resultRDD.foreach(println)
4
2
scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[10] at textFile at <console>:24
scala> val filtered = rdd.filter ( line => line.split(",")(1).toInt < 40)
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25
scala> filtered.foreach(println)
Nav,30,India
John,33,US
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(1,2,3,6,7))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24
// intersection
scala> val resultRDD = rdd1.intersection(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:27
scala> resultRDD.collect()
res12: Array[Int] = Array(1, 2, 3)
scala> resultRDD.foreach(println)
2
3
1
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(3,5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
// union
scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[23] at union at <console>:27
scala> resultRDD.collect()
res14: Array[Int] = Array(1, 2, 3, 4, 5, 3, 5)
// Find the distinct (unique) values
scala> resultRDD.distinct().collect()
res21: Array[Int] = Array(1, 2, 3, 4, 5)
// Find the distinct
scala> val rdd1 = sc.parallelize(List(1,1,2,3,3,2,2,1,4,5,5,4,4,5,5,3,2,1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:2
scala> val resultRDD = rdd1.distinct()
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at distinct at <console>:25
scala> resultRDD.collect()
res22: Array[Int] = Array(4, 1, 5, 2, 3)
// Find the union
scala> val rdd1 = sc.makeRDD(List("Raja","Ramesh","Siva"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> val rdd2 = sc.parallelize(List("Arun","Viswa","Kalai"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[34] at union at <console>:27
scala> resultRDD.foreach(println)
Raja
Siva
Ramesh
Kalai
Viswa
Arun
birth.txt:
-----------
Dec,Arun
March,Kalai
April,Nila
April,John
Dec,Silva
January,Priya
Dec,Ashwin
January,Ohm
March,Sastha
January,Ashwath
April,Agila
April,Alex
January,Sridevi
Dec,Sridhar
March,Neelaveni
scala> val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24
case class birth (month:String, name:String)
val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}
scala> pairRDD.collect()
res27: Array[(String, String)] = Array((Dec,Arun), (March,Kalai), (April,Nila), (April,John), (Dec,Silva), (
scala> pairRDD.foreach(println)
(Dec,Arun)
(March,Kalai)
(April,Nila)
(April,John)
(January,Ashwath)
(Dec,Silva)
(April,Agila)
(January,Priya)
(April,Alex)
(Dec,Ashwin)
(January,Sridevi)
(January,Ohm)
(Dec,Sridhar)
(March,Sastha)
(March,Neelaveni)
scala> val groupedRDD = pairRDD.groupByKey()
groupedRDD: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[38] at groupByKey at <console>:25
scala> groupedRDD.collect()
res29: Array[(String, Iterable[String])] = Array((April,CompactBuffer(Nila, John, Agila, Alex)), (January,Co
scala> groupedRDD.foreach(println)
(March,CompactBuffer(Kalai, Sastha, Neelaveni))
(April,CompactBuffer(Nila, John, Agila, Alex))
(January,CompactBuffer(Priya, Ohm, Ashwath, Sridevi))
(Dec,CompactBuffer(Arun, Silva, Ashwin, Sridhar))
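// Once grouped, mapValues can summarise each group without another shuffle,
// e.g. counting how many birthdays fall in each month. A quick sketch:
groupedRDD.mapValues(names => names.size).collect()
// e.g. Array((April,4), (January,4), (March,3), (Dec,4))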
// SortByKey Example
val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
case class birth (month:String, name:String)
val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}
val sortedRDD = pairRDD.sortByKey()
scala> sortedRDD.collect()
res7: Array[(String, String)] = Array((April,Nila), (April,John), (April,Agila), (April,Alex), (Dec,Arun), (Dec,Silva), (Dec,Ashwin), (Dec,Sridhar), (January,Priya), (January,Ohm)
scala> sortedRDD.foreach(println)
(April,Nila)
(January,Priya)
(January,Ohm)
(April,John)
(January,Ashwath)
(April,Agila)
(January,Sridevi)
(April,Alex)
(March,Kalai)
(Dec,Arun)
(March,Sastha)
(Dec,Silva)
(March,Neelaveni)
(Dec,Ashwin)
(Dec,Sridhar)
scala> groupedRDD.toDebugString
res31: String =
(2) ShuffledRDD[38] at groupByKey at <console>:25 []
+-(2) MapPartitionsRDD[37] at map at <console>:25 []
| D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24 []
| D:\Ex\birth.txt HadoopRDD[35] at textFile at <console>:24 []
// ReduceByKey Example
veggies.txt:
------------
Tomato,10
Brinjal,20
Tomato,20
Brinjal,33
Carrot,23
Potato,20
Carrot,50
Potato,23
Brinjal,22
Potato,20
Carrot,23
scala> val rdd1 = sc.textFile("D:\\Ex\\veggies.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\veggies.txt MapPartitionsRDD[40] at textFile at <console>:24
scala> case class veggies (name:String, price:Int)
defined class veggies
scala> val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val name = w(0)
  val price = w(1).toInt
  (name, price)
}
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[41] at map at <console>:25
scala> val reducedRDD = pairRDD.reduceByKey( (x,y) => (x + y))
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at reduceByKey at <console>:2
// OR, using the placeholder syntax:
scala> val reducedRDD = pairRDD.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at reduceByKey at <console>:2
scala> reducedRDD.foreach(println)
(Brinjal,75)
(Potato,63)
(Tomato,30)
(Carrot,96)
scala> reducedRDD.sortByKey().collect().foreach(println)
(Brinjal,75)
(Carrot,96)
(Potato,63)
(Tomato,30)
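// The same totals could be computed with groupByKey followed by a sum, but
// reduceByKey also combines values on the map side before the shuffle, so it
// normally moves much less data. A quick comparison sketch:
val viaGroup = pairRDD.groupByKey().mapValues(prices => prices.sum)
viaGroup.collect()  // same totals: (Brinjal,75), (Carrot,96), (Potato,63), (Tomato,30)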
// Reduce Vs Sum
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24
// Scala function - shuffling then sum
scala> val summation = rdd1.sum()
summation: Double = 55.0
// sum then shuffling - data movement between nodes is less
scala> val summ = rdd1.reduce(_+_)
summ: Int = 55
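// Related actions: fold and aggregate also combine values within each partition
// first and then merge the partial results, but both take an explicit zero value.
// A quick sketch:
rdd1.fold(0)(_ + _)              // Int = 55
rdd1.aggregate(0)(_ + _, _ + _)  // Int = 55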
students_roll.txt:
------------------
10,Ravi
11,Rahul
9,Lisa
7,Radha
5,Binal
6,Dev
8,Raj
1,Sri
3,Kalai
2,Arul
4,Priya
scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[38] at textFile at <console>:24
scala> case class student(rollno:Int, name:String)
defined class student
val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val rollno = w(0).toInt
  val name = w(1)
  (rollno, name)
}
scala> pairRDD.collect()
res21: Array[(Int, String)] = Array((10,Ravi), (11,Rahul), (9,Lisa), (7,Radha), (5,Binal), (6,Dev), (8,
// Ascending Order
scala> pairRDD.sortByKey(true).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)
// Descending Order
scala> pairRDD.sortByKey(false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)
// SortBy Example
scala> pairRDD.sortBy(x => x._2).collect().foreach(println)
(2,Arul)
(5,Binal)
(6,Dev)
(3,Kalai)
(9,Lisa)
(4,Priya)
(7,Radha)
(11,Rahul)
(8,Raj)
(10,Ravi)
(1,Sri)
scala> pairRDD.sortBy(x => x._1).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)
scala> pairRDD.sortBy(x => x._1,false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.partitions.size
res0: Int = 4
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd2.partitions.size
res1: Int = 2
scala> rdd1.getNumPartitions
res2: Int = 4
scala> rdd2.getNumPartitions
res3: Int = 2
scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd1.getNumPartitions
res4: Int = 2
scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt",3)
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[7] at textFile at <console>:24
scala> rdd1.getNumPartitions
res6: Int = 3
scala> rdd1.partitions.size
res7: Int = 3
scala> rdd1.partitions.length
res8: Int = 3
scala> val resultRDD = rdd1.mapPartitions(x => List(x.toList).iterator)
resultRDD: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[8] at mapPartitions at <console>:25
scala> resultRDD.collect
res9: Array[List[String]] = Array(List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha), List(5,Binal, 6,Dev, 8,Raj, 1,Sri), List(3,Kalai
scala> resultRDD.collect().foreach(println)
List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha)
List(5,Binal, 6,Dev, 8,Raj, 1,Sri)
List(3,Kalai, 2,Arul, 4,Priya)
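// mapPartitions hands the function one iterator per partition, which also makes
// per-partition summaries easy, e.g. counting the lines in each partition:
val linesPerPartition = rdd1.mapPartitions(iter => Iterator(iter.size))
linesPerPartition.collect()  // e.g. Array(4, 4, 3) for the 3-partition file above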
Coalesce Example:
// We can reduce the number of partitions using coalesce, but we can't increase them.
scala> val rdd1 = sc.parallelize(List(1,1,2,2,3,4,5,6,7,8,9,10),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd1.partitions.size
res11: Int = 3
scala> rdd1.getNumPartitions
res12: Int = 3
scala> val coalescedRDD = rdd1.coalesce(1)
coalescedRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[10] at coalesce at <console>:25
scala> coalescedRDD.getNumPartitions
res13: Int = 1
Repartition Example:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> rdd1.getNumPartitions
res17: Int = 2
// Increasing the number of partitions
scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at repartition at <console>:2
scala> rdd2.getNumPartitions
res20: Int = 5
// Decreasing the number of partitions
scala> val rdd2 = rdd1.repartition(1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at <console>:2
scala> rdd2.partitions.size
res21: Int = 1
// Coalesce cannot be used to increase the number of partitions (the request for 10 is ignored)
scala> val rdd3 = rdd1.coalesce(10)
rdd3: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[29] at coalesce at <console>:25
scala> rdd3.getNumPartitions
res22: Int = 2
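// The exception: coalesce(n, shuffle = true) behaves like repartition and can
// increase the partition count, at the cost of a full shuffle. A quick sketch:
val rdd4 = rdd1.coalesce(10, shuffle = true)
rdd4.getNumPartitions  // Int = 10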
val rdd1 = sc.parallelize(Array(("tamil","tamilnenjam@gmail.com"),("vijay","vijaybalajitheboss@gmail.com"),("aish","aishvaryasudha@gmail.com"),("raja","raja@gmail.com")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[30] at parallelize at <console>:24
val rdd2 = sc.parallelize(Array(("maha","mahalakshmi@gmail.com"),("tamil","tamilnenjam2017@gmail.com"),("vijay","vijaybalaji2001@gmail.com"),("aish","aishvaryasara@gmail.com")))
scala> val joinedRDD = rdd1.join(rdd2)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[37] at join at <console>:27
scala> joinedRDD.collect.foreach(println)
(aish,(aishvaryasudha@gmail.com,aishvaryasara@gmail.com))
(tamil,(tamilnenjam@gmail.com,tamilnenjam2017@gmail.com))
(vijay,(vijaybalajitheboss@gmail.com,vijaybalaji2001@gmail.com))
scala> rdd1.leftOuterJoin(rdd2).collect().foreach(println)
(aish,(aishvaryasudha@gmail.com,Some(aishvaryasara@gmail.com)))
(raja,(raja@gmail.com,None)) // key present only in the left RDD
(tamil,(tamilnenjam@gmail.com,Some(tamilnenjam2017@gmail.com)))
(vijay,(vijaybalajitheboss@gmail.com,Some(vijaybalaji2001@gmail.com)))
scala> rdd1.rightOuterJoin(rdd2).collect().foreach(println)
(aish,(Some(aishvaryasudha@gmail.com),aishvaryasara@gmail.com))
(maha,(None,mahalakshmi@gmail.com)) // key present only in the right RDD
(tamil,(Some(tamilnenjam@gmail.com),tamilnenjam2017@gmail.com))
(vijay,(Some(vijaybalajitheboss@gmail.com),vijaybalaji2001@gmail.com))
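// fullOuterJoin keeps keys from both sides and wraps each side's value in an
// Option. A quick sketch of the expected shape:
rdd1.fullOuterJoin(rdd2).collect().foreach(println)
// (raja,(Some(raja@gmail.com),None))          // key present only in the left RDD
// (maha,(None,Some(mahalakshmi@gmail.com)))   // key present only in the right RDD
// matching keys appear as (key,(Some(left),Some(right)))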
scala> rdd1.union(rdd2).collect().foreach(println)
(tamil,tamilnenjam@gmail.com)
(vijay,vijaybalajitheboss@gmail.com)
(aish,aishvaryasudha@gmail.com)
(raja,raja@gmail.com)
(maha,mahalakshmi@gmail.com)
(tamil,tamilnenjam2017@gmail.com)
(vijay,vijaybalaji2001@gmail.com)
(aish,aishvaryasara@gmail.com)
scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at <console>:24
scala> val mappedRDD = rdd1.map( x => x*2)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at map at <console>:25
scala> mappedRDD.take(2)
res28: Array[Int] = Array(2, 4)
scala> mappedRDD.first
res29: Int = 2
scala> rdd1.count
res37: Long = 5
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[72] at parallelize at <console>:24
scala> rdd1.top(3)
res39: Array[Int] = Array(10, 9, 8)
scala> rdd1.take(3)
res40: Array[Int] = Array(1, 2, 3)
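// The opposite of top: takeOrdered returns the n smallest elements.
rdd1.takeOrdered(3)  // Array(1, 2, 3)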
// countByValue example
scala> val rdd1 = sc.makeRDD(List(1,1,2,2,2,3,3,4,4,4,4,4,5,5,6,6,6,6,6,6,3,3,2,2,1,1,2,2,3,4,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[74] at makeRDD at <console>:24
scala> rdd1.countByValue()
res41: scala.collection.Map[Int,Long] = Map(5 -> 3, 1 -> 4, 6 -> 6, 2 -> 7, 3 -> 5, 4 -> 7)
// Reduce Examples
scala> rdd1.reduce(_+_)
res44: Int = 112
scala> rdd1.reduce(_*_)
res45: Int = 2013265920
scala> rdd1.reduce(_-_)
res46: Int = 44
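// Note: reduce only gives a well-defined answer for associative and commutative
// operations (like + and *). Subtraction is neither, so reduce(_ - _) depends on
// how the data is split across partitions. A quick sketch:
sc.parallelize(List(1, 2, 3, 4), 1).reduce(_ - _)  // ((1-2)-3)-4 = -8
sc.parallelize(List(1, 2, 3, 4), 2).reduce(_ - _)  // e.g. (1-2)-(3-4) = 0 with two partitions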