
Monday, 10 August 2020

Customers and Orders - Table Joining using Spark RDD Example

scala> val custRDD = sc.textFile("hdfs://localhost:8020/user/data/customers/")
custRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/customers/ MapPartitionsRDD[7] at textFile at <console>:27

scala> custRDD.take(5).foreach(println)
1 Richard Hernandez Brownsville
2 Mary Barrett Littleton
3 Ann Smith Caguas
4 Mary Jones San Marcos
5 Robert Hudson Caguas

scala> custRDD.count()
res4: Long = 12435



scala> val ordersRDD = sc.textFile("hdfs://localhost:8020/user/data/orders/")
ordersRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/user/data/orders/ MapPartitionsRDD[9] at textFile at <console>:27

scala> ordersRDD.take(5).foreach(println)
1 2013-07-25 00:00:00.0 11599 CLOSED
2 2013-07-25 00:00:00.0 256 PENDING_PAYMENT
3 2013-07-25 00:00:00.0 12111 COMPLETE
4 2013-07-25 00:00:00.0 8827 CLOSED
5 2013-07-25 00:00:00.0 11318 COMPLETE


scala> ordersRDD.count()
res5: Long = 68893


scala> val orderTuple = ordersRDD.map (line => (line.split("\t")(2),1))
orderTuple: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:29

scala> orderTuple.take(5)
res9: Array[(String, Int)] = Array((11599,1), (256,1), (12111,1), (8827,1), (11318,1))

scala> orderTuple.take(5).foreach(println)
(11599,1)
(256,1)
(12111,1)
(8827,1)
(11318,1)

 

scala> val reducedRDD = orderTuple.reduceByKey(_+_)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:31

scala> reducedRDD.take(5).foreach(println)
(2828,4)                                                                        
(2350,5)
(8602,8)
(11415,4)
(5302,10)

val custTuple = custRDD.map { line =>
  val w = line.split("\t")
  val custid = w(0)
  val cfname = w(1)
  val clname = w(2)
  (custid, cfname + " " + clname)
}

scala> custTuple.take(5).foreach(println)
(1,Richard Hernandez)
(2,Mary Barrett)
(3,Ann Smith)
(4,Mary Jones)
(5,Robert Hudson)

scala> val joinedRDD = custTuple.join(reducedRDD)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[15] at join at <console>:37

scala> joinedRDD.take(5).foreach(println)
(2828,(Mary Smith,4))
(2350,(Brian Perez,5))
(8602,(Timothy Gilbert,8))
(11415,(George Smith,4))
(5302,(Betty Smith,10))


scala> val sortedRDD = joinedRDD.sortBy(_._2._2,false)
sortedRDD: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[20] at sortBy at <console>:39

scala> sortedRDD.take(5).foreach(println)
(569,(Mary Frye,16))
(5897,(Mary Griffin,16))
(12431,(Mary Rios,16))
(6316,(Kyle Smith,16))
(5654,(Jerry Smith,15))
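
Putting the steps together, the same pipeline can be written as one small script (a sketch that assumes the same tab-delimited customers and orders files as above):

// customers: id <tab> firstname <tab> lastname <tab> city
// orders:    id <tab> date <tab> customer_id <tab> status
val custRDD   = sc.textFile("hdfs://localhost:8020/user/data/customers/")
val ordersRDD = sc.textFile("hdfs://localhost:8020/user/data/orders/")

// count the orders per customer id
val ordersPerCust = ordersRDD.map(line => (line.split("\t")(2), 1)).reduceByKey(_ + _)

// (customer id, "firstname lastname")
val custNames = custRDD.map { line =>
  val w = line.split("\t")
  (w(0), w(1) + " " + w(2))
}

// join and show the five customers with the most orders
custNames.join(ordersPerCust)
         .sortBy(_._2._2, ascending = false)
         .take(5)
         .foreach(println)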


Find the population of each city using Spark RDD

Population of each city

customer.txt:
-------------
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725
4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall ,Caguas,PR,00725
6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail Promenade,Passaic,NJ,07055
7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725
8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest Plaza,Lawrence,MA,01841
9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725
10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beacon Plaza,Stafford,VA,22554

val custRDD = sc.textFile("D:\\Ex\\customer.txt")
val cityRDD = custRDD.map ( x => (x.split(",")(6),1))
val cityCountRDD = cityRDD.reduceByKey(_+_)
val sortedRDD = cityCountRDD.sortBy(x => x._2, false)
sortedRDD.take(10).foreach(println)


scala> val custRDD = sc.textFile("D:\\Ex\\customer.txt")
custRDD: org.apache.spark.rdd.RDD[String] = D:\Ex\customer.txt MapPartitionsRDD[26] at textFile at <console>:24

scala> val cityRDD = custRDD.map ( x => (x.split(",")(6),1))
cityRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:25

scala> val cityCountRDD = cityRDD.reduceByKey(_+_)
cityCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:25

scala> val sortedRDD = cityCountRDD.sortBy(x => x._2, false)
sortedRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at sortBy at <console>:25

scala> sortedRDD.take(10).foreach(println)
(Caguas,1152)
(Chicago,62)
(Los Angeles,59)
(Brooklyn,47)
(Bronx,28)
(Las Vegas,27)
(Houston,26)
(Phoenix,24)
(Philadelphia,24)
(San Diego,23)
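
Since cityRDD is already a pair RDD, the counts could also be pulled straight to the driver with countByKey (a sketch; countByKey returns a scala.collection.Map, so it only suits small result sets):

// Map[String, Long] of city -> number of customers, built on the driver
val cityCounts = cityRDD.countByKey()
cityCounts.toSeq.sortBy(-_._2).take(10).foreach(println)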


Finding the Frequency of Words in a Text File using Spark RDD

Frequency of each word in the text file

customer.txt: 
-------------
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725
4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall ,Caguas,PR,00725
6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail Promenade,Passaic,NJ,07055
7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725
8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest Plaza,Lawrence,MA,01841
9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725
10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beacon Plaza,Stafford,VA,22554

val custRDD = sc.textFile("D:\\Ex\\customer.txt")
val words = custRDD.flatMap(x => x.split(",")).filter(x => x != "XXXXXXXXX")
val pairRDD = words.map(x => (x,1))
val reducedRDD = pairRDD.reduceByKey(_+_)
val sortedRDD = reducedRDD.sortBy(x => x._2, false)
sortedRDD.take(10).foreach(println)


scala> val custRDD = sc.textFile("D:\\Ex\\customer.txt")
custRDD: org.apache.spark.rdd.RDD[String] = D:\Ex\customer.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val words = custRDD.flatMap(x => x.split(",")).filter(x => x != "XXXXXXXXX")
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at filter at <console>:25

scala> val pairRDD = words.map(x => (x,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:25

scala> val reducedRDD = pairRDD.reduceByKey(_+_)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:25

scala> val sortedRDD = reducedRDD.sortBy(x => x._2, false)
sortedRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at sortBy at <console>:25

scala> sortedRDD.take(10).foreach(println)
(Mary,1196)
(PR,1186)
(Smith,1160)
(Caguas,1152)
(00725,1152)
(CA,505)
(TX,183)
(NY,168)
(IL,123)
(FL,73)
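
If only the top N words are needed, the full sortBy can be skipped by using top with an Ordering on the count (a sketch based on the reducedRDD above):

// take the 10 most frequent words without sorting the whole RDD
reducedRDD.top(10)(Ordering.by[(String, Int), Int](_._2)).foreach(println)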



Spark RDD programming using Scala - Spark Core Examples

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val resultRDD = rdd.map ( x => x * x )
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> resultRDD.collect()
res0: Array[Int] = Array(1, 4, 9, 16, 25)

scala> resultRDD.foreach(println)
9
4
1
16
25

// We have a file named employee.txt at D:\Ex\employee.txt

John,33,US
Jim,44,UK
Nav,30,India

// Reading the file content and create RDD
scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <conso

// find the length of each line
scala> val resultRDD = rdd.map (x => (x, x.length))
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:25

scala> resultRDD.foreach(println)
(John,33,US,10)
(Nav,30,India,12)
(Jim,44,UK,9)

scala> resultRDD.collect()
res3: Array[(String, Int)] = Array((John,33,US,10), (Jim,44,UK,9), (Nav,30,India,12))





scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd.flatMap(x => x.split(",")).collect()
res4: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)

scala> val resultRDD =  rdd.flatMap(x => x.split(","))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:25

scala> resultRDD.foreach(println)
Nav
30
India
John
33
US
Jim
44
UK

scala> resultRDD.collect()
res6: Array[String] = Array(John, 33, US, Jim, 44, UK, Nav, 30, India)











scala> val rdd = sc.makeRDD (List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24

scala> val resultRDD = rdd.filter( x => x%2 == 0)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at filter at <console>:25

scala> resultRDD.collect()
res7: Array[Int] = Array(2, 4)

scala> resultRDD.foreach(println)
4
2


scala> val rdd = sc.textFile("D:\\Ex\\employee.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\Ex\employee.txt MapPartitionsRDD[10] at textFile at <console>:24

scala> val filtered = rdd.filter ( line => line.split(",")(1).toInt < 40)
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25

scala> filtered.foreach(println)
Nav,30,India
John,33,US



scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,3,6,7))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24


// intersection
scala> val resultRDD = rdd1.intersection(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:27

scala> resultRDD.collect()
res12: Array[Int] = Array(1, 2, 3)

scala> resultRDD.foreach(println)
2
3
1


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(3,5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24

// union
scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[23] at union at <console>:27

scala> resultRDD.collect()
res14: Array[Int] = Array(1, 2, 3, 4, 5, 3, 5)

// Find the Distinct - Unique Values
scala> resultRDD.distinct().collect()
res21: Array[Int] = Array(1, 2, 3, 4, 5)



// Find the distinct 
scala> val rdd1 = sc.parallelize(List(1,1,2,3,3,2,2,1,4,5,5,4,4,5,5,3,2,1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:2

scala> val resultRDD = rdd1.distinct()
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at distinct at <console>:25

scala> resultRDD.collect()
res22: Array[Int] = Array(4, 1, 5, 2, 3)




// Find the union
scala> val rdd1 = sc.makeRDD(List("Raja","Ramesh","Siva"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at makeRDD at <console>:24

scala> val rdd2 = sc.parallelize(List("Arun","Viswa","Kalai"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val resultRDD = rdd1.union(rdd2)
resultRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[34] at union at <console>:27

scala> resultRDD.foreach(println)
Raja
Siva
Ramesh
Kalai
Viswa
Arun

birth.txt:
-----------
Dec,Arun
March,Kalai
April,Nila
April,John
Dec,Silva
January,Priya
Dec,Ashwin
January,Ohm
March,Sastha
January,Ashwath
April,Agila
April,Alex
January,Sridevi
Dec,Sridhar
March,Neelaveni


scala> val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24

case class birth (month:String, name:String)

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}


scala> pairRDD.collect()
res27: Array[(String, String)] = Array((Dec,Arun), (March,Kalai), (April,Nila), (April,John), (Dec,Silva), (

scala> pairRDD.foreach(println)
(Dec,Arun)
(March,Kalai)
(April,Nila)
(April,John)
(January,Ashwath)
(Dec,Silva)
(April,Agila)
(January,Priya)
(April,Alex)
(Dec,Ashwin)
(January,Sridevi)
(January,Ohm)
(Dec,Sridhar)
(March,Sastha)
(March,Neelaveni)

scala> val groupedRDD = pairRDD.groupByKey()
groupedRDD: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[38] at groupByKey at <console

scala> groupedRDD.collect()
res29: Array[(String, Iterable[String])] = Array((April,CompactBuffer(Nila, John, Agila, Alex)), (January,Co

scala> groupedRDD.foreach(println)
(March,CompactBuffer(Kalai, Sastha, Neelaveni))
(April,CompactBuffer(Nila, John, Agila, Alex))
(January,CompactBuffer(Priya, Ohm, Ashwath, Sridevi))
(Dec,CompactBuffer(Arun, Silva, Ashwin, Sridhar))
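
If only the number of names per month is needed, the grouped values can be shrunk with mapValues (a sketch; for a plain count, reduceByKey on (month, 1) pairs would avoid shuffling the full name lists):

groupedRDD.mapValues(_.size).collect().foreach(println)
// expected counts for this data: (March,3), (April,4), (January,4), (Dec,4) - order may vary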



 


// SortByKey Example

val rdd1 = sc.textFile("D:\\Ex\\birth.txt")
 
case class birth (month:String, name:String)

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val month = w(0)
  val name = w(1)
  (month, name)
}
val sortedRDD = pairRDD.sortByKey()
 
scala> sortedRDD.collect()
res7: Array[(String, String)] = Array((April,Nila), (April,John), (April,Agila), (April,Alex), (Dec,Arun), (Dec,Silva), (Dec,Ashwin), (Dec,Sridhar), (January,Priya), (January,Ohm)


scala> sortedRDD.foreach(println)
(April,Nila)
(January,Priya)
(January,Ohm)
(April,John)
(January,Ashwath)
(April,Agila)
(January,Sridevi)
(April,Alex)
(March,Kalai)
(Dec,Arun)
(March,Sastha)
(Dec,Silva)
(March,Neelaveni)
(Dec,Ashwin)
(Dec,Sridhar)


scala> groupedRDD.toDebugString
res31: String =
(2) ShuffledRDD[38] at groupByKey at <console>:25 []
 +-(2) MapPartitionsRDD[37] at map at <console>:25 []
    |  D:\Ex\birth.txt MapPartitionsRDD[36] at textFile at <console>:24 []
    |  D:\Ex\birth.txt HadoopRDD[35] at textFile at <console>:24 []

// ReduceByKey Example
 
veggies.txt:
------------
Tomato,10
Brinjal,20
Tomato,20
Brinjal,33
Carrot,23
Potato,20
Carrot,50
Potato,23
Brinjal,22
Potato,20
Carrot,23


scala> val rdd1 = sc.textFile("D:\\Ex\\veggies.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\veggies.txt MapPartitionsRDD[40] at textFile at <console>:24

scala> case class veggies (name:String, price:Int)
defined class veggies

scala> val pairRDD = rdd1.map{ line =>
       val w = line.split(",")
       val name = w(0)
       val price = w(1).toInt
       (name,price)
       }
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[41] at map at <console>:25

scala> val reducedRDD = pairRDD.reduceByKey( (x,y) => (x + y))
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at reduceByKey at <console>:2

OR

scala> val reducedRDD = pairRDD.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at reduceByKey at <console>:2


scala> reducedRDD.foreach(println)
(Brinjal,75)
(Potato,63)
(Tomato,30)
(Carrot,96)


scala>  reducedRDD.sortByKey().collect().foreach(println)
(Brinjal,75)
(Carrot,96)
(Potato,63)
(Tomato,30)

// Reduce Vs Sum
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24

// sum() is an action that totals the elements and returns a Double
scala> val summation = rdd1.sum()
summation: Double = 55.0

// reduce() combines elements within each partition first and then merges the partial results, keeping the element type (Int here)
scala> val summ = rdd1.reduce(_+_)
summ: Int = 55
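
fold and aggregate are closely related actions: both start from a zero value and combine the per-partition results (a sketch on the same rdd1):

rdd1.fold(0)(_ + _)               // Int = 55
rdd1.aggregate(0)(_ + _, _ + _)   // Int = 55; separate seqOp and combOp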






students_roll.txt:
------------------
10,Ravi
11,Rahul
9,Lisa
7,Radha
5,Binal
6,Dev
8,Raj
1,Sri
3,Kalai
2,Arul
4,Priya


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[38] at textFile at <c

scala> case class student(rollno:Int, name:String)
defined class student

val pairRDD = rdd1.map { line =>
  val w = line.split(",")
  val rollno = w(0).toInt
  val name = w(1)
  (rollno, name)
}


scala> pairRDD.collect()
res21: Array[(Int, String)] = Array((10,Ravi), (11,Rahul), (9,Lisa), (7,Radha), (5,Binal), (6,Dev), (8,


// Ascending Order
scala> pairRDD.sortByKey(true).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)

// Descending Order
scala> pairRDD.sortByKey(false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)

// SortBy Example
scala> pairRDD.sortBy(x => x._2).collect().foreach(println)
(2,Arul)
(5,Binal)
(6,Dev)
(3,Kalai)
(9,Lisa)
(4,Priya)
(7,Radha)
(11,Rahul)
(8,Raj)
(10,Ravi)
(1,Sri)

scala> pairRDD.sortBy(x => x._1).collect().foreach(println)
(1,Sri)
(2,Arul)
(3,Kalai)
(4,Priya)
(5,Binal)
(6,Dev)
(7,Radha)
(8,Raj)
(9,Lisa)
(10,Ravi)
(11,Rahul)


scala> pairRDD.sortBy(x => x._1,false).collect().foreach(println)
(11,Rahul)
(10,Ravi)
(9,Lisa)
(8,Raj)
(7,Radha)
(6,Dev)
(5,Binal)
(4,Priya)
(3,Kalai)
(2,Arul)
(1,Sri)






scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.partitions.size
res0: Int = 4
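// the default partition count comes from spark.default.parallelism (typically the number of local cores), so this value can vary from machine to machine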

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd2.partitions.size
res1: Int = 2

scala> rdd1.getNumPartitions
res2: Int = 4

scala> rdd2.getNumPartitions
res3: Int = 2


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt")
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[3] at textFi

scala> rdd1.getNumPartitions
res4: Int = 2


scala> val rdd1 = sc.textFile("D:\\Ex\\students_roll.txt",3)
rdd1: org.apache.spark.rdd.RDD[String] = D:\Ex\students_roll.txt MapPartitionsRDD[7] at textFi

scala> rdd1.getNumPartitions
res6: Int = 3

scala> rdd1.partitions.size
res7: Int = 3

scala> rdd1.partitions.length
res8: Int = 3


scala> val resultRDD = rdd1.mapPartitions(x => List(x.toList).iterator)
resultRDD: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[8] at mapPartitions at <console>:25

scala> resultRDD.collect
res9: Array[List[String]] = Array(List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha), List(5,Binal, 6,Dev, 8,Raj, 1,Sri), List(3,Kalai


scala> resultRDD.collect().foreach(println)
List(10,Ravi, 11,Rahul, 9,Lisa, 7,Radha)
List(5,Binal, 6,Dev, 8,Raj, 1,Sri)
List(3,Kalai, 2,Arul, 4,Priya)
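
To see which partition each record lands in, mapPartitionsWithIndex can tag every line with its partition index (a sketch on the same rdd1):

rdd1.mapPartitionsWithIndex { (idx, it) =>
  it.map(line => (idx, line))   // (partition index, record)
}.collect().foreach(println)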



Coalesce Example:
// coalesce can reduce the number of partitions, but it cannot increase them (without a shuffle - see the note further below)

scala> val rdd1 = sc.parallelize(List(1,1,2,2,3,4,5,6,7,8,9,10),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd1.partitions.size
res11: Int = 3

scala> rdd1.getNumPartitions
res12: Int = 3

scala> val coalescedRDD = rdd1.coalesce(1)
coalescedRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[10] at coalesce at <console>:25

scala> coalescedRDD.getNumPartitions
res13: Int = 1


Repartition Example:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <conso

scala> rdd1.getNumPartitions
res17: Int = 2

// Increasing the number of partitions
scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at repartition at <console>:2

scala> rdd2.getNumPartitions
res20: Int = 5


// Decreasing the number of partitions
scala> val rdd2 = rdd1.repartition(1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at <console>:2

scala> rdd2.partitions.size
res21: Int = 1



// coalesce cannot increase the number of partitions - the request for 10 is silently ignored
scala> val rdd3 = rdd1.coalesce(10)
rdd3: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[29] at coalesce at <console>:25

scala> rdd3.getNumPartitions
res22: Int = 2
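
coalesce also takes a shuffle flag; with shuffle = true it behaves like repartition and can increase the partition count (a sketch):

val rdd4 = rdd1.coalesce(10, shuffle = true)
rdd4.getNumPartitions   // 10 - a shuffle is performed to spread the data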




val rdd1 = sc.parallelize(Array(("tamil","tamilnenjam@gmail.com"),("vijay","vijaybalajitheboss@gmail.com"),("aish","aishvaryasudha@gmail.com"),("raja","raja@gmail.com")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[30] at parallelize at <console>:24


val rdd2 = sc.parallelize(Array(("maha","mahalakshmi@gmail.com"),("tamil","tamilnenjam2017@gmail.com"),("vijay","vijaybalaji2001@gmail.com"),("aish","aishvaryasara@gmail.com")))

scala> val joinedRDD = rdd1.join(rdd2)
joinedRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[37] at join at <console>:27

scala>  joinedRDD.collect.foreach(println)
(aish,(aishvaryasudha@gmail.com,aishvaryasara@gmail.com))
(tamil,(tamilnenjam@gmail.com,tamilnenjam2017@gmail.com))
(vijay,(vijaybalajitheboss@gmail.com,vijaybalaji2001@gmail.com))



scala> rdd1.leftOuterJoin(rdd2).collect().foreach(println)
(aish,(aishvaryasudha@gmail.com,Some(aishvaryasara@gmail.com)))
(raja,(raja@gmail.com,None))  -- available only in left table 
(tamil,(tamilnenjam@gmail.com,Some(tamilnenjam2017@gmail.com)))
(vijay,(vijaybalajitheboss@gmail.com,Some(vijaybalaji2001@gmail.com)))

scala>  rdd1.rightOuterJoin(rdd2).collect().foreach(println)
(aish,(Some(aishvaryasudha@gmail.com),aishvaryasara@gmail.com))
(maha,(None,mahalakshmi@gmail.com)) -- available only in right table 
(tamil,(Some(tamilnenjam@gmail.com),tamilnenjam2017@gmail.com))
(vijay,(Some(vijaybalajitheboss@gmail.com),vijaybalaji2001@gmail.com))
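
For completeness, fullOuterJoin keeps the keys of both sides and wraps each side in Option (a sketch on the same rdd1 and rdd2; output order may vary):

rdd1.fullOuterJoin(rdd2).collect().foreach(println)
// keys present in both sides come back as (Some(...),Some(...));
// (raja,(Some(raja@gmail.com),None)) and (maha,(None,Some(mahalakshmi@gmail.com))) appear once each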



scala> rdd1.union(rdd2).collect().foreach(println)
(tamil,tamilnenjam@gmail.com)
(vijay,vijaybalajitheboss@gmail.com)
(aish,aishvaryasudha@gmail.com)
(raja,raja@gmail.com)
(maha,mahalakshmi@gmail.com)
(tamil,tamilnenjam2017@gmail.com)
(vijay,vijaybalaji2001@gmail.com)
(aish,aishvaryasara@gmail.com)



scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at <console>:24

scala> val mappedRDD = rdd1.map( x => x*2)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at map at <console>:25

scala> mappedRDD.take(2)
res28: Array[Int] = Array(2, 4)

scala> mappedRDD.first
res29: Int = 2


scala> rdd1.count
res37: Long = 4


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[72] at parallelize at <console>:24

scala> rdd1.top(3)
res39: Array[Int] = Array(10, 9, 8)

scala> rdd1.take(3)
res40: Array[Int] = Array(1, 2, 3)


// countByValue example 
scala> val rdd1 = sc.makeRDD(List(1,1,2,2,2,3,3,4,4,4,4,4,5,5,6,6,6,6,6,6,3,3,2,2,1,1,2,2,3,4,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[74] at makeRDD at <console>:24

scala> rdd1.countByValue()
res41: scala.collection.Map[Int,Long] = Map(5 -> 3, 1 -> 4, 6 -> 6, 2 -> 7, 3 -> 5, 4 -> 7)
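// countByValue returns a scala.collection.Map to the driver, so use it only when the number of distinct values is small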

// Reduce Examples
scala> rdd1.reduce(_+_)
res44: Int = 112

scala> rdd1.reduce(_*_)
res45: Int = 2013265920

scala> rdd1.reduce(_-_)
res46: Int = 44
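// note: subtraction is neither associative nor commutative, so the result of reduce(_-_) depends on how elements are grouped across partitions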

Sunday, 19 July 2020

How to read multi sheet excel file using Python?

$ pip3 install pandas   # install pandas
$ pip3 install xlrd     # install the Excel reading engine

>>> import pandas as pd
>>> xls = pd.ExcelFile('/home/hadoop/Downloads/multisheets.xlsx')  # open the Excel file
>>> df1 = pd.read_excel(xls, 'first')   # read a particular sheet
>>> df2 = pd.read_excel(xls, 'second')
>>> df3 = pd.read_excel(xls, 'third')

>>> print(df1)  # display the content of the pandas dataframe
    Ravi  22    Chennai
    Rahul  23  Bengaluru

>>> print(df2)
  selvi  32 Chennai
  Usha  38  Singai

>>> print(df3)
  Ayush   5   Bengaluru
  Ram  50  Aranthangi


The content of the multi-sheet Excel file is given below:

Friday, 15 May 2020

Introduction to Pandas - Sample Programs


#Series example with default integer indexes 0,1,2,3,...
import pandas as pd
s1 = pd.Series([1,2,3,4,5,6,7,8,9,10])
print(s1)

0     1
1     2
2     3
3     4
4     5
5     6
6     7
7     8
8     9
9    10
dtype: int64


type(s1)

pandas.core.series.Series


#Series example with Custom indexes
import pandas as pd
s1 = pd.Series([86,63,85,81,90],index=["Tamil","English","Maths","Science","Social"])
print(s1)

Tamil      86
English    63
Maths      85
Science    81
Social     90
dtype: int64


#Passing dictionary object to the Series
#Keys of a given Dictionary will become Indexes
import pandas as pd
subjectDict = {"Tamil":85, "English":63, "Maths":85, "Science":81, "Social":90}
s1 = pd.Series(subjectDict)
print(s1)

Tamil      85
English    63
Maths      85
Science    81
Social     90
dtype: int64




# 'b' and 'd' are not keys of the dictionary, so they are assigned NaN ('e' is dropped because it is not in the index list)
import pandas as pd
s1 = pd.Series({"a":10,"c":30,"e":40},index=["b","c","d","a"])
print(s1)

b     NaN
c    30.0
d     NaN
a    10.0
dtype: float64



s1 = pd.Series([5,7,3,2,88,22,-1,0,33])
print(s1[3])

2

print(s1[:2])

0    5
1    7
dtype: int64

print(s1[-1:])

8    33
dtype: int64


print(s1[:6])

0     5
1     7
2     3
3     2
4    88
5    22
dtype: int64

#Arithmetic operations
s1 = pd.Series([10,20,30,40])
s2 = pd.Series([11,22,33,44])
s3 = s1 + s2
print(s3)

0    21
1    42
2    63
3    84
dtype: int64


s1 = pd.Series([11,66,77,55])
s2 = pd.Series([5,22,22,44])
s3 = s1 - s2
print(s3)


0     6
1    44
2    55
3    11
dtype: int64

print(s1+15)

0    26
1    81
2    92
3    70
dtype: int64



print(s2 ** 1.3)

0      8.103283
1     55.609563
2     55.609563
3    136.926807
dtype: float64


s1 = pd.Series([1,2,3])
s2 = pd.Series([6,7,8])
print(s1,s2)

0    1
1    2
2    3
dtype: int64 0    6
1    7
2    8
dtype: int64


print(s1+s2)

0     7
1     9
2    11
dtype: int64


print(s1*s2)

0     6
1    14
2    24
dtype: int64

print(s1-s2, s2-s1)

0   -5
1   -5
2   -5
dtype: int64 0    5
1    5
2    5
dtype: int64




#DataFrame Example

import pandas as pd
subjectDict = {"Subjects":["Tamil","English","Maths","Science","Social"],"Marks":[86,63,85,81,90]}
df = pd.DataFrame(subjectDict)
print(df)


  Subjects  Marks
0    Tamil     86
1  English     63
2    Maths     85
3  Science     81
4   Social     90




import pandas as pd
subjectDict = {"Names":["Arjun","Ram","Biswa","Kalai","Nila"],"Age":[78,37,88,43,93]}
df = pd.DataFrame(subjectDict)
print(df)


   Names  Age
0  Arjun   78
1    Ram   37
2  Biswa   88
3  Kalai   43
4   Nila   93


df = pd.read_csv("https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv")
df.head()

df = pd.read_csv("E:\\PyExa\\iris.csv")
df.head()

   sepal.length  sepal.width  petal.length  petal.width variety
0           5.1          3.5           1.4          0.2  Setosa
1           4.9          3.0           1.4          0.2  Setosa
2           4.7          3.2           1.3          0.2  Setosa
3           4.6          3.1           1.5          0.2  Setosa
4           5.0          3.6           1.4          0.2  Setosa


df.tail()

     sepal.length  sepal.width  petal.length  petal.width    variety
145           6.7          3.0           5.2          2.3  Virginica
146           6.3          2.5           5.0          1.9  Virginica
147           6.5          3.0           5.2          2.0  Virginica
148           6.2          3.4           5.4          2.3  Virginica
149           5.9          3.0           5.1          1.8  Virginica


print(df.shape)
(150, 5)  # 150 X 5 ==> 150 Rows X 5 columns


df.describe()

       sepal.length  sepal.width  petal.length  petal.width
count    150.000000   150.000000    150.000000   150.000000
mean       5.843333     3.057333      3.758000     1.199333
std        0.828066     0.435866      1.765298     0.762238
min        4.300000     2.000000      1.000000     0.100000
25%        5.100000     2.800000      1.600000     0.300000
50%        5.800000     3.000000      4.350000     1.300000
75%        6.400000     3.300000      5.100000     1.800000
max        7.900000     4.400000      6.900000     2.500000




df.iloc[0:3,0:2]
#1st 3 Rows and 1st 2 Columns

   sepal.length  sepal.width
0           5.1          3.5
1           4.9          3.0
2           4.7          3.2


df.iloc[0:4,0:4]
#1st 4 Rows and 1st 4 Columns


   sepal.length  sepal.width  petal.length  petal.width
0           5.1          3.5           1.4          0.2
1           4.9          3.0           1.4          0.2
2           4.7          3.2           1.3          0.2
3           4.6          3.1           1.5          0.2



df.loc[0:7,("petal.length","petal.width","variety")]
#rows 0 through 7 (loc is inclusive of both ends) and the specified columns

   petal.length  petal.width variety
0           1.4          0.2  Setosa
1           1.4          0.2  Setosa
2           1.3          0.2  Setosa
3           1.5          0.2  Setosa
4           1.4          0.2  Setosa
5           1.7          0.4  Setosa
6           1.4          0.3  Setosa
7           1.5          0.2  Setosa

#Drop variety column in the dataframe
s1 = df.drop("variety",axis=1)
print (s1.head())

sepal.length  sepal.width  petal.length  petal.width
0           5.1          3.5           1.4          0.2
1           4.9          3.0           1.4          0.2
2           4.7          3.2           1.3          0.2
3           4.6          3.1           1.5          0.2
4           5.0          3.6           1.4          0.2


#Drop 3 rows with index labels 1, 2 and 3

s1 = df.drop([1,2,3],axis=0)
print(s1.head())


sepal.length  sepal.width  petal.length  petal.width variety
0           5.1          3.5           1.4          0.2  Setosa
4           5.0          3.6           1.4          0.2  Setosa
5           5.4          3.9           1.7          0.4  Setosa
6           4.6          3.4           1.4          0.3  Setosa
7           5.0          3.4           1.5          0.2  Setosa



df.mean()

sepal.length    5.843333
sepal.width     3.057333
petal.length    3.758000
petal.width     1.199333
dtype: float64


df.median()

sepal.length    5.80
sepal.width     3.00
petal.length    4.35
petal.width     1.30
dtype: float64


df.min()

sepal.length       4.3
sepal.width          2
petal.length         1
petal.width        0.1
variety         Setosa
dtype: object


df.max()

sepal.length          7.9
sepal.width           4.4
petal.length          6.9
petal.width           2.5
variety         Virginica
dtype: object


#applying user defined function

def half(s):
    return s*0.5

s1 = df[["sepal.length","petal.length"]].apply(half) #half is the udf
print(df[["sepal.length","petal.length"]].head())
print(s1.head())


   sepal.length  petal.length
0           5.1           1.4
1           4.9           1.4
2           4.7           1.3
3           4.6           1.5
4           5.0           1.4

   sepal.length  petal.length
0          2.55          0.70
1          2.45          0.70
2          2.35          0.65
3          2.30          0.75
4          2.50          0.70

#user defined function to double the dataframe values
def double_make(s):
    return s*2

print(df[["sepal.width","petal.width"]].head(5))
s1 = df[["sepal.width","petal.width"]].apply(double_make)
print(s1.head())


  sepal.width  petal.width
0          3.5          0.2
1          3.0          0.2
2          3.2          0.2
3          3.1          0.2
4          3.6          0.2
   sepal.width  petal.width
0          7.0          0.4
1          6.0          0.4
2          6.4          0.4
3          6.2          0.4
4          7.2          0.4


#group and count the values of a particular column

s1 = df["variety"].value_counts()
print(s1)

Virginica     50
Setosa        50
Versicolor    50
Name: variety, dtype: int64

#Sort values by sepal.length (ascending)
df.sort_values(by="sepal.length").head()

    sepal.length  sepal.width  petal.length  petal.width variety
13           4.3          3.0           1.1          0.1  Setosa
42           4.4          3.2           1.3          0.2  Setosa
38           4.4          3.0           1.3          0.2  Setosa
8            4.4          2.9           1.4          0.2  Setosa
41           4.5          2.3           1.3          0.3  Setosa

NumPy Basics

#1 Dimensional Array

import numpy as np
n1 = np.array([1,2,3,4,5,6])
print(n1)
print(type(n1))

[1 2 3 4 5 6]
<class 'numpy.ndarray'>


#2 Dimensional Array

import numpy as np
n2 = np.array([[5,6,7,8,9,10],[50,51,52,53,54,55]])
print(n2)
print(type(n2))

[[ 5  6  7  8  9 10]
 [50 51 52 53 54 55]]
<class 'numpy.ndarray'>

#Initialize with Zeros - fill the array with zeros

import numpy as np
n3 = np.zeros(6)
print(n3)

[0. 0. 0. 0. 0. 0.]


import numpy as np
n4 = np.zeros((4,4))
print(n4)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
 
 
import numpy as np
n5 = np.zeros((5,3,2))
print(n5)

[[[0. 0.]
  [0. 0.]
  [0. 0.]]

 [[0. 0.]
  [0. 0.]
  [0. 0.]]

 [[0. 0.]
  [0. 0.]
  [0. 0.]]

 [[0. 0.]
  [0. 0.]
  [0. 0.]]

 [[0. 0.]
  [0. 0.]
  [0. 0.]]]
  
  
#Fill the array with a constant value

n5 = np.full((2,3),5)
print(n5)

[[5 5 5]
 [5 5 5]]
 
n6 = np.full((2,6),3)
print(n6)

[[3 3 3 3 3 3]
 [3 3 3 3 3 3]]
 
 
n7 = np.full((5),6)
print(n7)

[6 6 6 6 6]



#fill with a range (arange) 

n1 = np.arange(10,20)
print(n1)

[10 11 12 13 14 15 16 17 18 19]


n1 = np.arange(10,50,2.5)
print(n1)

[10.  12.5 15.  17.5 20.  22.5 25.  27.5 30.  32.5 35.  37.5 40.  42.5
 45.  47.5]
 
 
n1 = np.arange(10,50,7)
print(n1)

[10 17 24 31 38 45]


#fill with random integer values

n1 = np.random.randint(1,100,10)
print(n1)

[55 65 10 38 86 97 35 55 11 99]


n1 = np.random.randint(5,55,5)
print(n1)

[37 27 28 11 53]

#shape of the array

n1 = np.array([ [1,2,3],[4,3,2] ])
print(n1.shape)

(2, 3)


n1 = np.array([ [1,2,3],[4,3,2] ,[53,2,3] ])
print(n1.shape)

(3, 3)



#stacking examples - vstack, hstack, column_stack
#vstack
n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
np.vstack((n1,n2))

array([[2, 6, 3],
       [6, 3, 8]])
   
n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
n3 = np.array([10,20,33])
np.vstack((n1,n2,n3))

array([[ 2,  6,  3],
       [ 6,  3,  8],
       [10, 20, 33]])


#hstack

n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
np.hstack((n1,n2))

array([2, 6, 3, 6, 3, 8])



   
#column_stack

n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
np.column_stack((n1,n2))

array([[2, 6],
       [6, 3],
       [3, 8]])
   
n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
n3 = np.array([10,20,33])
np.column_stack((n1,n2,n3))


array([[ 2,  6, 10],
       [ 6,  3, 20],
       [ 3,  8, 33]])
   


n1 = np.array([2,6,3])
n2 = np.array([6,3,8])
n3 = np.array([10,20,33])
n4 = np.array([55,33,66])
np.column_stack((n1,n2,n3,n4))


array([[ 2,  6, 10, 55],
       [ 6,  3, 20, 33],
       [ 3,  8, 33, 66]])
   
   
#intersection

import numpy as np
n1  = np.array([1,2,3,4,5,6])
n2 = np.array([5,6,7,8,9])
np.intersect1d(n1,n2)

array([5, 6])


import numpy as np
n1  = np.array([1,2,3,4,5,6])
n2 = np.array([5,6,7,8,9])
np.intersect1d(n2,n1)

array([5, 6])

#difference

import numpy as np
n1  = np.array([1,2,3,4,5,6])
n2 = np.array([5,6,7,8,9])
np.setdiff1d(n1,n2)


array([1, 2, 3, 4])



import numpy as np
n1  = np.array([1,2,3,4,5,6])
n2 = np.array([5,6,7,8,9])
np.setdiff1d(n2,n1)

array([7, 8, 9])


import numpy as np
n1  = np.array([1,2,3,4,5,6])
n2 = np.array([1,2,3,4,5,6])
np.setdiff1d(n1,n2)

array([], dtype=int32)



#Sum
import numpy as np
n1  = np.array([10,20,30])
n2 = np.array([40,50,60])
np.sum([n1,n2])

210

np.sum([n1,n2],axis=0) # axis=0 : element-wise sum across the arrays (column-wise)

array([50, 70, 90])


np.sum([n1,n2],axis=1) # axis=1 : sum of the elements within each array (row-wise)

array([ 60, 150])



import numpy as np
n1 = np.array([1,2,3,4,5,6])
n2 = np.array([7,8,9,10,11,12])
n3 = np.array([10,20,30,40,50,60])
n4 = np.array([100,200,300,400,500,600])
np.sum([n1,n2,n3,n4])

2388


np.sum([n1,n2,n3,n4],axis=0)

array([118, 230, 342, 454, 566, 678])


np.sum([n1,n2,n3,n4],axis=1)

array([  21,   57,  210, 2100])


NumPy Array Mathematics
#Addition

import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 + 1
print(n1)

[56 34 45 67 23 12 78]


import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 + 10
print(n1)

[65 43 54 76 32 21 87]


import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 - 1
print(n1)

[54 32 43 65 21 10 76]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 - 10
print(n1)

[45 23 34 56 12  1 67]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 * 5
print(n1)

[275 165 220 330 110  55 385]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 * 55
print(n1)

[3025 1815 2420 3630 1210  605 4235]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 / 2.0
print(n1)


[27.5 16.5 22.  33.  11.   5.5 38.5]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 / 5
print(n1)

[11.   6.6  8.8 13.2  4.4  2.2 15.4]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 ** 3
print(n1)


[166375  35937  85184 287496  10648   1331 456533]



import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
n1 = n1 / (3 * 3)
print(n1)

[6.11111111 3.66666667 4.88888889 7.33333333 2.44444444 1.22222222 8.55555556]


#statistics functions
import numpy as np
n1 = np.array([55,33,44,66,22,11,77])
np.mean(n1)
np.sum(n1) / len(n1)  # mean is nothing but average

44.0

#median - the middle value of the sorted data
import numpy as np
n1 = np.array([11,77,33,44,66,22])
np.median(n1)

38.5

#standard deviation
import numpy as np
n1 = np.array([11,77,33,44,66,22])
np.std(n1)

23.262392157490787




#saving and Loading

import numpy as np
n1 = np.array([1,2,3,5,4,6,7,9,8,10,22])
np.save('result',n1)  #saving
 
n2 = np.load('result.npy') #loading

print(n2*10)

[ 10  20  30  50  40  60  70  90  80 100 220]
   

Thursday, 14 May 2020

Python OOPs - Class, Parameters, Constructor, Inheritance (Multi Level, Multiple)

#Create the first class in python

class Phone:
    def make_call(self):
        print("Let's make a phone call")
    
    def play_game(self):
        print("Let's play a game")
        
p1 = Phone()
p1.make_call()
p1.play_game()



#Adding parameters to a class method
class Phone:
    def set_color(self,color):
        self.color=color
        
    def set_cost(self,cost):
        self.cost=cost
        
    def show_color(self):
        return self.color
    
    def show_cost(self):
        return self.cost
    
    def play_game(self):
        print("Playing a game")
        
    def make_call(self):
        print("Making a call")
        
p1=Phone()
p1.set_color("Blue")
p1.set_cost(500)

p1.show_cost()

500

p1.show_color()

'Blue'


p1.play_game()
p1.make_call()

Playing a game
Making a call


#creating a class with constructor

class Employee:
    def __init__(self,name,age,gender,city,salary,pin):
        self.name=name
        self.age=age
        self.gender=gender
        self.city=city
        self.salary=salary
        self.pin=pin
        
    def show_details(self):
        print("Name : {}, Age : {}, Gender: {}, City: {}, Salary: {}, Pin: {}".format(self.name,self.age,self.gender,self.city,self.salary,self.pin))
        
        
e1 = Employee("Sankara",43,'M','Bangalore',67000,560093)
e1.show_details()

Name : Sankara, Age : 43, Gender: M, City: Bangalore, Salary: 67000, Pin: 560093




#Inheritance example:
#Multiple Inheritance
class Parent1:
    def assign_str1(self,str1):
        self.str1 = str1
        
    def show_str1(self):
        print(self.str1)
        
class Parent2:
    def assign_str2(self,str2):
        self.str2 = str2
    
    def show_str2(self):
        print(self.str2)
        
class Child(Parent1, Parent2):
    def assign_str3(self,str3):
        self.str3 = str3
        
    def show_str3(self):
        print(self.str3)

c1 = Child()
c1.assign_str1("I")
c1.assign_str2("Love")
c1.assign_str3("India")

c1.show_str1()

I

c1.show_str2()

Love

c1.show_str3()

India




#Multi Level inheritance

class Parent:
    def assign_name(self,name):
        self.name = name
        
    def show_name(self):
        return self.name
    
class Child(Parent):
    def assign_age(self,age):
        self.age=age
        
    def show_age(self):
        return self.age
    
class GrandChild(Child):
    def assign_gender(self,gender):
        self.gender=gender
        
    def show_gender(self):
        return self.gender

g1 = GrandChild()
g1.assign_name("Anbu")
g1.assign_age(5)
g1.assign_gender("F")

name = g1.show_name()
age = g1.show_age()
gender = g1.show_gender()

print("Name : {}, Age : {}, Gender : {}".format(name,age,gender))

Name : Anbu, Age : 5, Gender : F



Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...