from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
adsenseDF = spark.read.format("csv").option("header", True).option("inferSchema", True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()
root
|-- date: timestamp (nullable = true)
|-- Page views: integer (nullable = true)
|-- Impressions: integer (nullable = true)
|-- Clicks: integer (nullable = true)
|-- Page RPM (USD): double (nullable = true)
|-- Impression RPM (USD): double (nullable = true)
|-- Active View Viewable: string (nullable = true)
|-- Estimated earnings (USD): double (nullable = true)
dfAdsense = (adsenseDF
    .withColumnRenamed("date", "fldDate")
    .withColumnRenamed("Page views", "fldPageViews")
    .withColumnRenamed("Impressions", "fldImpressions")
    .withColumnRenamed("Clicks", "fldClicks")
    .withColumnRenamed("Page RPM (USD)", "fldPageRPM")
    .withColumnRenamed("Impression RPM (USD)", "fldimpressionRPM")
    .withColumnRenamed("Active View Viewable", "fldActiveViewViewable")
    .withColumnRenamed("Estimated earnings (USD)", "fldEstimatedEarnings"))
dfAdsense.printSchema()
root
|-- fldDate: timestamp (nullable = true)
|-- fldPageViews: integer (nullable = true)
|-- fldImpressions: integer (nullable = true)
|-- fldClicks: integer (nullable = true)
|-- fldPageRPM: double (nullable = true)
|-- fldimpressionRPM: double (nullable = true)
|-- fldActiveViewViewable: string (nullable = true)
|-- fldEstimatedEarnings: double (nullable = true)
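The chained withColumnRenamed calls above get long quickly. An equivalent, more compact approach is to loop over a rename mapping (a sketch; the mapping uses the same names as above and produces the same DataFrame):
# Map of original CSV headers to the shorter field names used below
rename_map = {
    "date": "fldDate",
    "Page views": "fldPageViews",
    "Impressions": "fldImpressions",
    "Clicks": "fldClicks",
    "Page RPM (USD)": "fldPageRPM",
    "Impression RPM (USD)": "fldimpressionRPM",
    "Active View Viewable": "fldActiveViewViewable",
    "Estimated earnings (USD)": "fldEstimatedEarnings",
}
dfAdsense = adsenseDF
for old_name, new_name in rename_map.items():
    dfAdsense = dfAdsense.withColumnRenamed(old_name, new_name)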
# Remove the records that have zero revenue
from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())
1511
# Find the total earnings
from pyspark.sql import functions as F
df.select(F.sum(col("fldEstimatedEarnings")).alias("Estimated Earnings Total")).show()
+------------------------+
|Estimated Earnings Total|
+------------------------+
| 16995.739999999976|
+------------------------+
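If more than the grand total is needed, several aggregates can be computed in one pass (a sketch; the alias strings are only for display):
df.select(
    F.round(F.sum("fldEstimatedEarnings"), 2).alias("Total"),
    F.round(F.avg("fldEstimatedEarnings"), 2).alias("Average per day"),
    F.max("fldEstimatedEarnings").alias("Best day"),
    F.count("fldEstimatedEarnings").alias("Earning days")
).show()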
# Find the top 10 earnings (without the date)
from pyspark.sql.functions import col
df.select(col("fldEstimatedEarnings")).sort(col("fldEstimatedEarnings").desc()).show(10)
+--------------------+
|fldEstimatedEarnings|
+--------------------+
| 63.71|
| 54.14|
| 53.77|
| 52.73|
| 52.14|
| 50.86|
| 49.63|
| 48.91|
| 48.8|
| 48.58|
+--------------------+
# The same top 10, this time using the desc() function instead of the Column.desc() method
from pyspark.sql.functions import desc
df.select("fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)
+--------------------+
|fldEstimatedEarnings|
+--------------------+
| 63.71|
| 54.14|
| 53.77|
| 52.73|
| 52.14|
| 50.86|
| 49.63|
| 48.91|
| 48.8|
| 48.58|
+--------------------+
# Find the top 10 earnings with the date
from pyspark.sql.functions import desc
df.select("fldDate","fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)
+-------------------+--------------------+
| fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00| 63.71|
|2011-11-28 00:00:00| 54.14|
|2010-12-18 00:00:00| 53.77|
|2011-11-21 00:00:00| 52.73|
|2010-12-19 00:00:00| 52.14|
|2011-07-13 00:00:00| 50.86|
|2011-12-02 00:00:00| 49.63|
|2011-06-24 00:00:00| 48.91|
|2011-09-27 00:00:00| 48.8|
|2011-07-04 00:00:00| 48.58|
+-------------------+--------------------+
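Because the date column was inferred as a timestamp, every value prints with a trailing 00:00:00. Casting it to a date gives cleaner output (a sketch using to_date):
from pyspark.sql.functions import to_date, desc
df.select(to_date("fldDate").alias("fldDate"), "fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)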
# Find the 10 lowest earnings (without the date)
df.select("fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)
+--------------------+
|fldEstimatedEarnings|
+--------------------+
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
| 0.01|
+--------------------+
# Find the 10 lowest earnings with the date
df.select("fldDate","fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)
+-------------------+--------------------+
| fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2008-09-14 00:00:00| 0.01|
|2012-12-22 00:00:00| 0.01|
|2008-08-30 00:00:00| 0.01|
|2009-02-17 00:00:00| 0.01|
|2012-01-28 00:00:00| 0.01|
|2009-03-07 00:00:00| 0.01|
|2012-02-06 00:00:00| 0.01|
|2011-12-13 00:00:00| 0.01|
|2012-01-24 00:00:00| 0.01|
|2008-09-08 00:00:00| 0.01|
+-------------------+--------------------+
# On which dates were the estimated earnings >= 40?
from pyspark.sql.functions import col
df.select("fldDate","fldEstimatedEarnings").where(col("fldEstimatedEarnings") >= 40).show()
+-------------------+--------------------+
| fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00| 63.71|
|2011-07-04 00:00:00| 48.58|
|2011-11-21 00:00:00| 52.73|
|2011-11-28 00:00:00| 54.14|
|2011-07-08 00:00:00| 46.45|
|2011-11-20 00:00:00| 42.98|
|2010-12-18 00:00:00| 53.77|
|2011-11-29 00:00:00| 43.27|
|2011-11-17 00:00:00| 41.15|
|2011-11-22 00:00:00| 43.78|
|2011-11-30 00:00:00| 46.69|
|2011-12-02 00:00:00| 49.63|
|2011-11-15 00:00:00| 41.43|
|2011-09-17 00:00:00| 42.28|
|2011-07-12 00:00:00| 46.85|
|2011-07-11 00:00:00| 43.33|
|2011-07-09 00:00:00| 44.45|
|2011-07-05 00:00:00| 43.08|
|2011-09-18 00:00:00| 43.02|
|2011-06-22 00:00:00| 44.82|
+-------------------+--------------------+
# On how many days were the estimated earnings >= 40?
df.where(col("fldEstimatedEarnings") >= 40).count()
47
# Count of days with earnings >= 10 and <= 50
df.filter( (col("fldEstimatedEarnings") >= 10) & (col("fldEstimatedEarnings") <= 50)).count()
583
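The same range filter can also be written with between(), which is inclusive on both ends and therefore gives the same count:
df.filter(col("fldEstimatedEarnings").between(10, 50)).count()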
# Top 10 days by page views
from pyspark.sql.functions import desc
df.select("fldDate","fldPageViews").sort(desc("fldPageViews")).show(10)
+-------------------+------------+
| fldDate|fldPageViews|
+-------------------+------------+
|2010-02-25 00:00:00| 61358|
|2011-07-03 00:00:00| 60066|
|2011-04-03 00:00:00| 59079|
|2011-07-04 00:00:00| 55221|
|2011-11-21 00:00:00| 50808|
|2011-07-08 00:00:00| 49545|
|2011-11-20 00:00:00| 49020|
|2011-11-13 00:00:00| 48613|
|2011-11-28 00:00:00| 48360|
|2011-11-22 00:00:00| 47939|
+-------------------+------------+
# Top 10 days by clicks
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").sort(desc("fldClicks")).show(10)
+-------------------+---------+
| fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00| 1044|
|2011-11-25 00:00:00| 1026|
|2011-11-28 00:00:00| 996|
|2011-11-29 00:00:00| 968|
|2011-11-18 00:00:00| 924|
|2011-11-16 00:00:00| 920|
|2011-11-17 00:00:00| 917|
|2011-11-12 00:00:00| 912|
|2011-08-06 00:00:00| 878|
|2011-11-13 00:00:00| 873|
+-------------------+---------+
# Days with at least 800 clicks, in descending order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort(desc("fldClicks")).show()
+-------------------+---------+
| fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00| 1044|
|2011-11-25 00:00:00| 1026|
|2011-11-28 00:00:00| 996|
|2011-11-29 00:00:00| 968|
|2011-11-18 00:00:00| 924|
|2011-11-16 00:00:00| 920|
|2011-11-17 00:00:00| 917|
|2011-11-12 00:00:00| 912|
|2011-08-06 00:00:00| 878|
|2011-11-13 00:00:00| 873|
|2011-11-30 00:00:00| 849|
|2011-07-23 00:00:00| 845|
|2011-12-02 00:00:00| 842|
|2011-11-15 00:00:00| 829|
|2011-11-27 00:00:00| 823|
|2011-07-09 00:00:00| 812|
|2011-07-08 00:00:00| 806|
|2011-07-04 00:00:00| 802|
|2011-11-19 00:00:00| 801|
|2011-11-11 00:00:00| 800|
+-------------------+---------+
# Days with at least 800 clicks, in ascending order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort("fldClicks").show()
+-------------------+---------+
| fldDate|fldClicks|
+-------------------+---------+
|2011-11-11 00:00:00| 800|
|2011-11-19 00:00:00| 801|
|2011-07-04 00:00:00| 802|
|2011-07-08 00:00:00| 806|
|2011-07-09 00:00:00| 812|
|2011-11-27 00:00:00| 823|
|2011-11-15 00:00:00| 829|
|2011-12-02 00:00:00| 842|
|2011-07-23 00:00:00| 845|
|2011-11-30 00:00:00| 849|
|2011-11-13 00:00:00| 873|
|2011-08-06 00:00:00| 878|
|2011-11-12 00:00:00| 912|
|2011-11-17 00:00:00| 917|
|2011-11-16 00:00:00| 920|
|2011-11-18 00:00:00| 924|
|2011-11-29 00:00:00| 968|
|2011-11-28 00:00:00| 996|
|2011-11-25 00:00:00| 1026|
|2011-11-26 00:00:00| 1044|
+-------------------+---------+
# On how many days were clicks >= 800?
#adsense_r4.filter(lambda x: x.fldClicks >= 800).count()
df.select("fldClicks").where(col("fldClicks") >= 800).count()
20
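# Total clicks on the days with >= 800 clicks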
df.filter(col("fldClicks") >= 800).select(F.sum(col("fldClicks")).alias('Sum')).show()
+-----+
| Sum|
+-----+
|17667|
+-----+
# Total number of clicks across all days
df.select(F.sum(col("fldClicks")).alias("All clicks count")).show()
+----------------+
|All clicks count|
+----------------+
| 248190|
+----------------+
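Putting the two click totals together, a conditional sum shows what share of all clicks came from those 20 high-traffic days, roughly 7% here (a sketch; the percentage alias is just for display):
df.select(
    F.round(F.sum(F.when(col("fldClicks") >= 800, col("fldClicks")).otherwise(0)) / F.sum("fldClicks") * 100, 2).alias("Share of clicks from 800+ days (%)")
).show()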
# Total number of page views across all days
df.select(F.sum(col("fldPageViews")).alias("Total Page views")).show()
+----------------+
|Total Page views|
+----------------+
| 18626239|
+----------------+
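Combining the earnings total with the page view total gives the overall page RPM, i.e. estimated earnings per thousand page views, for the whole period (a sketch; the formula follows the usual RPM definition):
df.select(
    F.round(F.sum("fldEstimatedEarnings") / F.sum("fldPageViews") * 1000, 2).alias("Overall Page RPM (USD)")
).show()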
# Number of days with page views >= 50000
df.where(col("fldPageViews") >= 50000).count()
#answer : 5
# Number of days with page views >= 10000
df.where(col("fldPageViews") >= 10000).count()
#answer : 649
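For a quick statistical summary of the main numeric columns (count, mean, standard deviation, min, max), describe() is handy (a sketch):
df.select("fldPageViews", "fldClicks", "fldEstimatedEarnings").describe().show()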
from pyspark.sql.functions import hour, mean, year, month, dayofmonth
import pyspark.sql.functions as F
#df.select(year("fldDate"), month("fldDate")).show(5)
#df.select(year("fldDate").alias('year'), month("fldDate").alias('month'), dayofmonth("fldDate").alias('day')).show()
#df.groupBy(year("fldDate").alias("Year")).sum("fldEstimatedEarnings").orderBy("Year").show()
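# Total earnings per year, rounded to two decimals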
df.groupBy(year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year").show()
+----+--------+
|Year|Earnings|
+----+--------+
|2008| 374.73|
|2009| 1640.68|
|2010| 2883.28|
|2011| 9392.38|
|2012| 2702.84|
|2013| 1.28|
|2014| 0.03|
|2015| 0.21|
|2016| 0.25|
|2017| 0.06|
+----+--------+
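The same yearly grouping can carry extra aggregates, for example the number of earning days and the average per day (a sketch; the alias names are illustrative):
df.groupBy(year("fldDate").alias("Year")).agg(
    F.round(F.sum("fldEstimatedEarnings"), 2).alias("Earnings"),
    F.count("fldEstimatedEarnings").alias("Days"),
    F.round(F.avg("fldEstimatedEarnings"), 2).alias("Avg per day")
).orderBy("Year").show()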
from pyspark.sql.functions import hour, mean, year, month, dayofmonth
import pyspark.sql.functions as F
# Earnings per month and year
df_re1 = df.groupBy(month("fldDate").alias("Month"), year("fldDate").alias("Year")).agg(F.round(F.sum("fldEstimatedEarnings"), 2).alias("Earnings")).orderBy("Year", "Month")
df_re1.show(100)
+-----+----+--------+
|Month|Year|Earnings|
+-----+----+--------+
| 6|2008| 35.76|
| 7|2008| 82.05|
| 8|2008| 27.17|
| 9|2008| 21.12|
| 10|2008| 65.87|
| 11|2008| 78.38|
| 12|2008| 64.38|
| 1|2009| 130.1|
| 2|2009| 26.69|
| 3|2009| 59.68|
| 4|2009| 94.23|
| 5|2009| 186.15|
| 6|2009| 248.88|
| 7|2009| 190.95|
| 8|2009| 81.7|
| 9|2009| 60.99|
| 10|2009| 206.2|
| 11|2009| 198.68|
| 12|2009| 156.43|
| 1|2010| 101.46|
| 2|2010| 316.61|
| 3|2010| 179.72|
| 4|2010| 142.26|
| 5|2010| 120.05|
| 6|2010| 132.18|
| 7|2010| 134.75|
| 8|2010| 137.77|
| 9|2010| 265.05|
| 10|2010| 267.31|
| 11|2010| 372.2|
| 12|2010| 713.92|
| 1|2011| 478.87|
| 2|2011| 551.54|
| 3|2011| 549.9|
| 4|2011| 683.07|
| 5|2011| 620.94|
| 6|2011| 969.76|
| 7|2011| 1149.31|
| 8|2011| 1057.64|
| 9|2011| 1039.34|
| 10|2011| 966.37|
| 11|2011| 1138.69|
| 12|2011| 186.95|
| 1|2012| 0.48|
| 2|2012| 256.25|
| 3|2012| 564.13|
| 4|2012| 524.13|
| 5|2012| 504.83|
| 6|2012| 370.12|
| 7|2012| 261.1|
| 8|2012| 219.39|
| 9|2012| 2.38|
| 10|2012| 0.02|
| 12|2012| 0.01|
| 1|2013| 0.01|
| 3|2013| 0.25|
| 4|2013| 0.11|
| 5|2013| 0.09|
| 6|2013| 0.25|
| 7|2013| 0.03|
| 9|2013| 0.09|
| 10|2013| 0.32|
| 12|2013| 0.13|
| 2|2014| 0.03|
| 2|2015| 0.08|
| 4|2015| 0.03|
| 7|2015| 0.1|
| 2|2016| 0.01|
| 3|2016| 0.2|
| 9|2016| 0.03|
| 11|2016| 0.01|
| 6|2017| 0.06|
+-----+----+--------+
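# Roll the monthly figures back up to yearly totals as a cross-check against the earlier result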
df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"),2)).orderBy("Year").show()
+----+-----------------------+
|Year|round(sum(Earnings), 2)|
+----+-----------------------+
|2008| 374.73|
|2009| 1640.68|
|2010| 2883.28|
|2011| 9392.38|
|2012| 2702.84|
|2013| 1.28|
|2014| 0.03|
|2015| 0.21|
|2016| 0.25|
|2017| 0.06|
+----+-----------------------+
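The month-by-year breakdown can also be laid out as a matrix with pivot(), one row per year and one column per month (a sketch built on df_re1; months with no earnings come out as null):
df_re1.groupBy("Year").pivot("Month", list(range(1, 13))).sum("Earnings").orderBy("Year").show()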