Friday, 24 April 2020

Revisiting Scala - Part I

scala> var num = List(1,2,3,4)
num: List[Int] = List(1, 2, 3, 4)

scala> num.head
res38: Int = 1

scala> num.tail
res39: List[Int] = List(2, 3, 4)

scala> num.sum
res40: Int = 10

scala> val myList = List(1,2,3,3,2,1,1,1,3,3,2,2)
myList: List[Int] = List(1, 2, 3, 3, 2, 1, 1, 1, 3, 3, 2, 2)

scala> val distinctt = myList.distinct
distinctt: List[Int] = List(1, 2, 3)

scala> val distt = myList.toSet
distt: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

scala> val d = distt.toList
d: List[Int] = List(1, 2, 3)

scala> myList.size
res52: Int = 12

scala> myList.length
res53: Int = 12


scala> val num = List(1,2,3,4)
num: List[Int] = List(1, 2, 3, 4)

scala> num(0)
res54: Int = 1

scala> num(2)
res55: Int = 3

scala> num(3)
res56: Int = 4

scala> num.size
res57: Int = 4

scala> num.length
res58: Int = 4

scala> num.reverse
res59: List[Int] = List(4, 3, 2, 1)

scala> num.min
res60: Int = 1

scala> num.sum
res61: Int = 10

scala> num.max
res62: Int = 4

scala> num.isEmpty
res63: Boolean = false


scala> val myL = List()
myL: List[Nothing] = List()

scala> myL.isEmpty
res64: Boolean = true

scala> val myL = List(1,2,3,4)
myL: List[Int] = List(1, 2, 3, 4)

scala> myL.isEmpty
res65: Boolean = false
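
Calling head on an empty list throws a NoSuchElementException; headOption is the safe alternative. A quick sketch (not from the session above):

List[Int]().headOption        // None
List(1, 2, 3).headOption      // Some(1)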


// Array of integers
scala> var num  = Array(1,2,3,4,5)
num: Array[Int] = Array(1, 2, 3, 4, 5)

scala> num(0)=10

scala> num(2)=20

scala> num
res68: Array[Int] = Array(10, 2, 20, 4, 5)


scala> var languages = Array("Scala","Java","Python","JavaScript")
languages: Array[String] = Array(Scala, Java, Python, JavaScript)


scala> languages.head
res70: String = Scala

scala> languages.tail
res71: Array[String] = Array(Java, Python, JavaScript)


scala> languages(0)
res72: String = Scala

scala> languages(1)
res73: String = Java

scala> languages(2)
res74: String = Python

scala> languages(3)
res75: String = JavaScript

// change the Array element values
scala> num
res77: Array[Int] = Array(10, 2, 20, 4, 5)

scala> num(0)=1

scala> num(2)=3

scala> num
res80: Array[Int] = Array(1, 2, 3, 4, 5)


scala> languages
res81: Array[String] = Array(Scala, Java, Python, JavaScript)

scala> languages(0)="C#"

scala> languages(1)="COBOL"

scala> languages(2)="Lisp"

scala> languages
res85: Array[String] = Array(C#, COBOL, Lisp, JavaScript)


An Array has a fixed size: its elements can be updated in place, but the array itself cannot grow or shrink.

An ArrayBuffer (from scala.collection.mutable) is a mutable collection whose size can grow and shrink dynamically.
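
A minimal sketch of the difference (illustrative, not part of the REPL session above):

val fixed = Array(1, 2, 3)
fixed(0) = 99                  // fine: elements are mutable
// fixed += 4                  // does not compile: an Array cannot grow in place
val bigger = fixed :+ 4        // :+ copies into a new Array(99, 2, 3, 4)

import scala.collection.mutable.ArrayBuffer
val buf = ArrayBuffer(1, 2, 3)
buf += 4                       // grows the same buffer: ArrayBuffer(1, 2, 3, 4)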

scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer

scala> var cars = new ArrayBuffer[String]()
cars: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer()

scala> cars += "BMW"
res86: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(BMW)

scala> cars += "Jaguar"
res87: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(BMW, Jaguar)

scala> cars += "Ford"
res88: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(BMW, Jaguar, Ford)

scala> cars.foreach(println)
BMW
Jaguar
Ford

scala> cars-="Ford"
res90: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(BMW, Jaguar)

scala> cars-="BMW"
res91: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(Jaguar)

scala> cars
res92: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(Jaguar)

scala> cars+="Hundai"
res93: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(Jaguar, Hundai)

scala> cars
res94: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(Jaguar, Hundai)



scala> var numbers = new ArrayBuffer[Int]()
numbers: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer()

scala> numbers += 100
res95: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(100)

scala> numbers += 101
res96: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(100, 101)

scala> numbers += 102
res97: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(100, 101, 102)

scala> numbers.length
res98: Int = 3

scala> numbers += 103
res99: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(100, 101, 102, 103)

scala> numbers.length
res100: Int = 4




scala> cars.foreach(println)
Jaguar
Hundai

scala> cars.trimEnd(1)   // trimEnd(1) removes the last element

scala> cars.foreach(print)
Jaguar
scala> cars.trimEnd(1)

scala> cars.foreach(print)



scala> cars.insert(0,"Bentley")

scala> cars.insert(1,"BMW")

scala> cars.insert(2,"Maruthi")

scala> cars.foreach(println)
Bentley
BMW
Maruthi
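
Besides insert and trimEnd, elements can also be removed by position. A small sketch (it would further mutate the cars buffer shown above):

cars.remove(1)        // removes "BMW", leaving ArrayBuffer(Bentley, Maruthi)
cars.remove(0, 2)     // removes 2 elements starting at index 0, leaving an empty buffer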


scala> val num = List(1,2,3,4)
num: List[Int] = List(1, 2, 3, 4)

scala> num.map(x => x+x)
res113: List[Int] = List(2, 4, 6, 8)

scala> num.map(x => x*x)
res114: List[Int] = List(1, 4, 9, 16)

scala> num.map(x => x*x*x)
res115: List[Int] = List(1, 8, 27, 64)


scala> val n = num.map(x => (x,x+1,x+2,x+3))
n: List[(Int, Int, Int, Int)] = List((1,2,3,4), (2,3,4,5), (3,4,5,6), (4,5,6,7))

scala> val n = num.map(x => (x,x*1,x*2,x*3))
n: List[(Int, Int, Int, Int)] = List((1,1,2,3), (2,2,4,6), (3,3,6,9), (4,4,8,12))



scala> val n = num.map(x => x+1)
n: List[Int] = List(2, 3, 4, 5)


scala> val num = List(1,2,3,4)
num: List[Int] = List(1, 2, 3, 4)

scala> num.map(x => x*x).map(x => -x).map(x => x+1)
res117: List[Int] = List(0, -3, -8, -15)
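
Where map produces exactly one output element per input element, flatMap flattens the per-element results into a single list. A quick sketch using the same num list:

num.map(x => List(x, x * 10))        // List(List(1, 10), List(2, 20), List(3, 30), List(4, 40))
num.flatMap(x => List(x, x * 10))    // List(1, 10, 2, 20, 3, 30, 4, 40)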


scala> val fruitList = List("Apple","Orange","Mango","Grapes","Sapotta")
fruitList: List[String] = List(Apple, Orange, Mango, Grapes, Sapotta)

// length of each element
scala> fruitList.map( x=> (x,x.length))
res121: List[(String, Int)] = List((Apple,5), (Orange,6), (Mango,5), (Grapes,6), (Sapotta,7))


scala> fruitList.filter(x => x.length > 5)
res118: List[String] = List(Orange, Grapes, Sapotta)

scala> fruitList.filter(x => x.length <= 5)
res120: List[String] = List(Apple, Mango)


scala> fruitList.map( x=> (x,x.size))
res122: List[(String, Int)] = List((Apple,5), (Orange,6), (Mango,5), (Grapes,6), (Sapotta,7))
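
The two filter calls above can also be done in one pass with partition, which splits a list by a predicate. A sketch (longNames/shortNames are illustrative names):

val (longNames, shortNames) = fruitList.partition(x => x.length > 5)
// longNames:  List(Orange, Grapes, Sapotta)
// shortNames: List(Apple, Mango)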



Rating example:

scala> val rating = List(2.4, 5.6,7.4, 8.9)
rating: List[Double] = List(2.4, 5.6, 7.4, 8.9)

scala> val marks = rating.map (x => x * 10)
marks: List[Double] = List(24.0, 56.0, 74.0, 89.0)

scala> val gradeB = marks.filter (x => x >=60 && x <= 74)
gradeB: List[Double] = List(74.0)

scala> val gradeBRating = gradeB.map (x => x / 10)
gradeBRating: List[Double] = List(7.4)
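
The same result can be written as a single map/filter/map chain; a sketch over the rating list above:

val gradeBRating2 = rating.map(_ * 10).filter(x => x >= 60 && x <= 74).map(_ / 10)
// gradeBRating2: List[Double] = List(7.4)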


nested functions:

scala> object nestedFuncEg{
        def main(arg: Array[String]) {
               
                def square(x: Double): Double ={
                        return x * x
                }
               
                def sumsquares(x: Double, y:Double): Double ={
                        return square(x) + square(y)
                }
               
                println("Sum of Squares :" + sumsquares(2,3))
        }
      }
defined object nestedFuncEg

// run it
scala> nestedFuncEg.main(null)
Sum of Squares :13.0



scala> class MyClass(msg:String) {
        def sayHi = { println (msg) }
      }
defined class MyClass


scala> val ob = new MyClass("Mumbai")
ob: MyClass = MyClass@190cc39e

scala> ob.sayHi
Mumbai


scala> class MyClass(msg:String) {
         def sayHi = { println (msg) }
       }
defined class MyClass


scala> object classEg {
                    def main(arg:Array[String]){
                     var ob1 = new MyClass("Hello Singapore")
                     ob1.sayHi
         
                    var ob2 = new MyClass("Hello London")
                     ob2.sayHi
         
                    var ob3 = new MyClass("Hello Srilanka")
                     ob3.sayHi
                    }
                    }
defined object classEg

scala> classEg.main(null)
Hello Singapore
Hello London
Hello Srilanka

// Call by value: time() is evaluated once, before exec runs
object funcByValEx{
def main(args:Array[String]){

def time(): Long ={
println("Inside Time Function")
return System.nanoTime()
}

def exec(t:Long) : Long = {
println("Inside Exec Function")
println("Time : " + t)
println("Exiting from Exec function")
return t
}

println("Main Function : " + exec(time()))
}
}


scala> funcByValEx.main(null)
Inside Time Function
Inside Exec Function
Time : 34709256789660
Exiting from Exec function
Main Function : 34709256789660



// Same example with the return types and return statements omitted (the compiler infers them)
object funcByValEx{
def main(args:Array[String]){

def time() ={
println("Inside Time Function")
System.nanoTime()
}

def exec(t:Long)  = {
println("Inside Exec Function")
println("Time : " + t)
println("Exiting from Exec function")
t
}

println("Main Function : " + exec(time()))
}
}





//Return statement is optional
scala> object funcByValEx{
        def main(args:Array[String]){
               
                def time() ={
                        println("Inside Time Function")
                        System.nanoTime()
                }
               
                def exec(t:Long)  = {
                        println("Inside Exec Function")
                        println("Time : " + t)
                        println("Exiting from Exec function")
                        t
                }
               
                println("Main Function : " + exec(time))
        }
      }
defined object funcByValEx

scala> funcByValEx.main(null)
Inside Time Function
Inside Exec Function
Time : 34769862864743
Exiting from Exec function
Main Function : 34769862864743


// Same example again, with time declared as a parameterless method (no empty parentheses)
  object funcByValEx{
         def main(args:Array[String]){
               
                 def time  ={
                         println("Inside Time Function")
                         System.nanoTime()
                 }
               
                 def exec(t:Long)  = {
                         println("Inside Exec Function")
                         println("Time : " + t)
                         println("Exiting from Exec function")
                         t
                 }
               
                 println("Main Function : " + exec(time))
         }
       }
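
All three versions above pass the timestamp by value: time is evaluated once, before exec starts. For contrast, here is a hedged sketch (a hypothetical funcByNameEx, not part of the session) using a call-by-name parameter t: => Long, where time runs again at every use of t inside exec:

object funcByNameEx {
  def main(args: Array[String]) {
    def time = {
      println("Inside Time Function")
      System.nanoTime()
    }
    // "=> Long" declares t as call-by-name: the time block runs on each use of t
    def exec(t: => Long): Long = {
      println("Inside Exec Function")
      println("Time : " + t)      // evaluates time here...
      println("Exiting from Exec function")
      t                           // ...and again here, giving a later timestamp
    }
    println("Main Function : " + exec(time))
  }
}
// Expected: "Inside Time Function" prints twice and the two timestamps differ.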



















Friday, 17 April 2020

Spark SQL with Yelp Database

Load the Yelp business dataset (yelp.json) from HDFS:

scala> val biz = spark.read.json("hdfs://localhost:9000/sparkfiles/yelp.json")

scala> biz.printSchema
root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: string (nullable = true)
 |    |-- GoodForKids: string (nullable = true)
 |    |-- GoodForMeal: string (nullable = true)
 |    |-- HairSpecializesIn: string (nullable = true)
 |    |-- HappyHour: string (nullable = true)
 |    |-- HasTV: string (nullable = true)
 |    |-- Music: string (nullable = true)
 |    |-- NoiseLevel: string (nullable = true)
 |    |-- Open24Hours: string (nullable = true)
 |    |-- OutdoorSeating: string (nullable = true)
 |    |-- RestaurantsAttire: string (nullable = true)
 |    |-- RestaurantsCounterService: string (nullable = true)
 |    |-- RestaurantsDelivery: string (nullable = true)
 |    |-- RestaurantsGoodForGroups: string (nullable = true)
 |    |-- RestaurantsPriceRange2: string (nullable = true)
 |    |-- RestaurantsReservations: string (nullable = true)
 |    |-- RestaurantsTableService: string (nullable = true)
 |    |-- RestaurantsTakeOut: string (nullable = true)
 |    |-- Smoking: string (nullable = true)
 |    |-- WheelchairAccessible: string (nullable = true)
 |    |-- WiFi: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- hours: struct (nullable = true)
 |    |-- Friday: string (nullable = true)
 |    |-- Monday: string (nullable = true)
 |    |-- Saturday: string (nullable = true)
 |    |-- Sunday: string (nullable = true)
 |    |-- Thursday: string (nullable = true)
 |    |-- Tuesday: string (nullable = true)
 |    |-- Wednesday: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)


scala>  biz.createOrReplaceTempView("biz")

scala> spark.sql("select count(1) as businesses from biz").show
+----------+                                                                   
|businesses|
+----------+
|    209393|
+----------+

scala> spark.sql("select state, count(1) as businesses from biz group by state").show(10)
+-----+----------+                                                             
|state|businesses|
+-----+----------+
|   AZ|     60803|
|   SC|      1328|
|   OR|         1|
|   VA|         1|
|   QC|     10233|
|   BC|         2|
|   MI|         2|
|   NV|     39084|
|   WI|      5525|
|   CA|        23|
+-----+----------+
only showing top 10 rows


scala> spark.sql("select state, count(1) as businesses from biz group by state order by businesses desc").show(10)
+-----+----------+                                                             
|state|businesses|
+-----+----------+
|   AZ|     60803|
|   NV|     39084|
|   ON|     36627|
|   OH|     16392|
|   NC|     16218|
|   PA|     12376|
|   QC|     10233|
|   AB|      8682|
|   WI|      5525|
|   IL|      2034|
+-----+----------+
only showing top 10 rows
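
The same aggregation can also be expressed with the DataFrame API instead of SQL; a sketch against the same biz DataFrame:

biz.groupBy("state")
   .count()
   .withColumnRenamed("count", "businesses")
   .orderBy($"businesses".desc)
   .show(10)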


scala> spark.sql("select name,stars,review_count, city,state from biz where stars = 5.0").show(10)
+--------------------+-----+------------+----------+-----+
|                name|stars|review_count|      city|state|
+--------------------+-----+------------+----------+-----+
|   Carlos Santo, NMD|  5.0|           4|Scottsdale|   AZ|
|             Felinus|  5.0|           5|  Montreal|   QC|
|Junction Tire & A...|  5.0|          18|      Mesa|   AZ|
|Chinook Landscapi...|  5.0|           3|   Calgary|   AB|
|Dependable Brakes...|  5.0|           5|Pittsburgh|   PA|
|      Desert Storage|  5.0|           5| Henderson|   NV|
|      GK's Vapor Pub|  5.0|           8|  Surprise|   AZ|
|Christ's Church o...|  5.0|           4|    Anthem|   AZ|
|Annette Thomas Ha...|  5.0|           7| Las Vegas|   NV|
|           Local Pet|  5.0|           3|Pittsburgh|   PA|
+--------------------+-----+------------+----------+-----+


scala> spark.sql("select state, sum(review_count) as reviews from biz group by state").show(10)
+-----+-------+                                                               
|state|reviews|
+-----+-------+
|   AZ|2403606|
|   SC|  27406|
|   OR|      9|
|   VA|     27|
|   QC| 204199|
|   BC|      6|
|   MI|     25|
|   NV|2702841|
|   WI| 147463|
|   CA|    363|
+-----+-------+
only showing top 10 rows


scala> spark.sql("select stars, count(1) as businesses from biz group by stars").show(10)
+-----+----------+                                                             
|stars|businesses|
+-----+----------+
|  3.5|     38079|
|  4.5|     29940|
|  2.5|     21435|
|  1.0|      5898|
|  4.0|     39199|
|  3.0|     28634|
|  2.0|     13124|
|  1.5|      6004|
|  5.0|     27080|
+-----+----------+


scala> spark.sql("select state, avg(review_count) as avg_reviews from biz group by state").show(10)
+-----+------------------+                                                     
|state|       avg_reviews|
+-----+------------------+
|   AZ| 39.53104287617387|
|   SC|20.637048192771083|
|   OR|               9.0|
|   VA|              27.0|
|   QC|19.954949672627773|
|   BC|               3.0|
|   MI|              12.5|
|   NV| 69.15466687135401|
|   WI|26.690135746606334|
|   CA|15.782608695652174|
+-----+------------------+
only showing top 10 rows


scala> spark.sql("select state,round(avg(review_count)) as avg_reviews from biz group by state order by avg_reviews desc").show(5)
+-----+-----------+                                                           
|state|avg_reviews|
+-----+-----------+
|   TX|      184.0|
|   NV|       69.0|
|   AR|       59.0|
|   HI|       51.0|
|   AZ|       40.0|
+-----+-----------+

scala> spark.sql("select name,stars,review_count from biz where city='Las Vegas' order by stars desc, review_count desc limit 5").show()
+--------------------+-----+------------+                                     
|                name|stars|review_count|
+--------------------+-----+------------+
|        Brew Tea Bar|  5.0|        1827|
|Paranormal - Mind...|  5.0|         979|
|            Eco-Tint|  5.0|         853|
|      Zenaida's Cafe|  5.0|         717|
|      Carpet Monkeys|  5.0|         688|
+--------------------+-----+------------+

scala> spark.sql("select name,stars,review_count, state,hours from biz where hours.wednesday = '7:0-16:0'  limit 5").show()
+--------------------+-----+------------+-----+--------------------+           
|                name|stars|review_count|state|               hours|
+--------------------+-----+------------+-----+--------------------+
|Nevada House of Hose|  2.5|           3|   NV|[7:0-16:0, 7:0-16...|
|     Karb King Karbs|  1.0|          11|   PA|[7:0-16:0, 7:0-16...|
|Steinberg Diagnos...|  3.0|          96|   NV|[7:0-16:0, 7:0-16...|
|Sonora Quest Labo...|  2.5|          35|   AZ|[7:0-16:0, 0:0-0:...|
|Mj Parker Auto Tr...|  3.0|           9|   AZ|[7:0-16:0, 7:0-16...|
+--------------------+-----+------------+-----+--------------------+


Thursday, 16 April 2020

Spark with Scala - Revisit Part I

scala> val myFirstRDD = sc.parallelize(List("spark","scala","hadoop"))
myFirstRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> myFirstRDD.foreach(println)
spark
scala
hadoop
                                                                               
scala> val txtRDD = sc.textFile("/home/hadoop/a.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /home/hadoop/a.txt MapPartitionsRDD[2] at textFile at <console>:24

scala> txtRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000
003,Arjun,30000
004,Anbu,24350

scala> case class emp (id:Int,name:String,sal:Int)
defined class emp



val rd1 = txtRDD.map { e =>
       val employeeRow = e.split(",")
       val id = employeeRow(0).toInt
       val name = employeeRow(1)
       val salary = employeeRow(2).toInt
       emp(id,name,salary)
       }


scala> rd1.foreach(println)
emp(1,Ravi,10000)
emp(2,Rahul,20000)
emp(3,Arjun,30000)
emp(4,Anbu,24350)

scala> val df = rd1.toDF()
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show
+---+-----+-----+
| id| name|  sal|
+---+-----+-----+
|  1| Ravi|10000|
|  2|Rahul|20000|
|  3|Arjun|30000|
|  4| Anbu|24350|
+---+-----+-----+


scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- sal: integer (nullable = false)

scala> df.createOrReplaceTempView("emp")

scala> sql("select * from emp").show

+---+-----+-----+
| id| name|  sal|
+---+-----+-----+
|  1| Ravi|10000|
|  2|Rahul|20000|
|  3|Arjun|30000|
|  4| Anbu|24350|
+---+-----+-----+
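
Any SQL filter works against the registered view; a small sketch, with the expected rows worked out from the data shown above:

sql("select name, sal from emp where sal > 20000").show
// +-----+-----+
// | name|  sal|
// +-----+-----+
// |Arjun|30000|
// | Anbu|24350|
// +-----+-----+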

scala> txtRDD.take(100)
res9: Array[String] = Array(001,Ravi,10000, 0002,Rahul,20000, 003,Arjun,30000, 004,Anbu,24350)

scala> val newRDD = txtRDD.filter (x => x.contains("Ra"))
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:25

scala> newRDD.foreach(println)
001,Ravi,10000
0002,Rahul,20000

scala> val oneMoreRDD = txtRDD.filter(x => x.contains("Arjun"))
oneMoreRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:25

scala> oneMoreRDD.foreach(println)
003,Arjun,30000


-- filter example ---
scala> val numbers = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val evens = numbers.filter(_%2==0).collect
evens: Array[Int] = Array(2, 4, 6, 8, 10)





--- Union and intersection ----
scala> val l1 = sc.parallelize(1 to 10)
l1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val l2 = sc.parallelize(5 to 15)
l2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> l1.intersection(l2).collect
res15: Array[Int] = Array(6, 7, 9, 8, 10, 5)


scala> l1.union(l2).collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
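
union keeps duplicates (5 to 10 appear twice above); chaining distinct removes them. Sketch:

l1.union(l2).distinct.collect   // the values 1 to 15, though not necessarily in order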



--reduce example--
scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at <console>:24

scala> a.reduce(_+_)
res18: Int = 55


--sum of the elements in a given partition
scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24

scala> b.foreachPartition(x => println(x.reduce(_ + _)))
6
15
24
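
foreachPartition runs on the executors, so the printed order is not guaranteed. To inspect partition contents on the driver, glom gathers each partition into an array; a sketch against the same RDD:

b.glom().collect()     // Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
b.getNumPartitions     // 3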



scala> val product = List((1,"Redmi",30000),(2,"OnePlus",50000),(3,"Oppo",34000))
product: List[(Int, String, Int)] = List((1,Redmi,30000), (2,OnePlus,50000), (3,Oppo,34000))

scala> val df = product.toDF("id","brand","price")
df: org.apache.spark.sql.DataFrame = [id: int, brand: string ... 1 more field]

scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- brand: string (nullable = true)
 |-- price: integer (nullable = false)


scala> df.show
+---+-------+-----+
| id|  brand|price|
+---+-------+-----+
|  1|  Redmi|30000|
|  2|OnePlus|50000|
|  3|   Oppo|34000|
+---+-------+-----+


--read a file from hdfs --

-- copy local file into hadoop
hadoop@hadoop:~/Downloads$ hdfs dfs -copyFromLocal yelp.json /sparkfiles/


scala> val textFile = sc.textFile("hdfs://localhost:9000/sparkfiles/yelp.json")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/sparkfiles/yelp.json MapPartitionsRDD[51] at textFile at <console>:27

scala> textFile.first
res28: String = {"business_id":"f9NumwFMBDn751xgFiRbNA","name":"The Range At Lake Norman","address":"10913 Bailey Rd","city":"Cornelius","state":"NC","postal_code":"28031","latitude":35.4627242,"longitude":-80.8526119,"stars":3.5,"review_count":36,"is_open":1,"attributes":{"BusinessAcceptsCreditCards":"True","BikeParking":"True","GoodForKids":"False","BusinessParking":"{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}","ByAppointmentOnly":"False","RestaurantsPriceRange2":"3"},"categories":"Active Life, Gun\/Rifle Ranges, Guns & Ammo, Shopping","hours":{"Monday":"10:0-18:0","Tuesday":"11:0-20:0","Wednesday":"10:0-18:0","Thursday":"11:0-20:0","Friday":"11:0-20:0","Saturday":"11:0-20:0","Sunday":"13:0-18:0"}}



Create a JSON file (student1.json):


hadoop@hadoop:~$ cat student1.json
{"age": 20,"name": "Sam"},
{"age": 17,"name": "Mick"},
{"age": 18,"name": "Jennet"},
{"age": 19,"name": "Serena"}

hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/student1.json
Deleted /sparkfiles/student1.json

hadoop@hadoop:~$ hdfs dfs -copyFromLocal student1.json /sparkfiles/


scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/student1.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 17|  Mick|
| 18|Jennet|
| 19|Serena|
+---+------+


scala> df.select("name").show
+------+
|  name|
+------+
|   Sam|
|  Mick|
|Jennet|
|Serena|
+------+


scala> df.filter($"age" >= 18).show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 18|Jennet|
| 19|Serena|
+---+------+


scala> df.groupBy("age").count().show
+---+-----+                                                                   
|age|count|
+---+-----+
| 19|    1|
| 17|    1|
| 18|    1|
| 20|    1|
+---+-----+


scala> df.createOrReplaceTempView("student")

scala> spark.sql("select * from student").show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 17|  Mick|
| 18|Jennet|
| 19|Serena|
+---+------+

scala> spark.sql("select * from student where age >= 18").show
+---+------+
|age|  name|
+---+------+
| 20|   Sam|
| 18|Jennet|
| 19|Serena|
+---+------+

Input json:

people.json:
--------------
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

hadoop@hadoop:~$ cat > people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}
^C

hadoop@hadoop:~$ cat people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.json /sparkfiles/

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.json
{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}





scala> val df = spark.read.json("hdfs://localhost:9000/sparkfiles/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]               

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> df.columns
res2: Array[String] = Array(age, name)

scala> df.select("age").show
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+


scala> df.select($"age").show
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+


scala> df.select($"age"+5).show
+---------+
|(age + 5)|
+---------+
|     null|
|       35|
|       24|
+---------+


scala> df.describe()
res8: org.apache.spark.sql.DataFrame = [summary: string, age: string ... 1 more field]

scala> df.describe().show
+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+

Define your own schema:
Applying a user-defined schema while reading JSON:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

val mySchema = StructType(StructField("age",IntegerType)::StructField("name",StringType)::Nil)

val df_with_schema = spark.read.schema(mySchema).json("hdfs://localhost:9000/sparkfiles/people.json")

scala> df_with_schema.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)


scala> df_with_schema.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
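
On Spark 2.3 and later the schema can also be supplied as a DDL string, which avoids building the StructType by hand. A sketch (df2 is an illustrative name), assuming the same file:

val df2 = spark.read
  .schema("age INT, name STRING")
  .json("hdfs://localhost:9000/sparkfiles/people.json")
df2.printSchema   // same schema as df_with_schema above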



scala> df.head(10).foreach(println)
[null,Michael]
[30,Andy]
[19,Justin]

scala> df.withColumn("newAge",$"age").show
+----+-------+------+
| age|   name|newAge|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+


scala> df.withColumn("newAge",$"age"+1).show
+----+-------+------+
| age|   name|newAge|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    31|
|  19| Justin|    20|
+----+-------+------+

Rename a column:
scala> val df1 = df.withColumnRenamed("age","superNewAge")
df1: org.apache.spark.sql.DataFrame = [superNewAge: bigint, name: string]

scala> df1.printSchema
root
 |-- superNewAge: long (nullable = true)
 |-- name: string (nullable = true)


scala> df1.show
+-----------+-------+
|superNewAge|   name|
+-----------+-------+
|       null|Michael|
|         30|   Andy|
|         19| Justin|
+-----------+-------+


scala> df.createOrReplaceTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


text file : people.txt

hadoop@hadoop:~$ cat > people.txt
Michale,29
Justin,19
^C


hadoop@hadoop:~$ cat people.txt
Michale,29
Justin,19


hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/sparkfiles/people.txt
Michale,29
Justin,19

//add one more record
hadoop@hadoop:~$ cat >> people.txt
Andy,30
^C

// delete existing file in hdfs
hadoop@hadoop:~$ hdfs dfs -rm /sparkfiles/people.txt
Deleted /sparkfiles/people.txt

hadoop@hadoop:~$ hdfs dfs -copyFromLocal people.txt /sparkfiles/


// read the file back as an RDD of lines
scala> val lines = sc.textFile("hdfs://localhost:9000/sparkfiles/people.txt")

scala> lines.top(3)
res26: Array[String] = Array(Michale,29, Justin,19, Andy,30)

scala>  case class Person (name:String, age:Int)
defined class Person

scala> val parts = lines.map{ x =>
     |       val l = x.split(",")
     |       val name = l(0)
     |       val age = l(1).toInt
     |       Person(name,age)
     |       }
parts: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[71] at map at <console>:30

scala> parts.foreach(println)
Person(Michale,29)
Person(Justin,19)
Person(Andy,30)

scala> val df = spark.createDataFrame(parts)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df
res28: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> df.show
+-------+---+
|   name|age|
+-------+---+
|Michale| 29|
| Justin| 19|
|   Andy| 30|
+-------+---+


Teenagers:
----------

scala> df.createOrReplaceTempView("people")

scala> val teens = spark.sql("select * from people where age >= 13 and age <= 19")
teens: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> teens.show
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
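
The same teenager filter written with the DataFrame API, as a sketch against the df built from people.txt:

df.filter($"age" >= 13 && $"age" <= 19).show
// +------+---+
// |  name|age|
// +------+---+
// |Justin| 19|
// +------+---+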



Friday, 3 April 2020

How to synchronize a local Git repository with a GitHub remote?

$ ssh-keygen -t rsa -b 4096 -C "tamilnenjam@gmail.com"
Generating public/private rsa key pair.
Enter file in which to save the key (/c/Users/tamil/.ssh/id_rsa):
/c/Users/tamil/.ssh/id_rsa already exists.
Overwrite (y/n)? y
Enter passphrase (empty for no passphrase): something
Enter same passphrase again: something
Your identification has been saved in /c/Users/tamil/.ssh/id_rsa
Your public key has been saved in /c/Users/tamil/.ssh/id_rsa.pub
The key fingerprint is:
SHA256:p1SFwc/Nu+Z5X/aAPJHH8ri28iugWxzjE0TmbI68NCI tamilnenjam@gmail.com
The key's randomart image is:
+---[RSA 4096]----+
|        o..o.    |
|       =  o.     |
|        = .o o   |
|     . = .  ooo  |
|  E . = S . + o. |
|   . o *.* . B.  |
|      ..*.  = o.o|
|      .. .o .oo++|
|      ..   ===o.+|
+----[SHA256]-----+


$  eval $(ssh-agent -s)
Agent pid 144


$ ssh-add ~/.ssh/id_rsa
Enter passphrase for /c/Users/tamil/.ssh/id_rsa:
Identity added: /c/Users/tamil/.ssh/id_rsa (tamilnenjam@gmail.com)


$ clip < ~/.ssh/id_rsa.pub

Paste this key into GitHub under Settings > SSH and GPG keys.


Remote repository: https://github.com/tamilnenjam/awesome.git


echo "# awesome" >> README.md
git init
git add README.md
git commit -m "first commit"
git remote add origin https://github.com/tamilnenjam/awesome.git
git push -u origin master
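
Note that the HTTPS remote above does not actually use the SSH key configured earlier; to push over SSH, the remote would use the SSH URL form instead (a sketch of the equivalent commands):

git remote set-url origin git@github.com:tamilnenjam/awesome.git
git push -u origin master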
