Saturday, 30 May 2020

Call Log Data Extraction using Pyspark

import re

def extract(line):
    pnos = re.search(r'\d{20}', line).group()
    fromno = pnos[0:10]
    tono = pnos[10:20]
    #status = re.search(r'[A-Z]{6,7}', line).group()

    status = re.search(r'SUCCESS|DROPPED|FAILED', line).group()
    timestamps = re.findall(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', line)
    starttime = timestamps[0]
    endtime = timestamps[1]
    return (fromno, tono, status, starttime, endtime)


for logg in inputs:
    print(extract(logg))
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
data = spark.sparkContext.textFile("E:\\vow\\calllogdata.txt")
#data2 = data.map(lambda x:x.encode('utf-8'))

data.first()
'ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463'

data2 = data.map(lambda x:extract(x))
data2.take(1)

[('8052690057',
  '7757919463',
  'DROPPED',
  '2014-03-15 00:02:48',
  '2014-03-15 00:06:05')]
  
  
df = data2.toDF(["fromno","tono","status","starttime","endtime"])
df.printSchema()

root
 |-- fromno: string (nullable = true)
 |-- tono: string (nullable = true)
 |-- status: string (nullable = true)
 |-- starttime: string (nullable = true)  #starttime should be timestamp
 |-- endtime: string (nullable = true)   #endtime should be timestamp
 
from pyspark.sql.functions import col, current_date

#changing the datatype into timestamp, create new column 'day' with current_date
dfCall = df.withColumn("starttime",col("starttime").cast("timestamp")) \
           .withColumn("endtime",col("endtime").cast("timestamp")) \
           .withColumn("day",current_date())
dfCall.printSchema()

#we have changed the datatype of starttime, endtime fields into timestamps
root
 |-- fromno: string (nullable = true)
 |-- tono: string (nullable = true)
 |-- status: string (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- endtime: timestamp (nullable = true)
 |-- day: date (nullable = false)


dfCall.show(5)

+----------+----------+-------+-------------------+-------------------+----------+
|    fromno|      tono| status|          starttime|            endtime|       day|
+----------+----------+-------+-------------------+-------------------+----------+
|8052690057|7757919463|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:05|2020-05-31|
|9886177375|9916790556|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:07|2020-05-31|
|8618627996|9886177375|SUCCESS|2014-03-16 00:02:48|2014-03-16 00:06:45|2020-05-31|
|9876515616|4894949494|DROPPED|2014-03-16 00:02:48|2014-03-16 00:06:53|2020-05-31|
|5454545454|6469496477| FAILED|2014-03-16 00:02:48|2014-03-16 00:06:12|2020-05-31|
+----------+----------+-------+-------------------+-------------------+----------+

dfCall.write.format("orc").mode("append").partitionBy("day").save("e:\\datasets\\callout")
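
A quick sanity check is to read the partitioned output back and filter on the partition column (a minimal sketch, assuming the same path and the imports above):

dfOut = spark.read.format("orc").load("e:\\datasets\\callout")
dfOut.where(col("day") == current_date()).show(5)   #only today's partition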

Call log Regular Expressions in Pyspark

import re

in1 = 'ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:332014-03-15 06:03:42DROPPED 80526900577757919463'
in2 = 'ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:03:482014-03-19 05:02:33DROPPED 57554548979797979797'
in3 = 'ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:04:452014-03-17 04:06:05FAILED  44554584848449644469'
in4 = 'ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:05:482014-03-19 03:02:34SUCCESS 84645645605646064646'
in5 = 'ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:06:282014-03-20 01:06:05SUCCESS 74894086489489489489'
inputs = list((in1,in2,in3,in4,in5))


def extract(line):
    pnos = re.search(r'\d{20}', line).group()
    fromno = pnos[0:10]
    tono = pnos[10:20]
    #status = re.search(r'[A-Z]{6,7}', line).group()

    status = re.search(r'SUCCESS|DROPPED|FAILED', line).group()
    timestamps = re.findall(r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})', line)
    starttime = timestamps[0]
    endtime = timestamps[1]
    return (fromno, tono, status, starttime, endtime)


for logg in inputs:
    print(extract(logg))

Google Adsense Data Analysis using Pyspark DataFrame

adsenseDF = spark.read.format("csv").option("header",True).option("inferSchema",True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)

dfAdsense = adsenseDF.withColumnRenamed("date",'fldDate').withColumnRenamed('Page views','fldPageViews') \
    .withColumnRenamed('Impressions','fldImpressions').withColumnRenamed('Clicks','fldClicks') \
    .withColumnRenamed('Page RPM (USD)','fldPageRPM').withColumnRenamed('Impression RPM (USD)','fldimpressionRPM') \
    .withColumnRenamed("Active View Viewable","fldActiveViewViewable").withColumnRenamed("Estimated earnings (USD)","fldEstimatedEarnings")
dfAdsense.printSchema()

root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)
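
The same renames can also be driven from a dict, which keeps things readable if more columns need renaming later; a sketch over the column names from the schema above:

renames = {"date": "fldDate", "Page views": "fldPageViews", "Impressions": "fldImpressions",
           "Clicks": "fldClicks", "Page RPM (USD)": "fldPageRPM",
           "Impression RPM (USD)": "fldimpressionRPM",
           "Active View Viewable": "fldActiveViewViewable",
           "Estimated earnings (USD)": "fldEstimatedEarnings"}
dfAdsense = adsenseDF
for old, new in renames.items():
    dfAdsense = dfAdsense.withColumnRenamed(old, new)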



#delete the records which are having zero revenue
from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())

1511


#Find the total Earnings 
from pyspark.sql import functions as F 
df.select(F.sum(col("fldEstimatedEarnings")).alias("Estimated Earnings Total")).show()

+------------------------+
|Estimated Earnings Total|
+------------------------+
|      16995.739999999976|
+------------------------+

#Find the top 10 Earnings without Date
from pyspark.sql.functions import col
df.select(col("fldEstimatedEarnings")).sort(col("fldEstimatedEarnings").desc()).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+

#Find the top 10 Earnings without Date
from pyspark.sql.functions import desc
df.select("fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+


#Find the top 10 Earnings with Date

from pyspark.sql.functions import desc
df.select("fldDate","fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-11-28 00:00:00|               54.14|
|2010-12-18 00:00:00|               53.77|
|2011-11-21 00:00:00|               52.73|
|2010-12-19 00:00:00|               52.14|
|2011-07-13 00:00:00|               50.86|
|2011-12-02 00:00:00|               49.63|
|2011-06-24 00:00:00|               48.91|
|2011-09-27 00:00:00|                48.8|
|2011-07-04 00:00:00|               48.58|
+-------------------+--------------------+


#Find the least 10 Earnings without Date
df.select("fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
+--------------------+

#Find the least 10 Earnings with Date
df.select("fldDate","fldEstimatedEarnings").sort(("fldEstimatedEarnings")).show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2008-09-14 00:00:00|                0.01|
|2012-12-22 00:00:00|                0.01|
|2008-08-30 00:00:00|                0.01|
|2009-02-17 00:00:00|                0.01|
|2012-01-28 00:00:00|                0.01|
|2009-03-07 00:00:00|                0.01|
|2012-02-06 00:00:00|                0.01|
|2011-12-13 00:00:00|                0.01|
|2012-01-24 00:00:00|                0.01|
|2008-09-08 00:00:00|                0.01|
+-------------------+--------------------+

#in which dates we got Estimated Earnings >= 40
from pyspark.sql.functions import col
df.select("fldDate","fldEstimatedEarnings").where (col("fldEstimatedEarnings") >= 40).show()

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-07-04 00:00:00|               48.58|
|2011-11-21 00:00:00|               52.73|
|2011-11-28 00:00:00|               54.14|
|2011-07-08 00:00:00|               46.45|
|2011-11-20 00:00:00|               42.98|
|2010-12-18 00:00:00|               53.77|
|2011-11-29 00:00:00|               43.27|
|2011-11-17 00:00:00|               41.15|
|2011-11-22 00:00:00|               43.78|
|2011-11-30 00:00:00|               46.69|
|2011-12-02 00:00:00|               49.63|
|2011-11-15 00:00:00|               41.43|
|2011-09-17 00:00:00|               42.28|
|2011-07-12 00:00:00|               46.85|
|2011-07-11 00:00:00|               43.33|
|2011-07-09 00:00:00|               44.45|
|2011-07-05 00:00:00|               43.08|
|2011-09-18 00:00:00|               43.02|
|2011-06-22 00:00:00|               44.82|
+-------------------+--------------------+

#How many days we got Estimated Earnings >= 40?
df.where(col("fldEstimatedEarnings") >= 40).count()

47

#Earnings >= 10 and <= 50 - count
df.filter( (col("fldEstimatedEarnings") >= 10) & (col("fldEstimatedEarnings") <= 50)).count()

583
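
Column.between is an equivalent, slightly shorter spelling of the same inclusive range filter:

df.filter(col("fldEstimatedEarnings").between(10, 50)).count()
#583 -- same result as the & filter above, since between is inclusive on both ends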
 
 #top 10 page views
from pyspark.sql.functions import desc
df.select("fldDate","fldPageViews").sort(desc("fldPageViews")).show(10)


+-------------------+------------+
|            fldDate|fldPageViews|
+-------------------+------------+
|2010-02-25 00:00:00|       61358|
|2011-07-03 00:00:00|       60066|
|2011-04-03 00:00:00|       59079|
|2011-07-04 00:00:00|       55221|
|2011-11-21 00:00:00|       50808|
|2011-07-08 00:00:00|       49545|
|2011-11-20 00:00:00|       49020|
|2011-11-13 00:00:00|       48613|
|2011-11-28 00:00:00|       48360|
|2011-11-22 00:00:00|       47939|
+-------------------+------------+

#top 10 clicks 
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").sort(desc("fldClicks")).show(10)

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
+-------------------+---------+


# no of clicks >= 800 - Descending order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort(desc("fldClicks")).show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
|2011-11-30 00:00:00|      849|
|2011-07-23 00:00:00|      845|
|2011-12-02 00:00:00|      842|
|2011-11-15 00:00:00|      829|
|2011-11-27 00:00:00|      823|
|2011-07-09 00:00:00|      812|
|2011-07-08 00:00:00|      806|
|2011-07-04 00:00:00|      802|
|2011-11-19 00:00:00|      801|
|2011-11-11 00:00:00|      800|
+-------------------+---------+

#no of clicks >= 800 - Ascending Order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort("fldClicks").show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-11 00:00:00|      800|
|2011-11-19 00:00:00|      801|
|2011-07-04 00:00:00|      802|
|2011-07-08 00:00:00|      806|
|2011-07-09 00:00:00|      812|
|2011-11-27 00:00:00|      823|
|2011-11-15 00:00:00|      829|
|2011-12-02 00:00:00|      842|
|2011-07-23 00:00:00|      845|
|2011-11-30 00:00:00|      849|
|2011-11-13 00:00:00|      873|
|2011-08-06 00:00:00|      878|
|2011-11-12 00:00:00|      912|
|2011-11-17 00:00:00|      917|
|2011-11-16 00:00:00|      920|
|2011-11-18 00:00:00|      924|
|2011-11-29 00:00:00|      968|
|2011-11-28 00:00:00|      996|
|2011-11-25 00:00:00|     1026|
|2011-11-26 00:00:00|     1044|
+-------------------+---------+


 
#How many days we got clicks >= 800
#adsense_r4.filter(lambda x: x.fldClicks >= 800).count()
df.select("fldClicks").where(col("fldClicks") >= 800).count()

20


df.filter(col("fldClicks") >= 800).select(F.sum(col("fldClicks")).alias('Sum')).show()
+-----+
|  Sum|
+-----+
|17667|
+-----+

#Total number of clicks all the day 
df.select(F.sum(col("fldClicks")).alias("All clicks count")).show()

+----------------+
|All clicks count|
+----------------+
|          248190|
+----------------+

#Total number of page views all the day 
df.select(F.sum(col("fldPageViews")).alias("Total Page views")).show()

+----------------+
|Total Page views|
+----------------+
|        18626239|
+----------------+

#Total days when page views >= 50000
df.where(col("fldPageViews") >= 50000).count()
#answer : 5

#Total days when page views >= 10000
df.where(col("fldPageViews") >= 10000).count()
#answer : 649



from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
adsenseDF = spark.read.format("csv").option("header",True).option("inferSchema",True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)
 
 
dfAdsense = adsenseDF.withColumnRenamed("date",'fldDate').withColumnRenamed('Page views','fldPageViews') \
    .withColumnRenamed('Impressions','fldImpressions').withColumnRenamed('Clicks','fldClicks') \
    .withColumnRenamed('Page RPM (USD)','fldPageRPM').withColumnRenamed('Impression RPM (USD)','fldimpressionRPM') \
    .withColumnRenamed("Active View Viewable","fldActiveViewViewable").withColumnRenamed("Estimated earnings (USD)","fldEstimatedEarnings")
dfAdsense.printSchema()

from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())



root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)

1511

from pyspark.sql.functions import hour, mean, year,month,dayofmonth
import pyspark.sql.functions as F

#df.select(year("fldDate"),month("fldDate")).show(5)
#f.select(year("fldDate").alias('year'), month("fldDate").alias('month'), dayofmonth("fldDate").alias('day')).show()
#df.groupBy(year("fldDate").alias("Year")).sum("fldEstimatedEarnings").orderBy("Year").show()

df.groupBy(year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year").show()


+----+--------+
|Year|Earnings|
+----+--------+
|2008|  374.73|
|2009| 1640.68|
|2010| 2883.28|
|2011| 9392.38|
|2012| 2702.84|
|2013|    1.28|
|2014|    0.03|
|2015|    0.21|
|2016|    0.25|
|2017|    0.06|
+----+--------+



from pyspark.sql.functions import hour, mean, year,month,dayofmonth
import pyspark.sql.functions as F
df_re1 = df.groupBy(month('fldDate').alias("Month"),year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year","Month")
df_re1.show(100)

+-----+----+--------+
|Month|Year|Earnings|
+-----+----+--------+
|    6|2008|   35.76|
|    7|2008|   82.05|
|    8|2008|   27.17|
|    9|2008|   21.12|
|   10|2008|   65.87|
|   11|2008|   78.38|
|   12|2008|   64.38|
|    1|2009|   130.1|
|    2|2009|   26.69|
|    3|2009|   59.68|
|    4|2009|   94.23|
|    5|2009|  186.15|
|    6|2009|  248.88|
|    7|2009|  190.95|
|    8|2009|    81.7|
|    9|2009|   60.99|
|   10|2009|   206.2|
|   11|2009|  198.68|
|   12|2009|  156.43|
|    1|2010|  101.46|
|    2|2010|  316.61|
|    3|2010|  179.72|
|    4|2010|  142.26|
|    5|2010|  120.05|
|    6|2010|  132.18|
|    7|2010|  134.75|
|    8|2010|  137.77|
|    9|2010|  265.05|
|   10|2010|  267.31|
|   11|2010|   372.2|
|   12|2010|  713.92|
|    1|2011|  478.87|
|    2|2011|  551.54|
|    3|2011|   549.9|
|    4|2011|  683.07|
|    5|2011|  620.94|
|    6|2011|  969.76|
|    7|2011| 1149.31|
|    8|2011| 1057.64|
|    9|2011| 1039.34|
|   10|2011|  966.37|
|   11|2011| 1138.69|
|   12|2011|  186.95|
|    1|2012|    0.48|
|    2|2012|  256.25|
|    3|2012|  564.13|
|    4|2012|  524.13|
|    5|2012|  504.83|
|    6|2012|  370.12|
|    7|2012|   261.1|
|    8|2012|  219.39|
|    9|2012|    2.38|
|   10|2012|    0.02|
|   12|2012|    0.01|
|    1|2013|    0.01|
|    3|2013|    0.25|
|    4|2013|    0.11|
|    5|2013|    0.09|
|    6|2013|    0.25|
|    7|2013|    0.03|
|    9|2013|    0.09|
|   10|2013|    0.32|
|   12|2013|    0.13|
|    2|2014|    0.03|
|    2|2015|    0.08|
|    4|2015|    0.03|
|    7|2015|     0.1|
|    2|2016|    0.01|
|    3|2016|     0.2|
|    9|2016|    0.03|
|   11|2016|    0.01|
|    6|2017|    0.06|
+-----+----+--------+


df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"),2)).orderBy("Year").show()

+----+-----------------------+
|Year|round(sum(Earnings), 2)|
+----+-----------------------+
|2008|                 374.73|
|2009|                1640.68|
|2010|                2883.28|
|2011|                9392.38|
|2012|                2702.84|
|2013|                   1.28|
|2014|                   0.03|
|2015|                   0.21|
|2016|                   0.25|
|2017|                   0.06|
+----+-----------------------+
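
The auto-generated column name round(sum(Earnings), 2) can be aliased so the output matches the earlier yearly summary; a minor variation:

df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"),2).alias("Earnings")).orderBy("Year").show()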

Friday, 29 May 2020

Google Adsense Data Analysis using Pyspark RDDs

import collections
from datetime import datetime 

Adsense = collections.namedtuple('Adsense',['fldDate','fldPageViews','fldImpressions','fldClicks','fldPageRPM','fldImpressionRPM','fldActiveViewViewable','fldEstimatedEarnings']) 

def parseAdsense(_row):
    fields = _row.split(",")
    _date = fields[0]
    _pageViews = (int) (fields[1])
    _impressions = (int) (fields[2])
    _clicks = (int) (fields[3])
    _pageRPM = fields[4]
    _impressionRPM = fields[5]
    _activeviewviewable = fields[6]
    _estimatedEarnings = (float) (fields[7])
    _adsense = Adsense(_date,_pageViews,_impressions,_clicks,_pageRPM,_impressionRPM,_activeviewviewable,_estimatedEarnings)
    return _adsense

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
adsense_r1 = spark.sparkContext.textFile("E:\\DataSets\\adsense_data.csv")
#print(adsense_r1.count())
#for j in adsense_r1.collect():
#    print(j)
headerInfo  = adsense_r1.first()
#print(headerInfo)

adsense_r2 = adsense_r1.filter(lambda x: headerInfo not in x)
#print(adsense_2.first())
#print(adsense_2.count())
#for j in adsense_r2.collect():
#    print(j)
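
Note: headerInfo not in x drops any line that merely contains the header text; comparing for equality is a slightly stricter variant of the same idea:

adsense_r2 = adsense_r1.filter(lambda x: x != headerInfo)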


adsense_r3 = adsense_r2.map(lambda x:(str)(x)).map(parseAdsense)
print(adsense_r3.count())

adsense_r4 = adsense_r3.filter(lambda x: x.fldEstimatedEarnings != 0)
print(adsense_r4.count())


2452
1511

#Find the total Earnings 
totalEarnings = adsense_r4.map(lambda x: x.fldEstimatedEarnings).reduce(lambda x,y : x+y)

print(totalEarnings) #16995.739999999994


#Find the top 10 Earnings without Date
adsense_r4.map(lambda x: x.fldEstimatedEarnings).sortBy(lambda x: x, False).take(10)

[63.71, 54.14, 53.77, 52.73, 52.14, 50.86, 49.63, 48.91, 48.8, 48.58]

#Find the top 10 Earnings with Date
adsense_r4.map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).take(10)

[('2010-02-25', 63.71),
 ('2011-11-28', 54.14),
 ('2010-12-18', 53.77),
 ('2011-11-21', 52.73),
 ('2010-12-19', 52.14),
 ('2011-07-13', 50.86),
 ('2011-12-02', 49.63),
 ('2011-06-24', 48.91),
 ('2011-09-27', 48.8),
 ('2011-07-04', 48.58)]
 
 
 #Find the least 10 Earnings without Date
adsense_r4.map(lambda x: x.fldEstimatedEarnings).sortBy(lambda x: x, True).take(10)

[0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01]


#Find the least 10 Earnings with Date
adsense_r4.map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], True).take(10)

[('2009-02-09', 0.01),
 ('2009-02-17', 0.01),
 ('2011-12-10', 0.01),
 ('2009-03-07', 0.01),
 ('2011-12-13', 0.01),
 ('2008-09-11', 0.01),
 ('2008-09-08', 0.01),
 ('2008-09-03', 0.01),
 ('2008-09-05', 0.01),
 ('2011-12-06', 0.01)]

#in which dates we got Estimated Earnings >= 40
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 40).map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).collect()

[('2010-02-25', 63.71),
 ('2011-11-28', 54.14),
 ('2010-12-18', 53.77),
 ('2011-11-21', 52.73),
 ('2010-12-19', 52.14),
 ('2011-07-13', 50.86),
 ('2011-12-02', 49.63),
 ('2011-06-24', 48.91),
 ('2011-09-27', 48.8),
 ('2011-07-04', 48.58),
 ('2011-06-23', 47.93),
 ('2011-07-12', 46.85),
 ('2011-11-30', 46.69),
 ('2011-07-08', 46.45),
 ('2010-12-14', 45.68),
 ('2011-06-26', 45.5),
 ('2011-10-19', 45.22),
 ('2011-06-30', 45.02),
 ('2011-06-22', 44.82),
 ('2011-06-25', 44.58),
 ('2011-07-16', 44.56),
 ('2011-07-09', 44.45),
 ('2011-11-23', 44.23),
 ('2011-07-27', 44.0),
 ('2011-11-22', 43.78),
 ('2011-09-21', 43.59),
 ('2011-11-24', 43.48),
 ('2011-07-11', 43.33),
 ('2011-11-29', 43.27),
 ('2011-07-05', 43.08),
 ('2011-09-18', 43.02),
 ('2011-11-20', 42.98),
 ('2011-09-13', 42.29),
 ('2011-09-17', 42.28),
 ('2011-08-09', 42.17),
 ('2011-10-27', 42.08),
 ('2011-09-20', 41.96),
 ('2011-07-23', 41.6),
 ('2011-09-23', 41.5),
 ('2011-11-15', 41.43),
 ('2011-10-18', 41.4),
 ('2011-07-14', 41.36),
 ('2011-07-10', 41.21),
 ('2011-11-17', 41.15),
 ('2011-09-19', 40.69),
 ('2011-11-26', 40.49),
 ('2011-08-05', 40.05)]
 
 
 How many days we got Estimated Earnings >= 40?
 
 adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 40).count()


47


#Earnings >= 10 and <= 50
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 10 and x.fldEstimatedEarnings <= 50).map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).collect()


#Earnings >= 10 and <= 50 - count
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 10 and x.fldEstimatedEarnings <= 50).count()

583



#top 10 page views 

adsense_r4.map(lambda x: (x.fldDate, x.fldPageViews)).sortBy(lambda x:x[1],False).take(10)
[('2010-02-25', 61358),
 ('2011-07-03', 60066),
 ('2011-04-03', 59079),
 ('2011-07-04', 55221),
 ('2011-11-21', 50808),
 ('2011-07-08', 49545),
 ('2011-11-20', 49020),
 ('2011-11-13', 48613),
 ('2011-11-28', 48360),
 ('2011-11-22', 47939)]
 
 #top 10 clicks 
 adsense_r4.map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x:x[1],False).take(10)
 
 [('2011-11-26', 1044),
 ('2011-11-25', 1026),
 ('2011-11-28', 996),
 ('2011-11-29', 968),
 ('2011-11-18', 924),
 ('2011-11-16', 920),
 ('2011-11-17', 917),
 ('2011-11-12', 912),
 ('2011-08-06', 878),
 ('2011-11-13', 873)]
 
 # no of clicks >= 800 - Descending order
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x: x[1],False).collect()
 
[('2011-11-26', 1044),
 ('2011-11-25', 1026),
 ('2011-11-28', 996),
 ('2011-11-29', 968),
 ('2011-11-18', 924),
 ('2011-11-16', 920),
 ('2011-11-17', 917),
 ('2011-11-12', 912),
 ('2011-08-06', 878),
 ('2011-11-13', 873),
 ('2011-11-30', 849),
 ('2011-07-23', 845),
 ('2011-12-02', 842),
 ('2011-11-15', 829),
 ('2011-11-27', 823),
 ('2011-07-09', 812),
 ('2011-07-08', 806),
 ('2011-07-04', 802),
 ('2011-11-19', 801),
 ('2011-11-11', 800)]
 
 #no of clicks >= 800 - Ascending Order
 
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x: x[1],True).collect()
 
 [('2011-11-11', 800),
 ('2011-11-19', 801),
 ('2011-07-04', 802),
 ('2011-07-08', 806),
 ('2011-07-09', 812),
 ('2011-11-27', 823),
 ('2011-11-15', 829),
 ('2011-12-02', 842),
 ('2011-07-23', 845),
 ('2011-11-30', 849),
 ('2011-11-13', 873),
 ('2011-08-06', 878),
 ('2011-11-12', 912),
 ('2011-11-17', 917),
 ('2011-11-16', 920),
 ('2011-11-18', 924),
 ('2011-11-29', 968),
 ('2011-11-28', 996),
 ('2011-11-25', 1026),
 ('2011-11-26', 1044)]
 
 #How many days we got clicks >= 800
 adsense_r4.filter(lambda x: x.fldClicks >= 800).count()
 
 20
 
 #Total number of clicks where individual day's click >= 800
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: x.fldClicks).reduce(lambda a,b:a+b)
 17667

#Total number of clicks all the day 
adsense_r4.map(lambda x: x.fldClicks).reduce(lambda m,n: m+n)

248190



#Total number of page views all the day 
adsense_r4.map(lambda x: x.fldPageViews).reduce(lambda m,n: m+n)

18626239

#Total days when page views >= 50000
adsense_r4.filter(lambda x: x.fldPageViews >= 50000).count()
5

#Total days when page views >= 10000
adsense_r4.filter(lambda x: x.fldPageViews >= 10000).count()

649

Wednesday, 27 May 2020

Repartition vs Coalesce


Repartition - shuffling involved, so lower performance
    - can increase or decrease the number of partitions

Coalesce - increasing the number of partitions is not allowed
    - can only decrease the number of partitions

e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
print(e1.getNumPartitions()) #2

e2 = e1.repartition(5)
print(e2.getNumPartitions()) #5

e3 = e1.repartition(2)
print(e3.getNumPartitions()) #2

c1 = e1.coalesce(2)
print(c1.getNumPartitions()) #2

c2 = e1.coalesce(10)
print(c2.getNumPartitions()) #2
 

2
5
2
2
2


e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
football = e1.repartition(4).filter(lambda x: 'Football' in x)
wrestling = e1.repartition(5).filter(lambda x: 'Wrestling' in x)
weightlifting = e1.repartition(6).filter(lambda x: 'Weightlifting' in x)

print(e1.getNumPartitions()) #2
print(football.getNumPartitions()) #4
print(wrestling.getNumPartitions()) #5
print(weightlifting.getNumPartitions()) #6

2
4
5
6

#Repartition with Union
unionResult = football.union(wrestling).union(weightlifting)
print(unionResult.getNumPartitions()) #15

#Repartition with Intersection
intersectionResult = football.intersection(wrestling).intersection(weightlifting)
print(intersectionResult.getNumPartitions()) #15


e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
football = e1.repartition(4).filter(lambda x: 'Football' in x)
wrestling = e1.repartition(5).filter(lambda x: 'Wrestling' in x)
weightlifting = e1.repartition(6).filter(lambda x: 'Weightlifting' in x)

subt = e1.subtract(football)
print(subt.getNumPartitions()) #6

cart = football.cartesian(wrestling)
print(cart.getNumPartitions()) #20





mapValues vs flatMapValues in Pyspark

d1 = spark.sparkContext.parallelize( [('a',(20,30,40,50)), ('b',(1,2,3)) ])
print(d1.count())  #2

print(d1.flatMapValues(lambda x:x).collect()) 
[('a', 20), ('a', 30), ('a', 40), ('a', 50), ('b', 1), ('b', 2), ('b', 3)]


print(d1.map(lambda x:( x[0], len(x[1]))).collect())
[('a', 4), ('b', 3)]


print(d1.mapValues(lambda x:len(x)).collect())
[('a', 4), ('b', 3)]


keyBy Example:

d1 = spark.sparkContext.parallelize([(101,'Vijay','BTech'),(102,'Balaji','Bsc'), (103,'Arun')] )
print(d1.collect())

#[(101, 'Vijay', 'BTech'), (102, 'Balaji', 'Bsc'), (103, 'Arun')]

d2 = d1.keyBy(lambda x:x[0])
print(d2.collect())

#[(101, (101, 'Vijay', 'BTech')), (102, (102, 'Balaji', 'Bsc')), (103, (103, 'Arun'))]


Join operation in RDD - Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',10),('b',5),('c',15),('d',12),('a',10),('b',30)])
r2 = spark.sparkContext.parallelize([ ('a',50),('b',15),('c',10),('d',15),('e',12),('c',10),('a',30)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('b', (5, 15)), ('b', (30, 15)), ('c', (15, 10)), ('c', (15, 10)), ('d', (12, 15))] 







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('b',5),('c',4)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('b', (5, 7)), ('c', (4, 7))]







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('a','6'),('b',5),('b',8),('c',4),('c',9)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('a', ('6', 2)), ('b', (5, 7)), ('b', (8, 7)), ('c', (4, 7)), ('c', (9, 7))]






from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('a',2),('a',3),('b',5),('b',8),('b',9),('c',4),('c',9),('c',3)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())


[('a', (1, 2)), ('a', (2, 2)), ('a', (3, 2)), 
('b', (5, 7)), ('b', (8, 7)), ('b', (9, 7)), 
('c', (4, 7)), ('c', (9, 7)), ('c', (3, 7))]
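
join keeps only the keys that appear on both sides. For comparison, a small leftOuterJoin sketch (with made-up RDDs) shows unmatched left keys coming back paired with None:

x = spark.sparkContext.parallelize([('a',1),('b',2),('e',9)])
y = spark.sparkContext.parallelize([('a',5),('b',6)])
print(x.leftOuterJoin(y).collect())

#e.g. [('a', (1, 5)), ('b', (2, 6)), ('e', (9, None))]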

Aggregation operations with Pair RDDs in Pyspark

Pair RDD Aggregations:

myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())

def f1(x) : yield list(x)
    
r2 = r1.mapPartitions(lambda x:f1(x)) 
print(r2.collect())

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]


re1 = r1.countByKey()
print(re1)


defaultdict(<class 'int'>, {'a': 3, 'b': 2, 'c': 2, 'd': 2})


re2 = r1.countByValue()
print(re2)

defaultdict(<class 'int'>, {('a', 10): 2, ('b', 20): 1, ('c', 15): 1, ('d', 25): 1, ('a', 30): 1, ('b', 26): 1, ('c', 10): 1, ('d', 10): 1})


re3 = r1.sortByKey()
print(re3.collect())

[('a', 10), ('a', 30), ('a', 10), ('b', 20), ('b', 26), ('c', 15), ('c', 10), ('d', 25), ('d', 10)]


re4  = r1.reduceByKey(lambda x,y : x+y)
print(re4.collect())

[('d', 35), ('b', 46), ('a', 50), ('c', 25)]




myList = [1,2,3,4,5,6,7,8]
r = spark.sparkContext.parallelize(myList,3)
re1 = r.count()
print(re1)

8


myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
re4  = r1.reduceByKey(lambda x,y : x+y)
print(re4.sortByKey().collect())

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]


myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())

def f1(x) : yield list(x)
    
r2 = r1.mapPartitions(lambda x:f1(x)) 
print(r2.collect())

re1 = r1.reduceByKey(lambda x,y : x+y)
print(re1.collect())

re2 = re1.sortByKey().collect()
print(re2)

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]


re3 = r1.foldByKey(2,lambda x,y : x+y)
print(re3.sortByKey().collect())

[('a', 56), ('b', 50), ('c', 29), ('d', 39)]
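
When a per-key aggregation needs more than a running total, aggregateByKey takes a zero value and two functions; a sketch computing the average value per key on the same r1:

re5 = r1.aggregateByKey((0, 0),
                        lambda acc, v: (acc[0] + v, acc[1] + 1),   #within a partition: running (sum, count)
                        lambda a, b: (a[0] + b[0], a[1] + b[1]))   #merging partition results
print(re5.mapValues(lambda sc: round(sc[0] / sc[1], 2)).sortByKey().collect())

#[('a', 16.67), ('b', 23.0), ('c', 12.5), ('d', 17.5)]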


Aggregate operations with RDDs in Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("aggregations").getOrCreate()

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList,3)

print(r1.getNumPartitions())
#answer : 3

def f1(x): yield list(x)

def f2(x) : yield sum(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]


r3 = r1.mapPartitions(lambda x: f2(x))
print(r3.collect())
#answer : [6, 15, 45]


r4 = r1.coalesce(1)
print(r4.getNumPartitions())
#answer : 1

r4.reduce(lambda x,y : x+y)
#answer : 66

r1.reduce(lambda a,b: a-b)
#answer : 34




#find Min and Max of a given list

def findMax(x,y):
    if x > y:
        return x
    else:
        return y

def findMin(x,y):
    if x < y:
        return x
    else:
        return y

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList)
a3 = r1.reduce(lambda x,y : findMax(x,y))
print(a3)
#answer : 11

a4 = r1.reduce(lambda x,y : findMin(x,y))
print(a4)
#answer : 1


Fold example:

myList = [5,10]
r1 = spark.sparkContext.parallelize(myList)

a1 = r1.reduce(lambda x,y : x+y)
print(a1)

a3 = r1.fold(5,lambda x,y : x+y) #the zero value 5 is applied once per partition and once more when combining, so the result depends on the number of partitions
print(a3)

a4 = r1.fold(0,lambda x,y : x+y) #with zero value 0, fold behaves like reduce: 5+10 = 15
print(a4)
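
Because the zero value is applied once per partition and once more in the final combine, pinning the partition count makes the arithmetic reproducible; a minimal sketch:

r_one = spark.sparkContext.parallelize([5,10], 1)   #one partition: 5+5+10 = 20, then 5+20 = 25
print(r_one.fold(5, lambda x, y: x + y))            #25

r_two = spark.sparkContext.parallelize([5,10], 2)   #two partitions: 5+5 and 5+10, then 5+10+15 = 30
print(r_two.fold(5, lambda x, y: x + y))            #30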



myList = ["Arun","Vijay","Kalai","Mani","Nila","Raji","Waran","Bill"]
r1 = spark.sparkContext.parallelize(myList,3)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer: [['Arun', 'Vijay'], ['Kalai', 'Mani'], ['Nila', 'Raji', 'Waran', 'Bill']]



myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(5000,lambda x,y:findMin(x,y))
print(rd3)

[[20000, 12000, 30000], [25000, 42000, 10000]]
5000

myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(42005,lambda x,y:findMin(x,y))
print(rd3)

[[20000, 12000, 30000], [25000, 42000, 10000]]
10000


myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,4)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

[1, 2, 3, 4, 5, 6, 7, 8]
[[1, 2], [3, 4], [5, 6], [7, 8]]


c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

-42

#each partition is summed starting from the zero value 2: 2+1+2=5, 2+3+4=9, 2+5+6=13, 2+7+8=17
#the partition results are then combined with the second function: 2-5-9-13-17 = -42





myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
[1, 2, 3, 4, 5, 6, 7, 8]
[[1, 2], [3, 4], [5, 6, 7, 8]]

c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

-40

#partition sums with zero value 2: 2+1+2=5, 2+3+4=9, 2+5+6+7+8=28; combined: 2-5-9-28 = -40



reduce(f1)
    f1 is applied within each partition, then on the partition results
fold(zv,f1)
    f1 is applied within each partition and on the partition results, with the zero value zv added at each step
aggregate(zv,f1,f2)
    f1 is applied within each partition starting from zv, and
    f2 is applied on the partition results, again starting from zv
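
A worked aggregate with explicit partitions makes the two functions concrete; a sketch that computes an average by carrying a (sum, count) pair:

r = spark.sparkContext.parallelize([1,2,3,4], 2)    #partitions: [1, 2] and [3, 4]
total, count = r.aggregate((0, 0),
                           (lambda acc, v: (acc[0] + v, acc[1] + 1)),   #f1: within each partition
                           (lambda a, b: (a[0] + b[0], a[1] + b[1])))   #f2: across partition results
print(total / count)    #2.5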


Tuesday, 26 May 2020

groupBy Example in Pyspark


from pyspark.sql.functions import count
df = spark.read.format("csv").option("header",True).\
            option("inferSchema",True).\
            load("E:\\vow\\CancerData10.csv")

#df.printSchema()
#df.show(5)

df1 = df.select("ID","Age","State","Sex")

df2 = df1.groupBy("Sex").agg(count('Sex').alias("Count"))

df2.show()

+---+-----+
|Sex|Count|
+---+-----+
|  F|    4|
|  M|    5|
+---+-----+

Call Log data program using Pyspark

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
r1 = spark.sparkContext.textFile("E:\\vow\\calllogdata.txt")
#r2 = r1.map(lambda x:x.encode('utf-8'))
r1.persist(StorageLevel.MEMORY_ONLY)
r3 = r1.filter(lambda x: 'SUCCESS' in x)
print(r3.count())
r4 = r1.filter(lambda x: 'FAILED' in x)
print(r4.count())

21
6

Word Count program using Pyspark

#Word count program using Pyspark
r1 = spark.sparkContext.textFile("E:\\vow\\email.txt")
#r2 = r1.map(lambda x:x.encode('utf-8'))
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
for j in r2.collect():
    print(j)
r2.coalesce(1).saveAsTextFile("E:\\vow\\emailwordcount.txt")

('acebook', 1)
('Session', 1)
('Developers', 2)
('', 10)
('The', 1)
('world', 1)
('is', 3)
..... 

Most Viewed, Most Rated Movies Analysis using Pyspark

Movie Data Analysis using Pyspark

movies.dat:
E:\\vow\\ml-1m\\movies.dat
MovieID,Title,Genres
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
ratings.dat:
E:\\vow\\ml-1m\\ratings.dat
UserID,MovieID,Rating,Timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
users.dat:
E:\\vow\\ml-1m\\users.dat
UserID,Gender,Age,Occupation,ZipCode
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
Linux location:
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/movies.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/ratings.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/users.dat
Requirement:
Top 10 most viewed movies with their movie names (Z-A, A-Z)
Top 20 rated movies (The movie should be rated / viewed by at least 40 users)
How are the Genres ranked by Average rating for each profession and age group?

import collections
from datetime import datetime

Movie = collections.namedtuple('Movie',['MovieID','Title','Genres']) 

def parseMovie(_row):
    fields = _row.split("::")
    movieid = (int) (fields[0])
    title = fields[1]
    genres = fields[2]
    M = Movie(movieid,title,genres)
    return M

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
movies_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/movies.dat")
movies_r2 = movies_r1.map(lambda x:(str)(x)).map(parseMovie)
#movies_r2.take(2)

#[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
#Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
 
 


Rating = collections.namedtuple('Rating',['UserID','MovieID','Rating','Timestamp']) 

def parseRatingRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    movieid = (int) (fields[1])
    rating = (int) (fields[2])
    _timestamp = datetime.fromtimestamp((int) (fields[3]))
    _rating = Rating(userid,movieid,rating,_timestamp)
    return _rating

#print(parseRatingRecord("1::1193::5::978300760"))

ratings_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/ratings.dat")
ratings_r2 = ratings_r1.map(lambda x:(str)(x)).map(parseRatingRecord)
#ratings_r2.take(2)


User = collections.namedtuple("Users",["UserID","Gender","Age","Occupation","ZipCode"])

def parseUserRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    gender = fields[1]
    age = (int) (fields[2])
    occupation = (int) (fields[3])
    zipcode = (int) (fields[4])
    _user = User(userid,gender,age,occupation,zipcode)
    return _user

#print(parseUserRecord("3::M::25::15::55117"))            

users_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/users.dat")
users_r2 = users_r1.map(lambda x:(str)(x)).map(parseUserRecord)
#users_r2.take(2)

#[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
# Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
 
 
dfMovies  = movies_r2.toDF()
dfRatings = ratings_r2.toDF()
dfUsers = users_r2.toDF()
 
dfMovies.printSchema()
dfRatings.printSchema()
dfUsers.printSchema()

dfMovies.printSchema()
root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 
 
dfRatings.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 
 
dfUsers.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: long (nullable = true)
 |-- ZipCode: long (nullable = true)
 
dfMovies.show(5)
+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+

dfRatings.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|          Timestamp|
+------+-------+------+-------------------+
|     1|   1193|     5|2001-01-01 03:42:40|
|     1|    661|     3|2001-01-01 04:05:09|
|     1|    914|     3|2001-01-01 04:02:48|
|     1|   3408|     4|2001-01-01 03:34:35|
|     1|   2355|     5|2001-01-07 05:08:11|
+------+-------+------+-------------------+

dfUsers.show(5)
+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|   2460|
|     5|     M| 25|        20|  55455|
+------+------+---+----------+-------+



#Joining Movies and Ratings tables

from pyspark.sql.functions import col

dfMovies_Ratings = dfMovies.join(dfRatings,dfMovies["MovieID"] == dfRatings["MovieID"],"inner")
dfMovies_Ratings.printSchema()
dfMovies_Ratings.show(5)

root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)

+-------+--------------+------+------+-------+------+-------------------+
|MovieID|         Title|Genres|UserID|MovieID|Rating|          Timestamp|
+-------+--------------+------+------+-------+------+-------------------+
|     26|Othello (1995)| Drama|    18|     26|     4|2000-12-30 11:52:15|
|     26|Othello (1995)| Drama|    69|     26|     4|2000-12-27 07:24:10|
|     26|Othello (1995)| Drama|   229|     26|     4|2002-12-10 12:29:33|
|     26|Othello (1995)| Drama|   342|     26|     4|2000-12-09 10:41:17|
|     26|Othello (1995)| Drama|   524|     26|     3|2000-12-07 11:33:32|
+-------+--------------+------+------+-------+------+-------------------+
 
 
#Solution : 1
from pyspark.sql.functions import col,count,desc
dfMovies_Ratings2 = dfMovies_Ratings.select("Title","UserID").groupBy("Title").agg(count("UserID").alias("Views")).orderBy(desc("Views")).limit(20)
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+


dfMovies_Ratings2.printSchema()
root
 |-- Title: string (nullable = true)
 |-- Views: long (nullable = false)


#SparkSQL approach

dfMovies_Ratings.createOrReplaceTempView("MovieRatingsView")
dfMovies_Ratings2 = spark.sql("select Title,count(UserID) as Views from MovieRatingsView group by Title order by Views desc limit 20")
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+

#Write the output into hdfs
dfMovies_Ratings2.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv

American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590
Back to the Future (1985),2583
"Silence of the Lambs, The (1991)",2578
Men in Black (1997),2538
Raiders of the Lost Ark (1981),2514
Fargo (1996),2513
"Sixth Sense, The (1999)",2459
Braveheart (1995),2443
Shakespeare in Love (1998),2369
"Princess Bride, The (1987)",2318
Schindler's List (1993),2304
L.A. Confidential (1997),2288
Groundhog Day (1993),2278

#Solution : 2
#Top 10 Rated Movies
from pyspark.sql.functions import avg

result2 = dfMovies_Ratings.select("Title","Rating","UserID").groupBy("Title").agg(count("UserID").alias("Views"),avg("Rating").alias("AvgRating"))
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
result3.show(10)

+--------------------+-----+-----------------+
|               Title|Views|        AvgRating|
+--------------------+-----+-----------------+
|      Sanjuro (1962)|   69|4.608695652173913|
|Seven Samurai (Th...|  628|4.560509554140127|
|Shawshank Redempt...| 2227|4.554557700942973|
|Godfather, The (1...| 2223|4.524966261808367|
|Close Shave, A (1...|  657| 4.52054794520548|
|Usual Suspects, T...| 1783|4.517106001121705|
|Schindler's List ...| 2304|4.510416666666667|
|Wrong Trousers, T...|  882|4.507936507936508|
|Sunset Blvd. (a.k...|  470|4.491489361702127|
|Raiders of the Lo...| 2514|4.477724741447892|
+--------------------+-----+-----------------+


from pyspark.sql.functions import avg

result2 = dfMovies_Ratings.select("Title","Rating","UserID").groupBy("Title").agg(count("UserID").alias("Views"),avg("Rating").alias("AvgRating"))
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
#result3.show(10)

#write the output into hdfs
result3.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/top10Movies/")


hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv
Title,Views
American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590

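Requirement 3 above (genres ranked by average rating for each profession and age group) is not worked out in this post; a sketch using the DataFrames already defined could look like this:

from pyspark.sql.functions import avg, desc

dfAll = dfMovies_Ratings.join(dfUsers, "UserID")
dfAll.groupBy("Occupation", "Age", "Genres") \
     .agg(avg("Rating").alias("AvgRating")) \
     .orderBy("Occupation", "Age", desc("AvgRating")) \
     .show(20)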

Movies, Ratings, Users Data Analysis using Pyspark with namedTuple

import collections 
Movie = collections.namedtuple('Movie',['MovieID','Title','Genres']) 

def parseMovie(_row):
    fields = _row.split("::")
    movieid = (int) (fields[0])
    title = fields[1]
    genres = fields[2]
    M = Movie(movieid,title,genres)
    return M

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
movies_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\movies.dat")
movies_r2 = movies_r1.map(lambda x:(str)(x)).map(parseMovie)
movies_r2.take(2)

[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
 Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
 
 


import collections 
from datetime import datetime

Rating = collections.namedtuple('Rating',['UserID','MovieID','Rating','Timestamp']) 

def parseRatingRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    movieid = (int) (fields[1])
    rating = (int) (fields[2])
    _timestamp = datetime.fromtimestamp((int) (fields[3]))
    _rating = Rating(userid,movieid,rating,_timestamp)
    return _rating

#print(parseRatingRecord("1::1193::5::978300760"))

ratings_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\ratings.dat")
ratings_r2 = ratings_r1.map(lambda x:(str)(x)).map(parseRatingRecord)
ratings_r2.take(2)


[Rating(UserID=1, MovieID=1193, Rating=5, Timestamp=datetime.datetime(2001, 1, 1, 3, 42, 40)),
 Rating(UserID=1, MovieID=661, Rating=3, Timestamp=datetime.datetime(2001, 1, 1, 4, 5, 9))]
 
 
 
import collections
User = collections.namedtuple("Users",["UserID","Gender","Age","Occupation","ZipCode"])

def parseUserRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    gender = fields[1]
    age = (int) (fields[2])
    occupation = (int) (fields[3])
    zipcode = (int) (fields[4])
    _user = User(userid,gender,age,occupation,zipcode)
    return _user

#print(parseUserRecord("3::M::25::15::55117"))            

users_r1 = spark.sparkContext.textFile("E:\\vow\\ml-1m\\users.dat")
users_r2 = users_r1.map(lambda x:(str)(x)).map(parseUserRecord)
users_r2.take(2)
                        
[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
 Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
                        

NamedTuple Example in Python

import collections 
Movie = collections.namedtuple('Movie',['MovieID','Title','Genres'])  #NamedTuple Example


M = Movie('1','Toy Story (1995)',"Animation|Children's|Comedy") 

print(M[0])
print(M[1])
print(M.MovieID)
print(M.Title)
print(M.Genres)

1
Toy Story (1995)
1
Toy Story (1995)
Animation|Children's|Comedy


#NamedTuple Example : #1

import collections 
Movie = collections.namedtuple('Movie',['MovieID','Title','Genres']) 

def parseMovie(_row):
    fields = _row.split("::")
    movieid = (int) (fields[0])
    title = fields[1]
    genres = fields[2]
    M = Movie(movieid,title,genres)
    return M

print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))

Answer:
Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy")


#NamedTuple Example : #2
import collections 
from datetime import datetime

Rating = collections.namedtuple('Rating',['UserID','MovieID','Rating','Timestamp']) 

def parseRatingRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    movieid = (int) (fields[1])
    rating = (int) (fields[2])
    _timestamp = datetime.fromtimestamp((int) (fields[3]))
    _rating = Rating(userid,movieid,rating,_timestamp)
    return _rating

print(parseRatingRecord("1::1193::5::978300760"))

Answer:
Rating(UserID=1, MovieID=1193, Rating=5, Timestamp=datetime.datetime(2001, 1, 1, 3, 42, 40))


#Example 3

import collections
User = collections.namedtuple("Users",["UserID","Gender","Age","Occupation","ZipCode"])

def parseUserRecord(_row):
    fields = _row.split("::")
    userid = (int) (fields[0])
    gender = fields[1]
    age = (int) (fields[2])
    occupation = (int) (fields[3])
    zipcode = (int) (fields[4])
    _user = User(userid,gender,age,occupation,zipcode)
    return _user

print(parseUserRecord("3::M::25::15::55117"))            
                               
                               

Monday, 25 May 2020

Date, DateTime Objects in Pyspark

#Date, Row Objects
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

from pyspark.sql import Row
import datetime as DT

#Pyspark Date object 
r1 = Row(id=101,city="Hyderabad",doj=DT.date(2014,10,23))
r2 = Row(id=102,city="Bangalore",doj=DT.date(2018,3,20))

df = spark.createDataFrame([r1,r2])

df.printSchema()
df.show()

root
 |-- city: string (nullable = true)
 |-- doj: date (nullable = true)
 |-- id: long (nullable = true)

+---------+----------+---+
|     city|       doj| id|
+---------+----------+---+
|Hyderabad|2014-10-23|101|
|Bangalore|2018-03-20|102|
+---------+----------+---+

#Pyspark DateTime object

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

from pyspark.sql import Row
import datetime as DT

r1 = Row(id=101,city="Hyderabad",doj=DT.datetime(2014,10,23,23,34,45))
r2 = Row(id=102,city="Bangalore",doj=DT.datetime(2018,3,20,12,45,58))

df = spark.createDataFrame([r1,r2])
df.printSchema()
df.show()

root
 |-- city: string (nullable = true)
 |-- doj: timestamp (nullable = true)
 |-- id: long (nullable = true)

+---------+-------------------+---+
|     city|                doj| id|
+---------+-------------------+---+
|Hyderabad|2014-10-23 23:34:45|101|
|Bangalore|2018-03-20 12:45:58|102|
+---------+-------------------+---+

import only what you need in PySpark - Example (Best Practices)

#don't use import * -- not recommended
from pyspark.sql.functions import * 
df1.select('doj',year('doj'),hour('doj').alias('hour'), month(df1.doj).alias('Month'),minute("doj").alias("Minute")).show()

+-------------------+---------+----+-----+------+
|                doj|year(doj)|hour|Month|Minute|
+-------------------+---------+----+-----+------+
|2014-12-23 23:34:45|     2014|  23|   12|    34|
|               null|     null|null| null|  null|
|2010-01-01 12:34:22|     2010|  12|    1|    34|
+-------------------+---------+----+-----+------+

#best practice - import only what you need
from pyspark.sql.functions import col,year,hour,month,minute
df1.select('doj',year('doj'),hour('doj').alias('hour'), month(df1.doj).alias('Month'),minute("doj").alias("Minute")).show()

WithColumn, Casting - Converting data type Example in PySpark

#WithColumn and Converting data type example

 
from pyspark.sql import Row
from pyspark.sql import functions 
 

r1 = Row(id=100,name='Sara',city='Nellai',pin=627001, doj='2014-12-23 23:34:45')
r2 = Row(id=102,name=None,city='Kovai',pin=None,doj=None)
r3 = Row(id=None,name='Raji',city=None,pin=None,doj='2010-01-01 12:34:22')
df = spark.createDataFrame([r1,r2,r3])
#df.select("id","name",'doj',"city","pin").show()
df.printSchema()

#df["field"] example
df1 = df.withColumn("doj",df["doj"].cast("timestamp")) #withColumn and Changing data type
df1.printSchema()


root
 |-- city: string (nullable = true)
 |-- doj: string (nullable = true)  #String
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)

root
 |-- city: string (nullable = true)
 |-- doj: timestamp (nullable = true) #timestamp
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)
 
 
 
 #with None
from pyspark.sql import Row
from pyspark.sql import functions 
from pyspark.sql.functions import col

r1 = Row(id=100,name='Sara',city='Nellai',pin=627001, doj='2014-12-23 23:34:45')
r2 = Row(id=102,name=None,city='Kovai',pin=None,doj=None)
r3 = Row(id=None,name='Raji',city=None,pin=None,doj='2010-01-01 12:34:22')
df = spark.createDataFrame([r1,r2,r3])
#df.select("id","name",'doj',"city","pin").show()
df.printSchema()

#using col 
df1 = df.withColumn("doj",col("doj").cast("timestamp"))
df1.printSchema()


root
 |-- city: string (nullable = true)
 |-- doj: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)

root
 |-- city: string (nullable = true)
 |-- doj: timestamp (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)

Row Object in PySpark Examples

from pyspark.sql import Row
r1 = Row(id=100,name='Sara',salary=50000,city='Bangalore')
r2 = Row(id=101,salary=63000,name='Siva',city='Chennai')
r3 = Row(name='Malar',city='Mumbai',id=103,salary=63222)
df = spark.createDataFrame([r1,r2,r3])
df.show()

+---------+---+-----+------+
|     city| id| name|salary|
+---------+---+-----+------+
|Bangalore|100| Sara| 50000|
|  Chennai|101| Siva| 63000|
|   Mumbai|103|Malar| 63222|
+---------+---+-----+------+

df.select("id","name","salary","city").show()
+---+-----+------+---------+
| id| name|salary|     city|
+---+-----+------+---------+
|100| Sara| 50000|Bangalore|
|101| Siva| 63000|  Chennai|
|103|Malar| 63222|   Mumbai|
+---+-----+------+---------+



#with None
from pyspark.sql import Row
r1 = Row(id=100,name='Sara',city='Nellai',pin=627001)
r2 = Row(id=102,name=None,city='Kovai',pin=None)
r3 = Row(id=None,name='Raji',city=None,pin=None)
df = spark.createDataFrame([r1,r2,r3])
df.select("id","name","city","pin").show()

+----+----+------+------+
|  id|name|  city|   pin|
+----+----+------+------+
| 100|Sara|Nellai|627001|
| 102|null| Kovai|  null|
|null|Raji|  null|  null|
+----+----+------+------+

df.printSchema()

root
 |-- city: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)



df.na.fill(-1).na.fill("NotProvided").show()
# fill -1 for all numeric fields and NotProvided for all string fields
+-----------+---+-----------+------+
|       city| id|       name|   pin|
+-----------+---+-----------+------+
|     Nellai|100|       Sara|627001|
|      Kovai|102|NotProvided|    -1|
|NotProvided| -1|       Raji|    -1|
+-----------+---+-----------+------+

#with datetime fields
#with None
from pyspark.sql import Row
r1 = Row(id=100,name='Sara',city='Nellai',pin=627001, doj='2014-12-23 23:34:45')
r2 = Row(id=102,name=None,city='Kovai',pin=None,doj=None)
r3 = Row(id=None,name='Raji',city=None,pin=None,doj='2010-01-01 12:34:22')
df = spark.createDataFrame([r1,r2,r3])
df.select("id","name",'doj',"city","pin").show()

+----+----+-------------------+------+------+
|  id|name|                doj|  city|   pin|
+----+----+-------------------+------+------+
| 100|Sara|2014-12-23 23:34:45|Nellai|627001|
| 102|null|               null| Kovai|  null|
|null|Raji|2010-01-01 12:34:22|  null|  null|
+----+----+-------------------+------+------+

root
 |-- city: string (nullable = true)
 |-- doj: string (nullable = true)  -- it is recognizing doj as string
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- pin: long (nullable = true)

builtins vs * - avoid using * in Pyspark

Builtins example:

from pyspark.sql.functions import col  #we didn't use *, so the built-in max still works
from builtins import max

myList = [1,2,5,3,22]
print(max(myList))

22

after restarting kernel:
myList = [1,2,5,3,22]
print(max(myList))

22

error here:

from pyspark.sql.functions import * #we used *, so it creates ambiguity

myList = [1,2,5,3,22]
print(max(myList))  # this max is now overridden by the SQL function max, so the call fails
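
One way to keep both is to import the Spark aggregate under an alias; a small sketch (the column name "salary" and df here are just placeholders):

from pyspark.sql.functions import max as sql_max   #Spark's column aggregate, aliased
from builtins import max                           #Python's built-in max stays usable

myList = [1,2,5,3,22]
print(max(myList))   #22 -- built-in max on a Python list
#sql_max("salary") remains available for DataFrame aggregations, e.g. df.select(sql_max("salary"))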

Sunday, 24 May 2020

Fill missing entry with NODATA programmatically : PySpark RDD programming

Fill missing entry with nodata programmatically

input file:
/home/hadoop/scratch/fillmissingdata.txt

SPARK SPARK SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK
SPARK
SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK SPARK SPARK


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("sparkApp").getOrCreate()

r1 = spark.sparkContext.textFile("/home/hadoop/scratch/fillmissingdata.txt")
#print(r1.collect())
#print(r1.count())

r2 = r1.map(lambda x:(str)(x)).map(lambda x:x.split(" "))
r3 = r2.map(lambda x:len(x))
columnsize = max(r3.collect())

def fillmissing(x):
    result=[]
    for i in range(columnsize):
        try:
            result.append(x[i])
        except:
            result.append('nodata')
    return result
resultdata = r2.map(lambda x:fillmissing(x))
#print(resultdata.collect())

df = resultdata.toDF(['First','Second','Third','Fourth'])
df.show()


output:
+-----+------+------+------+
|First|Second| Third|Fourth|
+-----+------+------+------+
|SPARK| SPARK| SPARK| SPARK|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK|nodata|nodata|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK| SPARK| SPARK|
+-----+------+------+------+

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...