Saturday, 29 December 2018

Spark Notes - Part 1

Transformations:
---------------
map
flatMap
filter
distinct
union
cartesian

Actions:
--------
reduce
collect
take
first
top
countByValue
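
Transformations are lazy (they only build up the lineage); a job runs only when an action is called. reduce, top and countByValue do not appear in the session below, so here is a minimal sketch of them on a made-up RDD (nums is not part of the original session; run in spark-shell):

// hypothetical RDD just to illustrate the remaining actions
val nums = sc.parallelize(List(3, 1, 4, 1, 5, 9, 2, 6))

nums.reduce(_ + _)     // 31 - combines all elements with the given function
nums.top(3)            // Array(9, 6, 5) - the largest elements
nums.countByValue()    // Map(1 -> 2, 3 -> 1, ...) - how often each value occurs, returned to the driver
nums.first()           // 3 - same as take(1)(0)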

// copy the local web log file (Web_log.txt) to HDFS as myweblog.tsv

[cloudera@quickstart ds]$ hdfs dfs -copyFromLocal Web_log.txt /user/cloudera/myweblog.tsv
[cloudera@quickstart ds]$ hdfs dfs -ls myweblog.tsv
-rw-r--r--   1 cloudera cloudera    4147000 2018-12-29 06:15 myweblog.tsv


// load file and make an RDD in spark
scala> val file = sc.textFile("/user/cloudera/myweblog.tsv")
file: org.apache.spark.rdd.RDD[String] = /user/cloudera/myweblog.tsv MapPartitionsRDD[3] at textFile at <console>:27

// total lines
scala> file.count()
res6: Long = 1891710

// show the very first line
scala> file.take(1)
res7: Array[String] = Array(host logname time method url response bytes referer useragent)

scala> val countdownvisits = file.filter( line => line.contains("countdown.html"))
countdownvisits: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:29

scala> countdownvisits.count()
res11: Long = 8586 
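
The same filter-then-count pattern works on any field. A hedged sketch counting requests that returned HTTP 404, assuming the response code is the 6th tab-separated field as in the header line shown above:

// sketch only: guard on the field count in case of short/malformed lines
val notFound = file.filter { line =>
  val parts = line.split("\t")
  parts.length > 5 && parts(5) == "404"
}
notFound.count()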


//error file - file not found
scala> val file = sc.textFile("/user/cloudera/myweblogerrorfile.tsv")
file: org.apache.spark.rdd.RDD[String] = /user/cloudera/myweblogerrorfile.tsv MapPartitionsRDD[6] at textFile at <console>:27

scala> file.count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://quickstart.cloudera:8020/user/cloudera/myweblogerrorfile.tsv
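
textFile() itself is lazy, which is why the bad path is only reported when count() forces the read. To fail early, the path can be checked up front with the Hadoop FileSystem API (a sketch, using the same path as above):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.exists(new Path("/user/cloudera/myweblogerrorfile.tsv"))   // false here, so skip the load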


scala> countdownvisits.persist()
res13: countdownvisits.type = MapPartitionsRDD[4] at filter at <console>:29
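
persist() with no arguments caches at the default MEMORY_ONLY level. An explicit StorageLevel can be passed instead (a sketch; note that Spark refuses to change the level of an RDD that is already persisted, so unpersist() first if switching):

import org.apache.spark.storage.StorageLevel

// spill cached partitions to disk if they don't fit in memory
countdownvisits.persist(StorageLevel.MEMORY_AND_DISK)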

// tab completion: typing "count" and pressing Tab lists the matching names in scope
scala> val ipAddress = count
count             countDistinct     countdownvisits 

// take(1) on the split array: retrieve the 1st column (host)
scala> val ipAddress = countdownvisits.map (line => line.split("\t")).map(parts => parts.take(1))
ipAddress: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:31

// take(2): retrieve the 1st and 2nd columns (host and logname)
scala> val logTime = countdownvisits.map (line => line.split("\t")).map(parts => parts.take(2))
logTime: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:31

scala> ipAddress.collect()
res14: Array[Array[String]] = Array(Array(205.212.115.106), Array(onyx.southwind.net), Array(ix-ftw-tx1-24.ix.netcom.com), Array(janice.cc.wwu.edu), Array(ottgate2.bnr.ca), Array(dialup61.afn.org), Array(129.188.154.200), Array(ppp160.iadfw.net), Array(palona1.cns.hp.com), Array(halon.sybase.com), Array(htlulx.htl-bw.ch), Array(ip16-085.phx.primenet.com), Array(crl9.crl.com), Array(thimble-d229.sierra.net), Array(larryr.cts.com), Array(cad60.cadvision.com), Array(sartre.execpc.com), Array(ppp31.cowan.edu.au), Array(ppp31.cowan.edu.au), Array(gclab034.ins.gu.edu.au), Array(ix-la17-15.ix.netcom.com), Array(ruger-50.slip.uiuc.edu), Array(paradise.iii.net), Array(ppp21.ns.net), Array(ix-ont4-27.ix.netcom.com), Array(prg1.prgone.com), Array(marden.ultranet.com), Array(fractal.mech.tohoku.ac....


scala> logTime.collect()
res15: Array[Array[String]] = Array(Array(205.212.115.106, -), Array(onyx.southwind.net, -), Array(ix-ftw-tx1-24.ix.netcom.com, -), Array(janice.cc.wwu.edu, -), Array(ottgate2.bnr.ca, -), Array(dialup61.afn.org, -), Array(129.188.154.200, -), Array(ppp160.iadfw.net, -), Array(palona1.cns.hp.com, -), Array(halon.sybase.com, -), Array(htlulx.htl-bw.ch, -), Array(ip16-085.phx.primenet.com, -), Array(crl9.crl.com, -), Array(thimble-d229.sierra.net, -), Array(larryr.cts.com, -), Array(cad60.cadvision.com, -), Array(sartre.execpc.com, -), Array(ppp31.cowan.edu.au, -), Array(ppp31.cowan.edu.au, -), Array(gclab034.ins.gu.edu.au, -), Array(ix-la17-15.ix.netcom.com, -), Array(ruger-50.slip.uiuc.edu, -), Array(paradise.iii.net, -), Array(ppp21.ns.net, -), Array(ix-ont4-27.ix.netcom.com, -), Array(...
scala> ipAddress.count()
res16: Long = 8586

scala> logTime.count()
res17: Long = 8586

scala> countdownvisits.unpersist()
res18: countdownvisits.type = MapPartitionsRDD[4] at filter at <console>:29

scala> countdownvisits.take(1)
res19: Array[String] = Array("205.212.115.106 - 804571212 GET /shuttle/countdown/countdown.html 200 3985 ")
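
take(1)/take(2) on the split array keep each row as an Array[String]. If only the host is needed, indexing the split directly gives a plain RDD[String] (a sketch, assuming the host is the first tab-separated field as in the header line):

val hosts = countdownvisits.map(line => line.split("\t")(0))
hosts.take(3)
hosts.distinct().count()   // how many different hosts requested countdown.html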



scala> val lines = sc.parallelize(List("who is the hero?", "Who is zero?","Who is captain?","who is teacher?"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27

// map example
scala> val map1 = lines.map(line => line.split(","))
map1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[15] at map at <console>:29

// produces an Array of Arrays (one single-element inner array per line, since the lines contain no commas)
scala> map1.collect()
res26: Array[Array[String]] = Array(Array(who is the hero?), Array(Who is zero?), Array(Who is captain?), Array(who is teacher?))

// flatMap example
scala> val map2 = lines.flatMap(line => line.split(","))
map2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29


// produces a single flat Array: flatMap flattens each line's result into one collection
scala> map2.collect()
res27: Array[String] = Array(who is the hero?, Who is zero?, Who is captain?, who is teacher?)


scala> map2.collect.foreach(println)
who is the hero?
Who is zero?
Who is captain?
who is teacher?
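
Splitting on "," does not really show the difference here because the strings contain no commas. Splitting on spaces makes the map vs flatMap contrast clearer (a small sketch on the same lines RDD):

// flatMap yields one element per word across all lines, not one array per line
val words = lines.flatMap(line => line.split(" "))
words.count()           // total number of words (13 here)
words.countByValue()    // word frequencies, e.g. Map(is -> 4, who -> 2, Who -> 2, ...)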



scala> val first = sc.parallelize(List("Arulmozhi","Kavya","Sandhya","Amala"))
first: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val second = sc.parallelize(List("Kavya","Mala","Nila","Amala","Ramya"))
second: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> val union1 = first.union(second)
union1: org.apache.spark.rdd.RDD[String] = UnionRDD[19] at union at <console>:31


// union
scala> union1.collect.foreach(println)
Arulmozhi
Kavya
Sandhya
Amala
Kavya
Mala
Nila
Amala
Ramya


scala> union1.collect()
res32: Array[String] = Array(Arulmozhi, Kavya, Sandhya, Amala, Kavya, Mala, Nila, Amala, Ramya)


//intersection
scala> val intersection1 = first.intersection(second)
intersection1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at intersection at <console>:31

scala> intersection1.collect()
res33: Array[String] = Array(Amala, Kavya)

scala> intersection1.foreach(println)
Amala
Kavya


// cross join - cartesian
scala> val cartesion1 = first.cartesian(second)
cartesion1: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[26] at cartesian at <console>:31

scala> cartesion1.collect
res35: Array[(String, String)] = Array((Arulmozhi,Kavya), (Arulmozhi,Mala), (Arulmozhi,Nila), (Arulmozhi,Amala), (Arulmozhi,Ramya), (Kavya,Kavya), (Kavya,Mala), (Kavya,Nila), (Kavya,Amala), (Kavya,Ramya), (Sandhya,Kavya), (Sandhya,Mala), (Sandhya,Nila), (Sandhya,Amala), (Sandhya,Ramya), (Amala,Kavya), (Amala,Mala), (Amala,Nila), (Amala,Amala), (Amala,Ramya))

scala> cartesion1.collect.foreach(println)
(Arulmozhi,Kavya)
(Arulmozhi,Mala)
(Arulmozhi,Nila)
(Arulmozhi,Amala)
(Arulmozhi,Ramya)
(Kavya,Kavya)
(Kavya,Mala)
(Kavya,Nila)
(Kavya,Amala)
(Kavya,Ramya)
(Sandhya,Kavya)
(Sandhya,Mala)
(Sandhya,Nila)
(Sandhya,Amala)
(Sandhya,Ramya)
(Amala,Kavya)
(Amala,Mala)
(Amala,Nila)
(Amala,Amala)
(Amala,Ramya)
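
cartesian grows quickly (|first| x |second| pairs, 4 x 5 = 20 here), so it is usually followed by a filter. A small sketch dropping pairs where both sides are the same name:

val differentPairs = cartesion1.filter { case (a, b) => a != b }
differentPairs.count()   // 18 here, since (Kavya,Kavya) and (Amala,Amala) are dropped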


//subtract
scala> val subtract1 = first.subtract(second)
subtract1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at subtract at <console>:31

scala> subtract1.collect.foreach(println)
Arulmozhi
Sandhya

scala> val distinct1 = first.distinct()
distinct1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[33] at distinct at <console>:29

scala> distinct1.collect
res38: Array[String] = Array(Arulmozhi, Amala, Sandhya, Kavya)



//distinct example 2
scala> val withDuplicates = sc.parallelize(List("a","b","a","c","b","a","b","c","0","1","a"))
withDuplicates: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at parallelize at <console>:27

scala> val distinct2 = withDuplicates.distinct()
distinct2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at distinct at <console>:29

scala> distinct2.collect
res39: Array[String] = Array(a, 0, b, 1, c)

scala> distinct2.collect.foreach(println)
a
0
b
1
c
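
distinct() throws the duplicates away; countByValue (one of the actions listed at the top) instead returns how often each element occurs, as a local Map on the driver. A sketch on the same withDuplicates RDD (counts follow from the list above; key order may differ):

withDuplicates.countByValue()
// Map(a -> 4, b -> 3, c -> 2, 0 -> 1, 1 -> 1)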
