Thursday, 10 January 2019

Functional Programming in Scala

// Sum of the Array Elements
scala> val a1 = Array(1,2,3,4,5,6,7)
a1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

scala> var sum = 0
sum: Int = 0

scala> for (i <- 0 to a1.size - 1){
     | sum  = sum + a1(i)
     | }

scala> sum
res1: Int = 28

scala> a1.sum
res2: Int = 28

scala> a1.reduce(_+_)
res3: Int = 28

scala> a1.reduce( (x,y) => (x+y))
res4: Int = 28
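
A related sketch: fold behaves like reduce but takes an explicit start value, so it also works on an empty collection, where reduce would throw.

a1.fold(0)(_ + _)                  // 28
Array.empty[Int].fold(0)(_ + _)    // 0; reduce would throw here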



scala> class A {
     | var id = 0
     | var name = ""
     | }
defined class A

scala> val n1 = new A
n1: A = A@7fa2473a

scala> n1.id = 20
n1.id: Int = 20

scala> n1.name="Spark"
n1.name: String = Spark

scala> n1
res5: A = A@7fa2473a

scala> println(n1.id + "\t" + n1.name)
20      Spark
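
The A@7fa2473a display above is the default Object.toString. A minimal sketch of the same idea as a case class (a hypothetical alternative, not part of the session), which gets a readable toString for free:

case class B(id: Int, name: String)
val n2 = B(20, "Spark")
println(n2)    // prints: B(20,Spark)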


// Find the even numbers in a given Array

scala> val a1 = Array(1,2,3,4,5,6,7)
a1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

// procedural programming
scala> for (i <- 0 to a1.size -1){
     | if (a1(i) % 2 == 0){
     | println("Even : " + a1(i))
     | }
     | }
Even : 2
Even : 4
Even : 6


// Functional programming
scala> a1.filter ( x => x % 2 == 0)
res2: Array[Int] = Array(2, 4, 6)

scala> a1.filter(_%2==0)
res3: Array[Int] = Array(2, 4, 6)
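
A closely related sketch: partition splits the array into matches and non-matches in a single pass.

val (evens, odds) = a1.partition(_ % 2 == 0)
// evens: Array(2, 4, 6)   odds: Array(1, 3, 5, 7)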





scala> val a1 = Array(1,2,3,4,5,6,7)
a1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

// Add 1 to each element of an Array
scala> for (i <- 0 to a1.size - 1 by 1) {
     | a1(i) = a1(i) + 1
     | }

scala> a1
res5: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8)

scala> val a2 = Array(3,4,5,6,7,8)
a2: Array[Int] = Array(3, 4, 5, 6, 7, 8)

scala> a2.map ( x=> x + 1)
res6: Array[Int] = Array(4, 5, 6, 7, 8, 9)

// Add 10 to each element of an Array
scala> a1.map(_+10)
res7: Array[Int] = Array(12, 13, 14, 15, 16, 17, 18)



scala> val r = Range(1,100,20)
r: scala.collection.immutable.Range = inexact Range 1 until 100 by 20

scala> r.map(x => x*x)
res11: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 441, 1681, 3721, 6561)

scala> r.map(x => x+x)
res12: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 42, 82, 122, 162)




scala> def function6 = { (x:Int, y:Int) =>
     | val p = x+y
     | val q = x-y
     | val r = x*y
     | val s = x/y
     | p+q+r+s
     | }
function6: (Int, Int) => Int

scala> val result = function6(10,20)
result: Int = 220

scala> val result = function6(5,7)
result: Int = 45
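
Note that x/y inside function6 is integer division. For function6(10,20) the terms are p = 30, q = -10, r = 200 and s = 10/20 = 0, which sum to 220; for function6(5,7) they are 12, -2, 35 and 0, giving 45.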



scala> val a1 = Array("Spark is a big data technology","Hadoop is a big data tec
hnology", "Hadoop and Spark are big data technologies")
a1: Array[String] = Array(Spark is a big data technology, Hadoop is a big data t
echnology, Hadoop and Spark are big data technologies)

scala> a1.size
res13: Int = 3

scala> val a2 = a1.map(x => x.split(" "))
a2: Array[Array[String]] = Array(Array(Spark, is, a, big, data, technology), Array(Hadoop, is, a, big, data, technology), Array(Hadoop, and, Spark, are, big, data, technologies))

scala> val a2 = a1.map(x => x.split(" ")).flatMap(x => x)
a2: Array[String] = Array(Spark, is, a, big, data, technology, Hadoop, is, a, big, data, technology, Hadoop, and, Spark, are, big, data, technologies)



scala> val a2 = a1.flatMap(x => x.split(" "))
a2: Array[String] = Array(Spark, is, a, big, data, technology, Hadoop, is, a, big, data, technology, Hadoop, and, Spark, are, big, data, technologies)



scala> val a2 = a1.flatMap(x => x.split(" ")).map (x => (x,1))
a2: Array[(String, Int)] = Array((Spark,1), (is,1), (a,1), (big,1), (data,1), (technology,1), (Hadoop,1), (is,1), (a,1), (big,1), (data,1), (technology,1), (Hadoop,1), (and,1), (Spark,1), (are,1), (big,1), (data,1), (technologies,1))


scala> val a3 = a2.groupBy(x => x._1)
a3: scala.collection.immutable.Map[String,Array[(String, Int)]] = Map(are -> Array((are,1)), is -> Array((is,1), (is,1)), big -> Array((big,1), (big,1), (big,1)), data -> Array((data,1), (data,1), (data,1)), a -> Array((a,1), (a,1)), technologies -> Array((technologies,1)), technology -> Array((technology,1), (technology,1)), Spark -> Array((Spark,1), (Spark,1)), Hadoop -> Array((Hadoop,1), (Hadoop,1)), and -> Array((and,1)))


scala> val a3 = a2.groupBy(x => x._1).map(x => (x._1,x._2.size))
a3: scala.collection.immutable.Map[String,Int] = Map(are -> 1, is -> 2, big -> 3, data -> 3, a -> 2, technologies -> 1, technology -> 2, Spark -> 2, Hadoop -> 2, and -> 1)


scala> val a2 = a1.flatMap(x => x.split(" ")).map(x => (x,1)).groupBy(x => x._1).map(x => (x._1,x._2.size))
a2: scala.collection.immutable.Map[String,Int] = Map(are -> 1, is -> 2, big -> 3, data -> 3, a -> 2, technologies -> 1, technology -> 2, Spark -> 2, Hadoop -> 2, and -> 1)
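
The same word count can be sketched a little more directly by grouping the words themselves:

val counts = a1.flatMap(_.split(" ")).groupBy(identity).map(x => (x._1, x._2.size))
// Map(are -> 1, is -> 2, big -> 3, data -> 3, a -> 2, ...)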


scala> val x1 = List(2,3,4,5,6,7,8)
x1: List[Int] = List(2, 3, 4, 5, 6, 7, 8)

scala> x1.sum
res14: Int = 35

scala> x1.reduce( (x,y) => (x+y))
res15: Int = 35

scala> x1.reduce( (x,y) => (x-y))
res16: Int = -31

scala> x1.max
res17: Int = 8

scala> x1.min
res18: Int = 2

scala> x1.sum / x1.size
res19: Int = 5
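
x1.sum / x1.size is integer division; 35 / 7 happens to divide evenly. For a fractional average, a sketch:

x1.sum.toDouble / x1.size    // 5.0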




scala> val x1 = List(1,2,3,4,5,6,7,8,9,10)
x1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val y1 = List("a","b","c","d","e","f","g","h","i","j")
y1: List[String] = List(a, b, c, d, e, f, g, h, i, j)

scala> x1.zip(y1)
res20: List[(Int, String)] = List((1,a), (2,b), (3,c), (4,d), (5,e), (6,f), (7,g), (8,h), (9,i), (10,j))

scala> y1.zip(x1)
res21: List[(String, Int)] = List((a,1), (b,2), (c,3), (d,4), (e,5), (f,6), (g,7), (h,8), (i,9), (j,10))



scala> val r1 = sc.makeRDD(Array(1,2,3,4,5))
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val r2 = sc.makeRDD(Array("a","b","c","d","e"))
r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> r1.zip(r2).collect.foreach(println)
(1,a)
(2,b)
(3,c)
(4,d)
(5,e)


scala> r2.zip(r1).collect.foreach(println)
(a,1)
(b,2)
(c,3)
(d,4)
(e,5)


scala> val r1 = sc.parallelize(Array(1,2,3,4,5))
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val r2 = sc.parallelize(Array("a","b","c","d","e"))
r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> r1.zip(r2).collect.foreach(println)
(1,a)
(2,b)
(3,c)
(4,d)
(5,e)


scala> r2.zip(r1).collect.foreach(println)
(a,1)
(b,2)
(c,3)
(d,4)
(e,5)

scala> val r1 = Array(1,2,3,4,5)
r1: Array[Int] = Array(1, 2, 3, 4, 5)

scala> r1.zipWithIndex
res0: Array[(Int, Int)] = Array((1,0), (2,1), (3,2), (4,3), (5,4))


scala> val x1 = Array("a","b","c","d","e")
x1: Array[String] = Array(a, b, c, d, e)

scala> x1.zipWithIndex
res1: Array[(String, Int)] = Array((a,0), (b,1), (c,2), (d,3), (e,4))
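
zipWithIndex is also available on the RDDs used earlier (a sketch; RDD indices are Long):

val r3 = sc.parallelize(Array("a", "b", "c"))
r3.zipWithIndex.collect      // Array((a,0), (b,1), (c,2))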

// Read text lines from a file
sample.txt:
-----------
spark hadoop hadoop
spark scala hive spark
pig hive sqoop hadoop
flume kafka spark hadoop hive

scala> import scala.io.Source
import scala.io.Source

scala> val f1  = Source.fromFile("D:\\iEd\\sample.txt")
f1: scala.io.BufferedSource = <iterator>


scala> for (i <- f1.getLines()){
     | println(i)
     | }
spark hadoop hadoop
spark scala hive spark
pig hive sqoop hadoop
flume kafka spark hadoop hive
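
// Note: getLines() returns a one-shot iterator. Once it has been consumed,
// the same BufferedSource cannot be traversed again, which is why the file
// is opened a second time below.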

scala> import scala.io.Source
import scala.io.Source

scala> val f1  = Source.fromFile("D:\\iEd\\sample.txt")
f1: scala.io.BufferedSource = <iterator>

scala> for (i <- f1.getLines()){
     | val l = i.split(" ").map(x => (x,1)).groupBy(x => x._1).map(x => (x._1,x._2.size))
     | println(l.mkString(","))
     | }
hadoop -> 2,spark -> 1
spark -> 2,scala -> 1,hive -> 1
hadoop -> 1,sqoop -> 1,pig -> 1,hive -> 1
kafka -> 1,hadoop -> 1,spark -> 1,hive -> 1,flume -> 1
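
One housekeeping sketch: a BufferedSource holds an open file handle, so it is good practice to close it once the lines have been processed.

f1.close()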



scala> val f2 = { (x:Int, y:Int) => x + y}
f2: (Int, Int) => Int = $$Lambda$1041/642087302@501957bf

scala> val f5 = { (a:Int, b:Int, c:(Int, Int) => Int) = c(a,b)}
<console>:1: error: ';' expected but '=' found.
       val f5 = { (a:Int, b:Int, c:(Int, Int) => Int) = c(a,b)}
                                                      ^

scala> val f5 = { (a:Int, b:Int, c:(Int, Int) => Int) => c(a,b)}
f5: (Int, Int, (Int, Int) => Int) => Int = $$Lambda$1065/1822045793@727986ad

scala> f5(3,4,f2)
res0: Int = 7


scala> val add = { (x:Int, y:Int) => x+y}
add: (Int, Int) => Int = $$Lambda$1072/440641678@51f34185

scala> val mul = { (x:Int, y:Int) => x * y}
mul: (Int, Int) => Int = $$Lambda$1073/2040509534@4f7ba0af

scala> val div = { (x:Int, y:Int) => x / y}
div: (Int, Int) => Int = $$Lambda$1074/1984032002@4ebed2b3

scala> val subt = { (x:Int, y:Int) => x - y}
subt: (Int, Int) => Int = $$Lambda$1075/146901982@6719f206


scala> add(3,4)
res1: Int = 7

scala> mul(3,3)
res2: Int = 9

scala> div(9,3)
res3: Int = 3

scala> subt(10,-3)
res6: Int = 13


// A function that takes another function as an input parameter is called a
// higher-order function.

scala> val higherorder = {(a:Int, b:Int,c:(Int, Int) => Int) => c(a,b)}
higherorder: (Int, Int, (Int, Int) => Int) => Int = $$Lambda$1122/626010908@5dabe7c8

scala> higherorder(3,2,add)
res7: Int = 5

scala> higherorder(3,2,mul)
res8: Int = 6

scala> higherorder(3,2,div)
res9: Int = 1

scala> higherorder(3,2,subt)
res10: Int = 1
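
The function argument can also be supplied inline as an anonymous function (a quick sketch):

higherorder(3, 2, (x, y) => x + y)    // 5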


// curried function (multiple parameter lists; enables partial application)
scala> def m2(x:Int)(y:Int):Int = {
     | x + y
     | }
m2: (x: Int)(y: Int)Int

scala> m2(4)(5)
res11: Int = 9

scala> m2(33)(34)
res12: Int = 67
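
Because m2 is declared with two parameter lists (curried), it can be partially applied: fixing the first argument yields a new one-argument function (a sketch, Scala 2.12 syntax):

val add4 = m2(4) _    // add4: Int => Int
add4(5)               // 9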

scala> def m1(x:Int, y:Int):Int = {
     | x+y
     | }
m1: (x: Int, y: Int)Int

scala> m1(3,3)
res13: Int = 6


// Recap:
// - higher-order functions
// - curried / partially applied functions
// - chaining functions (see the sketch below)
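
Chaining is the one item above without an example; a minimal sketch using andThen and compose on plain function values:

val inc = (x: Int) => x + 1
val double = (x: Int) => x * 2
(inc andThen double)(3)    // (3 + 1) * 2 = 8
(inc compose double)(3)    // (3 * 2) + 1 = 7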



scala> class A{
     | val a:Int = 20
     | val b:Float = 2.2f
     | def m1(a:Int, b:Int):Int = { a + b }
     | def m2 = { (a:Int, b:Int) => a-b }
     | }
defined class A

scala> val obj = new A
obj: A = A@5a3cf878

scala> obj.m1(2,3)
res0: Int = 5

scala> obj.m2(2,3)
res1: Int = -1

scala> obj.m1(5,5)
res2: Int = 10

scala> obj.m2(33,22)
res3: Int = 11
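
A closing note on class A: m1 is a method, while m2 evaluates to a function value of type (Int, Int) => Int. A method can be converted to a function value by eta-expansion (a sketch):

val f = obj.m1 _    // f: (Int, Int) => Int
f(2, 3)             // 5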
