Saturday, 30 May 2020

Google AdSense Data Analysis Using PySpark DataFrame

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
adsenseDF = spark.read.format("csv").option("header",True).option("inferSchema",True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)

dfAdsense = adsenseDF.withColumnRenamed("date", "fldDate") \
    .withColumnRenamed("Page views", "fldPageViews") \
    .withColumnRenamed("Impressions", "fldImpressions") \
    .withColumnRenamed("Clicks", "fldClicks") \
    .withColumnRenamed("Page RPM (USD)", "fldPageRPM") \
    .withColumnRenamed("Impression RPM (USD)", "fldimpressionRPM") \
    .withColumnRenamed("Active View Viewable", "fldActiveViewViewable") \
    .withColumnRenamed("Estimated earnings (USD)", "fldEstimatedEarnings")
dfAdsense.printSchema()

root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)
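
The chained withColumnRenamed calls above get verbose. As a rough alternative sketch (assuming the new names line up positionally with the columns in the raw CSV), all the columns can be renamed in one go with toDF():

# Sketch: rename every column at once with toDF(); names are positional
dfAdsense = adsenseDF.toDF(
    "fldDate", "fldPageViews", "fldImpressions", "fldClicks",
    "fldPageRPM", "fldimpressionRPM", "fldActiveViewViewable", "fldEstimatedEarnings"
)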



# Drop the records that have zero revenue
from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())

1511
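
Since df is reused by every query that follows, caching it avoids re-reading the CSV and re-applying the filter on each action. A minimal sketch:

# Sketch: keep the filtered DataFrame in memory for the repeated queries below
df.cache()
df.count()   # first action materializes the cache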


# Find the total earnings
from pyspark.sql import functions as F 
df.select(F.sum(col("fldEstimatedEarnings")).alias("Estimated Earnings Total")).show()

+------------------------+
|Estimated Earnings Total|
+------------------------+
|      16995.739999999976|
+------------------------+
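
The raw sum carries floating-point noise; the same agg() call can also round the total and report the average earnings per day. A small sketch:

# Sketch: rounded total and per-day average in a single aggregation
df.agg(
    F.round(F.sum("fldEstimatedEarnings"), 2).alias("Total Earnings"),
    F.round(F.avg("fldEstimatedEarnings"), 2).alias("Avg Earnings Per Day")
).show()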

# Find the top 10 earnings (without date)
from pyspark.sql.functions import col
df.select(col("fldEstimatedEarnings")).sort(col("fldEstimatedEarnings").desc()).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+

# Find the top 10 earnings (without date) - same result using the desc() function instead of the Column.desc() method
from pyspark.sql.functions import desc
df.select("fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+


# Find the top 10 earnings with date

from pyspark.sql.functions import desc
df.select("fldDate","fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-11-28 00:00:00|               54.14|
|2010-12-18 00:00:00|               53.77|
|2011-11-21 00:00:00|               52.73|
|2010-12-19 00:00:00|               52.14|
|2011-07-13 00:00:00|               50.86|
|2011-12-02 00:00:00|               49.63|
|2011-06-24 00:00:00|               48.91|
|2011-09-27 00:00:00|                48.8|
|2011-07-04 00:00:00|               48.58|
+-------------------+--------------------+


# Find the 10 lowest earnings (without date)
df.select("fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
+--------------------+

# Find the 10 lowest earnings with date
df.select("fldDate","fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2008-09-14 00:00:00|                0.01|
|2012-12-22 00:00:00|                0.01|
|2008-08-30 00:00:00|                0.01|
|2009-02-17 00:00:00|                0.01|
|2012-01-28 00:00:00|                0.01|
|2009-03-07 00:00:00|                0.01|
|2012-02-06 00:00:00|                0.01|
|2011-12-13 00:00:00|                0.01|
|2012-01-24 00:00:00|                0.01|
|2008-09-08 00:00:00|                0.01|
+-------------------+--------------------+

# On which dates did we get estimated earnings >= 40?
from pyspark.sql.functions import col
df.select("fldDate","fldEstimatedEarnings").where(col("fldEstimatedEarnings") >= 40).show()

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-07-04 00:00:00|               48.58|
|2011-11-21 00:00:00|               52.73|
|2011-11-28 00:00:00|               54.14|
|2011-07-08 00:00:00|               46.45|
|2011-11-20 00:00:00|               42.98|
|2010-12-18 00:00:00|               53.77|
|2011-11-29 00:00:00|               43.27|
|2011-11-17 00:00:00|               41.15|
|2011-11-22 00:00:00|               43.78|
|2011-11-30 00:00:00|               46.69|
|2011-12-02 00:00:00|               49.63|
|2011-11-15 00:00:00|               41.43|
|2011-09-17 00:00:00|               42.28|
|2011-07-12 00:00:00|               46.85|
|2011-07-11 00:00:00|               43.33|
|2011-07-09 00:00:00|               44.45|
|2011-07-05 00:00:00|               43.08|
|2011-09-18 00:00:00|               43.02|
|2011-06-22 00:00:00|               44.82|
+-------------------+--------------------+

# How many days did we get estimated earnings >= 40?
df.where(col("fldEstimatedEarnings") >= 40).count()

47

# Count of days with estimated earnings >= 10 and <= 50
df.filter((col("fldEstimatedEarnings") >= 10) & (col("fldEstimatedEarnings") <= 50)).count()

583
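
The same inclusive range filter can also be written with between(), which covers both bounds:

# Sketch: between() is inclusive on both ends, so this matches >= 10 and <= 50
df.filter(col("fldEstimatedEarnings").between(10, 50)).count()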
 
# Top 10 days by page views
from pyspark.sql.functions import desc
df.select("fldDate","fldPageViews").sort(desc("fldPageViews")).show(10)


+-------------------+------------+
|            fldDate|fldPageViews|
+-------------------+------------+
|2010-02-25 00:00:00|       61358|
|2011-07-03 00:00:00|       60066|
|2011-04-03 00:00:00|       59079|
|2011-07-04 00:00:00|       55221|
|2011-11-21 00:00:00|       50808|
|2011-07-08 00:00:00|       49545|
|2011-11-20 00:00:00|       49020|
|2011-11-13 00:00:00|       48613|
|2011-11-28 00:00:00|       48360|
|2011-11-22 00:00:00|       47939|
+-------------------+------------+

# Top 10 days by clicks
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").sort(desc("fldClicks")).show(10)

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
+-------------------+---------+
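
Clicks alone do not tell the whole story; a per-day click-through rate (clicks divided by impressions) is easy to derive. A sketch, assuming fldImpressions is non-zero for the filtered rows:

# Sketch: per-day CTR (%) = clicks / impressions * 100
df.select(
    "fldDate",
    F.round(col("fldClicks") / col("fldImpressions") * 100, 2).alias("ctrPct")
).sort(desc("ctrPct")).show(10)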


# Days with clicks >= 800, in descending order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort(desc("fldClicks")).show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
|2011-11-30 00:00:00|      849|
|2011-07-23 00:00:00|      845|
|2011-12-02 00:00:00|      842|
|2011-11-15 00:00:00|      829|
|2011-11-27 00:00:00|      823|
|2011-07-09 00:00:00|      812|
|2011-07-08 00:00:00|      806|
|2011-07-04 00:00:00|      802|
|2011-11-19 00:00:00|      801|
|2011-11-11 00:00:00|      800|
+-------------------+---------+

# Days with clicks >= 800, in ascending order
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort("fldClicks").show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-11 00:00:00|      800|
|2011-11-19 00:00:00|      801|
|2011-07-04 00:00:00|      802|
|2011-07-08 00:00:00|      806|
|2011-07-09 00:00:00|      812|
|2011-11-27 00:00:00|      823|
|2011-11-15 00:00:00|      829|
|2011-12-02 00:00:00|      842|
|2011-07-23 00:00:00|      845|
|2011-11-30 00:00:00|      849|
|2011-11-13 00:00:00|      873|
|2011-08-06 00:00:00|      878|
|2011-11-12 00:00:00|      912|
|2011-11-17 00:00:00|      917|
|2011-11-16 00:00:00|      920|
|2011-11-18 00:00:00|      924|
|2011-11-29 00:00:00|      968|
|2011-11-28 00:00:00|      996|
|2011-11-25 00:00:00|     1026|
|2011-11-26 00:00:00|     1044|
+-------------------+---------+


 
# How many days did we get clicks >= 800?
df.select("fldClicks").where(col("fldClicks") >= 800).count()

20


# Total number of clicks on the days with clicks >= 800
df.filter(col("fldClicks") >= 800).select(F.sum(col("fldClicks")).alias('Sum')).show()
+-----+
|  Sum|
+-----+
|17667|
+-----+

# Total number of clicks across all days
df.select(F.sum(col("fldClicks")).alias("All clicks count")).show()

+----------------+
|All clicks count|
+----------------+
|          248190|
+----------------+

# Total number of page views across all days
df.select(F.sum(col("fldPageViews")).alias("Total Page views")).show()

+----------------+
|Total Page views|
+----------------+
|        18626239|
+----------------+
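
The totals above (and the earnings total) can also be computed in a single pass with one agg() call:

# Sketch: all three totals in one aggregation
df.agg(
    F.sum("fldClicks").alias("Total Clicks"),
    F.sum("fldPageViews").alias("Total Page Views"),
    F.round(F.sum("fldEstimatedEarnings"), 2).alias("Total Earnings")
).show()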

# Number of days with page views >= 50000
df.where(col("fldPageViews") >= 50000).count()
# Answer: 5

# Number of days with page views >= 10000
df.where(col("fldPageViews") >= 10000).count()
# Answer: 649
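
For a quick feel of how page views are distributed, describe() gives count, mean, stddev, min and max in one call:

# Sketch: summary statistics for the page-view column
df.select("fldPageViews").describe().show()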




# Total earnings per year
from pyspark.sql.functions import year, month, dayofmonth
import pyspark.sql.functions as F

# Alternative without rounding:
# df.groupBy(year("fldDate").alias("Year")).sum("fldEstimatedEarnings").orderBy("Year").show()

df.groupBy(year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year").show()


+----+--------+
|Year|Earnings|
+----+--------+
|2008|  374.73|
|2009| 1640.68|
|2010| 2883.28|
|2011| 9392.38|
|2012| 2702.84|
|2013|    1.28|
|2014|    0.03|
|2015|    0.21|
|2016|    0.25|
|2017|    0.06|
+----+--------+
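
Each year's share of the overall earnings can be derived from the same grouped result by dividing by the grand total. A sketch (yearlyDF and totalEarnings are just illustrative names):

# Sketch: earnings per year as a percentage of the overall total
yearlyDF = df.groupBy(year("fldDate").alias("Year")) \
    .agg(F.round(F.sum("fldEstimatedEarnings"), 2).alias("Earnings"))
totalEarnings = yearlyDF.agg(F.sum("Earnings")).first()[0]
yearlyDF.withColumn("Pct", F.round(col("Earnings") / totalEarnings * 100, 2)) \
    .orderBy("Year").show()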



# Total earnings per month, per year
df_re1 = df.groupBy(month("fldDate").alias("Month"), year("fldDate").alias("Year")) \
    .agg(F.round(F.sum("fldEstimatedEarnings"), 2).alias("Earnings")) \
    .orderBy("Year", "Month")
df_re1.show(100)

+-----+----+--------+
|Month|Year|Earnings|
+-----+----+--------+
|    6|2008|   35.76|
|    7|2008|   82.05|
|    8|2008|   27.17|
|    9|2008|   21.12|
|   10|2008|   65.87|
|   11|2008|   78.38|
|   12|2008|   64.38|
|    1|2009|   130.1|
|    2|2009|   26.69|
|    3|2009|   59.68|
|    4|2009|   94.23|
|    5|2009|  186.15|
|    6|2009|  248.88|
|    7|2009|  190.95|
|    8|2009|    81.7|
|    9|2009|   60.99|
|   10|2009|   206.2|
|   11|2009|  198.68|
|   12|2009|  156.43|
|    1|2010|  101.46|
|    2|2010|  316.61|
|    3|2010|  179.72|
|    4|2010|  142.26|
|    5|2010|  120.05|
|    6|2010|  132.18|
|    7|2010|  134.75|
|    8|2010|  137.77|
|    9|2010|  265.05|
|   10|2010|  267.31|
|   11|2010|   372.2|
|   12|2010|  713.92|
|    1|2011|  478.87|
|    2|2011|  551.54|
|    3|2011|   549.9|
|    4|2011|  683.07|
|    5|2011|  620.94|
|    6|2011|  969.76|
|    7|2011| 1149.31|
|    8|2011| 1057.64|
|    9|2011| 1039.34|
|   10|2011|  966.37|
|   11|2011| 1138.69|
|   12|2011|  186.95|
|    1|2012|    0.48|
|    2|2012|  256.25|
|    3|2012|  564.13|
|    4|2012|  524.13|
|    5|2012|  504.83|
|    6|2012|  370.12|
|    7|2012|   261.1|
|    8|2012|  219.39|
|    9|2012|    2.38|
|   10|2012|    0.02|
|   12|2012|    0.01|
|    1|2013|    0.01|
|    3|2013|    0.25|
|    4|2013|    0.11|
|    5|2013|    0.09|
|    6|2013|    0.25|
|    7|2013|    0.03|
|    9|2013|    0.09|
|   10|2013|    0.32|
|   12|2013|    0.13|
|    2|2014|    0.03|
|    2|2015|    0.08|
|    4|2015|    0.03|
|    7|2015|     0.1|
|    2|2016|    0.01|
|    3|2016|     0.2|
|    9|2016|    0.03|
|   11|2016|    0.01|
|    6|2017|    0.06|
+-----+----+--------+
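
The monthly breakdown can also be reshaped into one row per year with the months as columns using pivot(). A sketch:

# Sketch: months as columns, one row per year
df_re1.groupBy("Year").pivot("Month").sum("Earnings").orderBy("Year").show()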


# Re-aggregate the monthly figures to verify the yearly totals
df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"),2)).orderBy("Year").show()

+----+-----------------------+
|Year|round(sum(Earnings), 2)|
+----+-----------------------+
|2008|                 374.73|
|2009|                1640.68|
|2010|                2883.28|
|2011|                9392.38|
|2012|                2702.84|
|2013|                   1.28|
|2014|                   0.03|
|2015|                   0.21|
|2016|                   0.25|
|2017|                   0.06|
+----+-----------------------+
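
Finally, a summary like this can be written back out as a single CSV file for reporting. A sketch, assuming a hypothetical output folder on the same drive as the input:

# Sketch: write the yearly summary to a (hypothetical) output folder as one CSV part file
yearlySummary = df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"), 2).alias("Earnings")).orderBy("Year")
yearlySummary.coalesce(1).write.mode("overwrite").option("header", True).csv("E:\\DataSets\\adsense_yearly_summary")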
