Thursday, 13 August 2020

Google AdSense Analytics using Spark with Scala

$ hdfs dfs -cat hdfs://localhost:8020/user/cloudera/adsense/adsense.csv | head
Date,Page views,Impressions,Clicks,Page RPM (USD),Impression RPM (USD),Active View Viewable,Estimated earnings (USD)
2010-02-25,61358,160401,525,1.04,0.4,,63.71
2011-07-03,60066,157661,774,0.62,0.24,,37.19
2011-04-03,59079,148407,339,0.5,0.2,,29.54
2011-07-04,55221,142543,802,0.88,0.34,,48.58
2011-11-21,50808,136381,786,1.04,0.39,,52.73
2011-11-28,48360,130869,996,1.12,0.41,,54.14
2011-07-08,49545,129778,806,0.94,0.36,,46.45
2011-11-20,49020,129486,622,0.88,0.33,,42.98
2010-12-18,45092,127493,289,1.19,0.42,,53.77


scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://localhost:8020/user/cloudera/adsense/adsense.csv")

scala> df.printSchema()
root
 |-- Date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)
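Since inferSchema needs an extra pass over the file, the schema could also be declared up front. A minimal sketch, assuming the same column names and types shown above (adsenseSchema and dfTyped are illustrative names):

import org.apache.spark.sql.types._

// Declare the schema explicitly instead of inferring it.
// Date is kept as TimestampType to match what inferSchema produced above.
val adsenseSchema = StructType(Seq(
  StructField("Date", TimestampType),
  StructField("Page views", IntegerType),
  StructField("Impressions", IntegerType),
  StructField("Clicks", IntegerType),
  StructField("Page RPM (USD)", DoubleType),
  StructField("Impression RPM (USD)", DoubleType),
  StructField("Active View Viewable", StringType),
  StructField("Estimated earnings (USD)", DoubleType)))

val dfTyped = spark.read.format("csv")
  .option("header", "true")
  .schema(adsenseSchema)
  .load("hdfs://localhost:8020/user/cloudera/adsense/adsense.csv")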



val dfAdsense = df
  .withColumnRenamed("Date", "fldDate")
  .withColumnRenamed("Page views", "fldPageViews")
  .withColumnRenamed("Impressions", "fldImpressions")
  .withColumnRenamed("Clicks", "fldClicks")
  .withColumnRenamed("Page RPM (USD)", "fldPageRPM")
  .withColumnRenamed("Impression RPM (USD)", "fldimpressionRPM")
  .withColumnRenamed("Active View Viewable", "fldActiveViewViewable")
  .withColumnRenamed("Estimated earnings (USD)", "fldEstimatedEarnings")


scala> dfAdsense.printSchema()
root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)
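The same renaming can also be driven by a map of old-to-new names, which keeps the column list in one place. A sketch, assuming the columns above (dfRenamed is an illustrative name):

val renames = Map(
  "Date" -> "fldDate",
  "Page views" -> "fldPageViews",
  "Impressions" -> "fldImpressions",
  "Clicks" -> "fldClicks",
  "Page RPM (USD)" -> "fldPageRPM",
  "Impression RPM (USD)" -> "fldimpressionRPM",
  "Active View Viewable" -> "fldActiveViewViewable",
  "Estimated earnings (USD)" -> "fldEstimatedEarnings")

// Apply each rename in turn over the original DataFrame.
val dfRenamed = renames.foldLeft(df) { case (acc, (oldName, newName)) =>
  acc.withColumnRenamed(oldName, newName)
}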


// total row count before removing zero-earnings records
scala> dfAdsense.count
res3: Long = 2452

 
scala> val df = dfAdsense.filter($"fldEstimatedEarnings" =!= 0)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [fldDate: timestamp, fldPageViews: int ... 6 more fields]


scala> print(df.count())
1511
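As a quick cross-check, the excluded rows are exactly the zero-earnings days, so the two counts should add back up to 2452 — a sketch:

// Count the zero-earnings days that were filtered out.
val zeroDays = dfAdsense.filter($"fldEstimatedEarnings" === 0).count
println(s"zero-earnings days: $zeroDays, remaining days: ${df.count}")  // expect 941 + 1511 = 2452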


scala> df.agg(sum("fldEstimatedEarnings")).show()
+-------------------------+
|sum(fldEstimatedEarnings)|
+-------------------------+
|       16995.739999999976|
+-------------------------+
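The long decimal tail comes from summing doubles. Rounding at aggregation time keeps the output readable, for example:

// Round the grand total to two decimals (should display as 16995.74).
df.agg(round(sum("fldEstimatedEarnings"), 2).alias("TotalEarnings")).show()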


scala> df.select("fldDate","fldEstimatedEarnings").sort("fldEstimatedEarnings").limit(10).show()
+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2008-09-08 00:00:00|                0.01|
|2016-02-22 00:00:00|                0.01|
|2008-09-03 00:00:00|                0.01|
|2009-03-07 00:00:00|                0.01|
|2008-09-05 00:00:00|                0.01|
|2011-12-10 00:00:00|                0.01|
|2011-12-06 00:00:00|                0.01|
|2011-12-13 00:00:00|                0.01|
|2008-09-14 00:00:00|                0.01|
|2008-08-30 00:00:00|                0.01|
+-------------------+--------------------+


scala> df.select("fldDate","fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).limit(10).show()
+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-11-28 00:00:00|               54.14|
|2010-12-18 00:00:00|               53.77|
|2011-11-21 00:00:00|               52.73|
|2010-12-19 00:00:00|               52.14|
|2011-07-13 00:00:00|               50.86|
|2011-12-02 00:00:00|               49.63|
|2011-06-24 00:00:00|               48.91|
|2011-09-27 00:00:00|                48.8|
|2011-07-04 00:00:00|               48.58|
+-------------------+--------------------+


scala> df.select("fldDate","fldEstimatedEarnings", "fldClicks","fldPageViews").sort(desc("fldEstimatedEarnings")).limit(10).show()
+-------------------+--------------------+---------+------------+
|            fldDate|fldEstimatedEarnings|fldClicks|fldPageViews|
+-------------------+--------------------+---------+------------+
|2010-02-25 00:00:00|               63.71|      525|       61358|
|2011-11-28 00:00:00|               54.14|      996|       48360|
|2010-12-18 00:00:00|               53.77|      289|       45092|
|2011-11-21 00:00:00|               52.73|      786|       50808|
|2010-12-19 00:00:00|               52.14|      202|       35461|
|2011-07-13 00:00:00|               50.86|      751|       40785|
|2011-12-02 00:00:00|               49.63|      842|       42972|
|2011-06-24 00:00:00|               48.91|      704|       33192|
|2011-09-27 00:00:00|                48.8|      680|       34525|
|2011-07-04 00:00:00|               48.58|      802|       55221|
+-------------------+--------------------+---------+------------+
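Derived metrics such as earnings per click or click-through rate could be added as columns before sorting. A sketch using the renamed columns (CPC and CTR are illustrative names, not part of the original export):

df.withColumn("CPC", round($"fldEstimatedEarnings" / $"fldClicks", 3))   // earnings per click
  .withColumn("CTR", round($"fldClicks" / $"fldPageViews" * 100, 2))     // clicks per 100 page views
  .select("fldDate", "fldEstimatedEarnings", "fldClicks", "CPC", "CTR")
  .sort(desc("fldEstimatedEarnings"))
  .show(10)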



scala> df.groupBy(year($"fldDate")).agg(sum($"fldEstimatedEarnings")).show()
+-------------+-------------------------+                                       
|year(fldDate)|sum(fldEstimatedEarnings)|
+-------------+-------------------------+
|         2015|      0.21000000000000002|
|         2013|       1.2799999999999998|
|         2014|                     0.03|
|         2012|                  2702.84|
|         2009|       1640.6799999999964|
|         2016|                     0.25|
|         2010|       2883.2799999999975|
|         2011|                  9392.38|
|         2008|        374.7300000000001|
|         2017|     0.060000000000000005|
+-------------+-------------------------+


scala> df.groupBy(year($"fldDate")).agg(round(sum($"fldEstimatedEarnings"),2)).show()
+-------------+-----------------------------------+                             
|year(fldDate)|round(sum(fldEstimatedEarnings), 2)|
+-------------+-----------------------------------+
|         2015|                               0.21|
|         2013|                               1.28|
|         2014|                               0.03|
|         2012|                            2702.84|
|         2009|                            1640.68|
|         2016|                               0.25|
|         2010|                            2883.28|
|         2011|                            9392.38|
|         2008|                             374.73|
|         2017|                               0.06|
+-------------+-----------------------------------+

scala> df.groupBy(year($"fldDate").alias("Year")).agg(round(sum($"fldEstimatedEarnings"),2).alias("Earnings")).show()
+----+--------+
|Year|Earnings|
+----+--------+
|2015|    0.21|
|2013|    1.28|
|2014|    0.03|
|2012| 2702.84|
|2009| 1640.68|
|2016|    0.25|
|2010| 2883.28|
|2011| 9392.38|
|2008|  374.73|
|2017|    0.06|
+----+--------+
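The same yearly rollup can also be expressed in Spark SQL after registering a temporary view — a sketch ("adsense" is just an illustrative view name):

// Register the filtered DataFrame as a temp view and query it with SQL.
df.createOrReplaceTempView("adsense")
spark.sql("""
  SELECT year(fldDate) AS Year,
         round(sum(fldEstimatedEarnings), 2) AS Earnings
  FROM adsense
  GROUP BY year(fldDate)
  ORDER BY Year
""").show()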


scala> df.groupBy(year($"fldDate").alias("Year"), month($"fldDate").alias("Month")).agg(round(sum($"fldEstimatedEarnings"),2).alias("Earnings")).sort("Year","Month").show(200)
+----+-----+--------+                                                           
|Year|Month|Earnings|
+----+-----+--------+
|2008|    6|   35.76|
|2008|    7|   82.05|
|2008|    8|   27.17|
|2008|    9|   21.12|
|2008|   10|   65.87|
|2008|   11|   78.38|
|2008|   12|   64.38|
|2009|    1|   130.1|
|2009|    2|   26.69|
|2009|    3|   59.68|
|2009|    4|   94.23|
|2009|    5|  186.15|
|2009|    6|  248.88|
|2009|    7|  190.95|
|2009|    8|    81.7|
|2009|    9|   60.99|
|2009|   10|   206.2|
|2009|   11|  198.68|
|2009|   12|  156.43|
|2010|    1|  101.46|
|2010|    2|  316.61|
|2010|    3|  179.72|
|2010|    4|  142.26|
|2010|    5|  120.05|
|2010|    6|  132.18|
|2010|    7|  134.75|
|2010|    8|  137.77|
|2010|    9|  265.05|
|2010|   10|  267.31|
|2010|   11|   372.2|
|2010|   12|  713.92|
|2011|    1|  478.87|
|2011|    2|  551.54|
|2011|    3|   549.9|
|2011|    4|  683.07|
|2011|    5|  620.94|
|2011|    6|  969.76|
|2011|    7| 1149.31|
|2011|    8| 1057.64|
|2011|    9| 1039.34|
|2011|   10|  966.37|
|2011|   11| 1138.69|
|2011|   12|  186.95|
|2012|    1|    0.48|
|2012|    2|  256.25|
|2012|    3|  564.13|
|2012|    4|  524.13|
|2012|    5|  504.83|
|2012|    6|  370.12|
|2012|    7|   261.1|
|2012|    8|  219.39|
|2012|    9|    2.38|
|2012|   10|    0.02|
|2012|   12|    0.01|
|2013|    1|    0.01|
|2013|    3|    0.25|
|2013|    4|    0.11|
|2013|    5|    0.09|
|2013|    6|    0.25|
|2013|    7|    0.03|
|2013|    9|    0.09|
|2013|   10|    0.32|
|2013|   12|    0.13|
|2014|    2|    0.03|
|2015|    2|    0.08|
|2015|    4|    0.03|
|2015|    7|     0.1|
|2016|    2|    0.01|
|2016|    3|     0.2|
|2016|    9|    0.03|
|2016|   11|    0.01|
|2017|    6|    0.06|
+----+-----+--------+


// group by Year and Month, order by Earnings descending
scala> df.groupBy(year($"fldDate").alias("Year"), month($"fldDate").alias("Month")).agg(round(sum($"fldEstimatedEarnings"),2).alias("Earnings")).sort(desc("Earnings")).show(200) 
+----+-----+--------+                                                           
|Year|Month|Earnings|
+----+-----+--------+
|2011|    7| 1149.31|
|2011|   11| 1138.69|
|2011|    8| 1057.64|
|2011|    9| 1039.34|
|2011|    6|  969.76|
|2011|   10|  966.37|
|2010|   12|  713.92|
|2011|    4|  683.07|
|2011|    5|  620.94|
|2012|    3|  564.13|
|2011|    2|  551.54|
|2011|    3|   549.9|
|2012|    4|  524.13|
|2012|    5|  504.83|
|2011|    1|  478.87|
|2010|   11|   372.2|
|2012|    6|  370.12|
|2010|    2|  316.61|
|2010|   10|  267.31|
|2010|    9|  265.05|
|2012|    7|   261.1|
|2012|    2|  256.25|
|2009|    6|  248.88|
|2012|    8|  219.39|
|2009|   10|   206.2|
|2009|   11|  198.68|
|2009|    7|  190.95|
|2011|   12|  186.95|
|2009|    5|  186.15|
|2010|    3|  179.72|
|2009|   12|  156.43|
|2010|    4|  142.26|
|2010|    8|  137.77|
|2010|    7|  134.75|
|2010|    6|  132.18|
|2009|    1|   130.1|
|2010|    5|  120.05|
|2010|    1|  101.46|
|2009|    4|   94.23|
|2008|    7|   82.05|
|2009|    8|    81.7|
|2008|   11|   78.38|
|2008|   10|   65.87|
|2008|   12|   64.38|
|2009|    9|   60.99|
|2009|    3|   59.68|
|2008|    6|   35.76|
|2008|    8|   27.17|
|2009|    2|   26.69|
|2008|    9|   21.12|
|2012|    9|    2.38|
|2012|    1|    0.48|
|2013|   10|    0.32|
|2013|    3|    0.25|
|2013|    6|    0.25|
|2016|    3|     0.2|
|2013|   12|    0.13|
|2013|    4|    0.11|
|2015|    7|     0.1|
|2013|    5|    0.09|
|2013|    9|    0.09|
|2015|    2|    0.08|
|2017|    6|    0.06|
|2013|    7|    0.03|
|2014|    2|    0.03|
|2015|    4|    0.03|
|2016|    9|    0.03|
|2012|   10|    0.02|
|2016|   11|    0.01|
|2012|   12|    0.01|
|2016|    2|    0.01|
|2013|    1|    0.01|
+----+-----+--------+
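The same numbers can be laid out as a year-by-month matrix with pivot. A sketch, adding Year and Month columns first so the string-based pivot works on older Spark versions as well:

// One row per Year, one column per Month (1-12), earnings rounded to 2 decimals.
df.withColumn("Year", year($"fldDate"))
  .withColumn("Month", month($"fldDate"))
  .groupBy("Year")
  .pivot("Month", 1 to 12)
  .agg(round(sum($"fldEstimatedEarnings"), 2))
  .sort("Year")
  .show()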
