
Sunday, 16 August 2020

London Crime Analysis with DataFrames in Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LondonCrimes").getOrCreate()
data = spark.read.format("csv").option("header","true").load("D:\\Ex\\london_crime_by_lsoa.csv")

data.printSchema() 

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 
 
 data.show()
 
 +---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|2013|    4|
|E01003496|              Newham|     Criminal Damage|Criminal Damage T...|    0|2013|    9|
|E01004177|              Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|E01001985|            Haringey|  Theft and Handling|Motor Vehicle Int...|    0|2013|   12|
|E01003076|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|E01003852|Richmond upon Thames|             Robbery|   Personal Property|    0|2014|    1|
|E01004547|          Wandsworth|Violence Against ...|    Offensive Weapon|    0|2011|   10|
|E01002398|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|E01002358|            Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
|E01000086|Barking and Dagenham|  Theft and Handling|  Other Theft Person|    1|2009|    5|
|E01003708|           Redbridge|Violence Against ...|      Common Assault|    0|2009|    6|
|E01002945|Kingston upon Thames|  Theft and Handling|    Theft From Shops|    0|2016|   11|
|E01004195|              Sutton|               Drugs| Possession Of Drugs|    0|2009|   10|
+---------+--------------------+--------------------+--------------------+-----+----+-----+


data.count()
13490604 ==> 13.5 M rows
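Each of the actions below re-reads the CSV from disk; with roughly 13.5 million rows it can be worth caching the DataFrame once before exploring it (an optional sketch, not part of the original post):

data.cache()     # mark the DataFrame for in-memory caching
data.count()     # the first action materialises the cache; later actions reuse it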

data.limit(5).show()
data.show(5)

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+


data.limit(5).show(truncate=False)

+---------+----------+---------------------------+---------------------------+-----+----+-----+
|lsoa_code|borough   |major_category             |minor_category             |value|year|month|
+---------+----------+---------------------------+---------------------------+-----+----+-----+
|E01001116|Croydon   |Burglary                   |Burglary in Other Buildings|0    |2016|11   |
|E01001646|Greenwich |Violence Against the Person|Other violence             |0    |2016|11   |
|E01000677|Bromley   |Violence Against the Person|Other violence             |0    |2015|5    |
|E01003774|Redbridge |Burglary                   |Burglary in Other Buildings|0    |2016|3    |
|E01004563|Wandsworth|Robbery                    |Personal Property          |0    |2008|6    |
+---------+----------+---------------------------+---------------------------+-----+----+-----+


# Remove a column
data = data.drop('lsoa_code')
data.show(5,truncate=False)

+----------+---------------------------+---------------------------+-----+----+-----+
|borough   |major_category             |minor_category             |value|year|month|
+----------+---------------------------+---------------------------+-----+----+-----+
|Croydon   |Burglary                   |Burglary in Other Buildings|0    |2016|11   |
|Greenwich |Violence Against the Person|Other violence             |0    |2016|11   |
|Bromley   |Violence Against the Person|Other violence             |0    |2015|5    |
|Redbridge |Burglary                   |Burglary in Other Buildings|0    |2016|3    |
|Wandsworth|Robbery                    |Personal Property          |0    |2008|6    |
+----------+---------------------------+---------------------------+-----+----+-----+
only showing top 5 rows


# Unique boroughs
total_borough = data.select('borough').distinct()
total_borough.show(truncate=False)


total_borough.count()
33

# Display all unique boroughs
total_borough.show(total_borough.count(),truncate=False)

+----------------------+
|borough               |
+----------------------+
|Croydon               |
|Wandsworth            |
|Bexley                |
|Lambeth               |
|Barking and Dagenham  |
|Camden                |
|Greenwich             |
|Newham                |
|Tower Hamlets         |
|Hounslow              |
|Barnet                |
|Harrow                |
|Kensington and Chelsea|
|Islington             |
|Brent                 |
|Haringey              |
|Bromley               |
|Merton                |
|Westminster           |
|Hackney               |
|Southwark             |
|Enfield               |
|Ealing                |
|Sutton                |
|Hammersmith and Fulham|
|Kingston upon Thames  |
|Havering              |
|Hillingdon            |
|Waltham Forest        |
|Richmond upon Thames  |
|Redbridge             |
|City of London        |
|Lewisham              |
+----------------------+


# Filter only Havering rows
import pyspark.sql.functions as F
havering_data = data.filter(F.col("borough") == 'Havering' )
havering_data.show(5)


+--------+--------------------+--------------------+-----+----+-----+
| borough|      major_category|      minor_category|value|year|month|
+--------+--------------------+--------------------+-----+----+-----+
|Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
|Havering|Violence Against ...|          Harassment|    0|2008|    1|
|Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|Havering|             Robbery|   Personal Property|    0|2009|    8|
|Havering|            Burglary|Burglary in a Dwe...|    1|2016|    8|
+--------+--------------------+--------------------+-----+----+-----+

# Filter either Havering or Hackney rows
import pyspark.sql.functions as F
havering_or_hackney_data = data.filter( (F.col("borough") == 'Havering') | (F.col("borough") == 'Hackney') )
havering_or_hackney_data.show(5)

+--------+--------------------+--------------------+-----+----+-----+
| borough|      major_category|      minor_category|value|year|month|
+--------+--------------------+--------------------+-----+----+-----+
|Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
| Hackney|     Criminal Damage|Criminal Damage T...|    0|2011|    6|
| Hackney|Violence Against ...|          Harassment|    1|2013|    2|
|Havering|Violence Against ...|          Harassment|    0|2008|    1|
|Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
+--------+--------------------+--------------------+-----+----+-----+


# Display the records where year is 2015 or 2016
import pyspark.sql.functions as F
data_2k15_2k16 = data.filter( F.col("year").isin(["2015","2016"]))
data_2k15_2k16.show()


+--------------------+--------------------+--------------------+-----+----+-----+
|             borough|      major_category|      minor_category|value|year|month|
+--------------------+--------------------+--------------------+-----+----+-----+
|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|              Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|Kingston upon Thames|  Theft and Handling|    Theft From Shops|    0|2016|   11|
|            Haringey|Violence Against ...|        Wounding/GBH|    0|2015|   12|
|            Lewisham|Violence Against ...|      Common Assault|    0|2016|    2|
|            Hounslow|     Criminal Damage|Criminal Damage T...|    0|2015|    2|
|             Bromley|     Criminal Damage|Criminal Damage T...|    1|2016|    4|
|            Haringey|     Criminal Damage|Criminal Damage T...|    0|2016|   12|
|           Southwark|               Drugs| Possession Of Drugs|    0|2015|    3|
|            Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|      Waltham Forest|Other Notifiable ...|      Going Equipped|    0|2015|    2|
|              Ealing|             Robbery|   Personal Property|    0|2015|    7|
|               Brent|  Theft and Handling|Motor Vehicle Int...|    0|2015|    9|
|            Hounslow|Violence Against ...|        Wounding/GBH|    2|2015|    8|
|           Southwark|  Theft and Handling|    Theft From Shops|    4|2016|    8|
+--------------------+--------------------+--------------------+-----+----+-----+



import pyspark.sql.functions as F
data_greater2k16 = data.filter( F.col("year") > 2015)
data_greater2k16.show(5)

+----------+--------------------+--------------------+-----+----+-----+
|   borough|      major_category|      minor_category|value|year|month|
+----------+--------------------+--------------------+-----+----+-----+
|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|    Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
+----------+--------------------+--------------------+-----+----+-----+


# Group by borough and count the rows per borough

import pyspark.sql.functions as F
borough_crime_count = data.groupBy("borough").agg(F.count("borough"))
borough_crime_count.show()
+--------------------+--------------+
|             borough|count(borough)|
+--------------------+--------------+
|             Croydon|        602100|
|          Wandsworth|        498636|
|              Bexley|        385668|
|             Lambeth|        519048|
|Barking and Dagenham|        311040|
|              Camden|        378432|
|           Greenwich|        421200|
|              Newham|        471420|
|       Tower Hamlets|        412128|
|            Hounslow|        395928|
|              Barnet|        572832|
|              Harrow|        365688|
|Kensington and Ch...|        296784|
|           Islington|        359208|
|               Brent|        490644|
|            Haringey|        413856|
|             Bromley|        523908|
|              Merton|        339876|
|         Westminster|        366660|
|             Hackney|        417744|
+--------------------+--------------+

# With alias
import pyspark.sql.functions as F
borough_crime_count = data.groupBy("borough").agg(F.count("borough").alias("Borough_Count"))

borough_crime_count.show()
+--------------------+-------------+
|             borough|Borough_Count|
+--------------------+-------------+
|             Croydon|       602100|
|          Wandsworth|       498636|
|              Bexley|       385668|
|             Lambeth|       519048|
|Barking and Dagenham|       311040|
|              Camden|       378432|
|           Greenwich|       421200|
|              Newham|       471420|
|       Tower Hamlets|       412128|
|            Hounslow|       395928|
|              Barnet|       572832|
|              Harrow|       365688|
|Kensington and Ch...|       296784|
|           Islington|       359208|
|               Brent|       490644|
|            Haringey|       413856|
|             Bromley|       523908|
|              Merton|       339876|
|         Westminster|       366660|
|             Hackney|       417744|
+--------------------+-------------+


borough_conviction_sum = data.groupBy("borough").agg({"value":"sum"})
borough_conviction_sum.show(5)

+--------------------+----------+
|             borough|sum(value)|
+--------------------+----------+
|             Croydon|  260294.0|
|          Wandsworth|  204741.0|
|              Bexley|  114136.0|
|             Lambeth|  292178.0|
|Barking and Dagenham|  149447.0|
+--------------------+----------+


# Renaming a column
borough_conviction_sum = data.groupBy("borough").agg({"value":"sum"}).withColumnRenamed("sum(value)","Convictions")
borough_conviction_sum.show(5)

+--------------------+-----------+
|             borough|Convictions|
+--------------------+-----------+
|             Croydon|   260294.0|
|          Wandsworth|   204741.0|
|              Bexley|   114136.0|
|             Lambeth|   292178.0|
|Barking and Dagenham|   149447.0|
+--------------------+-----------+
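The same result can be produced in one step with F.sum and an alias, instead of the dict-style agg followed by withColumnRenamed (an equivalent sketch):

import pyspark.sql.functions as F
borough_conviction_sum = data.groupBy("borough").agg(F.sum("value").alias("Convictions"))
borough_conviction_sum.show(5)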


total_borough_convictions = borough_conviction_sum.agg({"convictions":"sum"})
total_borough_convictions.show()

+----------------+
|sum(convictions)|
+----------------+
|       6447758.0|
+----------------+


total_borough_convictions = borough_conviction_sum.agg({"convictions":"sum"}).withColumnRenamed("sum(convictions)","TotalSum")
total_borough_convictions.show()

+---------+
| TotalSum|
+---------+
|6447758.0|
+---------+

RDD and DataFrame Examples in Pyspark

a = spark.sparkContext.parallelize((1,2,3,4,5,6))
print(a.getNumPartitions())
4


from pyspark.sql.types import Row
from datetime import datetime

sc = spark.sparkContext
simple_data = sc.parallelize([[100,"Raja",500], [101,"Kala",550], [102,"Raji",650]])

print(simple_data.count())
3


print(simple_data.first())
[100, 'Raja', 500]


print(simple_data.take(2))
[[100, 'Raja', 500], [101, 'Kala', 550]]


print(simple_data.collect())
[[100, 'Raja', 500], [101, 'Kala', 550], [102, 'Raji', 650]]
 


df = simple_data.toDF(["RollNo","Name","Wages"])

df.printSchema()
root
 |-- RollNo: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Wages: long (nullable = true)


df.show()

+------+----+-----+
|RollNo|Name|Wages|
+------+----+-----+
|   100|Raja|  500|
|   101|Kala|  550|
|   102|Raji|  650|
+------+----+-----+


studentsRDD =  sc.parallelize([
Row(id=100,name="Ravi",score=50,Gender="M"),
Row(id=101,name="Janani",score=66,Gender="F"),
Row(id=102,name="Siva",score=63,Gender="M"), 
Row(id=103,name="Aishvarya",score=58,Gender="F"), 
Row(id=104,name="Vijay",score=52,Gender="M"),
Row(id=105,name="Varadaraj",score=55,Gender="M"),
Row(id=106,name="Parvathi",score=60,Gender="F"),
Row(id=107,name="Aasqhique",score=62,Gender="M")
])


print(studentsRDD.take(3))

[Row(Gender='M', id=100, name='Ravi', score=50), Row(Gender='F', id=101, name='Janani', score=66), Row(Gender='M', id=102, name='Siva', score=63)]



studentsRDD.count()
8

studentsRDD.collect()
[Row(Gender='M', id=100, name='Ravi', score=50),
 Row(Gender='F', id=101, name='Janani', score=66),
 Row(Gender='M', id=102, name='Siva', score=63),
 Row(Gender='F', id=103, name='Aishvarya', score=58),
 Row(Gender='M', id=104, name='Vijay', score=52),
 Row(Gender='M', id=105, name='Varadaraj', score=55),
 Row(Gender='F', id=106, name='Parvathi', score=60),
 Row(Gender='M', id=107, name='Aasqhique', score=62)]
 
 



df = studentsRDD.toDF()

df.show()
 +------+---+---------+-----+
|Gender| id|     name|score|
+------+---+---------+-----+
|     M|100|     Ravi|   50|
|     F|101|   Janani|   66|
|     M|102|     Siva|   63|
|     F|103|Aishvarya|   58|
|     M|104|    Vijay|   52|
|     M|105|Varadaraj|   55|
|     F|106| Parvathi|   60|
|     M|107|Aasqhique|   62|
+------+---+---------+-----+



# Display only male candidates

from pyspark.sql import functions as F
df.select("id","name","score","Gender").where(F.col("Gender") == "M").show()

+---+---------+-----+------+
| id|     name|score|Gender|
+---+---------+-----+------+
|100|     Ravi|   50|     M|
|102|     Siva|   63|     M|
|104|    Vijay|   52|     M|
|105|Varadaraj|   55|     M|
|107|Aasqhique|   62|     M|
+---+---------+-----+------+

# Display only female candidates
from pyspark.sql import functions as F
df.where(F.col("Gender") == "F").show()

+------+---+---------+-----+
|Gender| id|     name|score|
+------+---+---------+-----+
|     F|101|   Janani|   66|
|     F|103|Aishvarya|   58|
|     F|106| Parvathi|   60|
+------+---+---------+-----+

df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)
 
 
 
 
from pyspark.sql import functions as F
df.groupBy("Gender").agg(F.sum(F.col("score")).alias("Summ")).show()

+------+----+
|Gender|Summ|
+------+----+
|     F| 184|
|     M| 282|
+------+----+
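Several aggregations can be computed in the same groupBy call; a small sketch on the same df (the alias names are illustrative):

from pyspark.sql import functions as F
df.groupBy("Gender").agg(
    F.sum("score").alias("TotalScore"),
    F.avg("score").alias("AvgScore"),
    F.count("id").alias("Students")
).show()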



cmplx_data = sc.parallelize([Row(
col_list = ['a','b','c'],
col_dict = {"India":"Delhi"},
col_row  = Row(id=100,name="Ravi",score=33,city="Bangalore",Gender="M"),
col_time =datetime(2020,1,1,14,1,5)
),
Row(
col_list = ['z','y','x','w'],
col_dict = {"Srilanka":"Colombo"},
col_row  = Row(id=100,name="Ravi",score=33,city="Bangalore",Gender="M"),
col_time =datetime(2019,2,1,14,1,5)
),
Row(
col_list = ['m','n','o','q','r'],
col_dict = {"Myanmar":"Naypyidaw"},
col_row  = Row(id=100,name="Ravi",score=33,city="Bangalore",Gender="M"),
col_time =datetime(2018,3,1,14,1,5)
),
Row(
col_list = ['s','t','u','v','w','x','y'],
col_dict = {"Pakistan":"Islamabad"},
col_row  = Row(id=100,name="Ravi",score=33,city="Bangalore",Gender="M"),
col_time =datetime(2017,4,1,14,1,5)
),
Row(
col_list = ['h','i','j','k','l','m','n','o','p'],
col_dict = {"Bangaladesh":"Dhaka"},
col_row  = Row(id=100,name="Ravi",score=33,city="Bangalore",Gender="M"),
col_time =datetime(2016,5,1,14,1,5)
)
])


df = cmplx_data.toDF()

# Display the schema of the complex data types
df.printSchema()

root
 |-- col_dict: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- col_list: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- col_row: struct (nullable = true)
 |    |-- Gender: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- score: long (nullable = true)
 |-- col_time: timestamp (nullable = true)
 
 df.show()
 
+--------------------+--------------------+--------------------+-------------------+
|            col_dict|            col_list|             col_row|           col_time|
+--------------------+--------------------+--------------------+-------------------+
|    [India -> Delhi]|           [a, b, c]|[M, Bangalore, 10...|2020-01-01 14:01:05|
|[Srilanka -> Colo...|        [z, y, x, w]|[M, Bangalore, 10...|2019-02-01 14:01:05|
|[Myanmar -> Naypy...|     [m, n, o, q, r]|[M, Bangalore, 10...|2018-03-01 14:01:05|
|[Pakistan -> Isla...|[s, t, u, v, w, x...|[M, Bangalore, 10...|2017-04-01 14:01:05|
|[Bangaladesh -> D...|[h, i, j, k, l, m...|[M, Bangalore, 10...|2016-05-01 14:01:05|
+--------------------+--------------------+--------------------+-------------------+


# Display year and month of col_time
from pyspark.sql import functions as F
df.select(F.year(F.col("col_time")).alias("Year"), F.month(F.col("col_time")).alias("Month")).show()


+----+-----+
|Year|Month|
+----+-----+
|2020|    1|
|2019|    2|
|2018|    3|
|2017|    4|
|2016|    5|
+----+-----+

Saturday, 30 May 2020

Call Log Data Extraction using Pyspark

import re

def extract(str):
    pnos = re.search(r'\d{20}',str).group()
    fromno = pnos[0:10]
    tono = pnos[10:20]
    #status = re.search(r'[A-Z]{6,7}',str).group()

    status = re.search(r'SUCCESS|DROPPED|FAILED',str).group()
    timestamps = re.findall(r'(\d{4}-\d{2}-\d{2}\s{1}\d{2}:\d{2}:\d{2})',str)
    starttime = timestamps[0]
    endtime = timestamps[1]
    return (fromno,tono,status,starttime,endtime)


# quick check against the sample log lines ('inputs' is defined in the regex post below)
for logg in inputs:
    print(extract(logg))

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
data = spark.sparkContext.textFile("E:\\vow\\calllogdata.txt")
#data2 = data.map(lambda x:x.encode('utf-8'))

data.first()
'ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463'

data2 = data.map(lambda x:extract(x))
data2.take(1)

[('8052690057',
  '7757919463',
  'DROPPED',
  '2014-03-15 00:02:48',
  '2014-03-15 00:06:05')]
  
  
df = data2.toDF(["fromno","tono","status","starttime","endtime"])
df.printSchema()

root
 |-- fromno: string (nullable = true)
 |-- tono: string (nullable = true)
 |-- status: string (nullable = true)
 |-- starttime: string (nullable = true)  #starttime should be timestamp
 |-- endtime: string (nullable = true)   #endtime should be timestamp
 
from pyspark.sql.functions import col, current_date

#changing the datatype into timestamp, create new column 'day' with current_date
dfCall = df.withColumn("starttime",col("starttime").cast("timestamp")) \
           .withColumn("endtime",col("endtime").cast("timestamp")) \
           .withColumn("day",current_date())
dfCall.printSchema()

#we have changed the datatype of starttime, endtime fields into timestamps
root
 |-- fromno: string (nullable = true)
 |-- tono: string (nullable = true)
 |-- status: string (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- endtime: timestamp (nullable = true)
 |-- day: date (nullable = false)


dfCall.show(5)

+----------+----------+-------+-------------------+-------------------+----------+
|    fromno|      tono| status|          starttime|            endtime|       day|
+----------+----------+-------+-------------------+-------------------+----------+
|8052690057|7757919463|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:05|2020-05-31|
|9886177375|9916790556|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:07|2020-05-31|
|8618627996|9886177375|SUCCESS|2014-03-16 00:02:48|2014-03-16 00:06:45|2020-05-31|
|9876515616|4894949494|DROPPED|2014-03-16 00:02:48|2014-03-16 00:06:53|2020-05-31|
|5454545454|6469496477| FAILED|2014-03-16 00:02:48|2014-03-16 00:06:12|2020-05-31|
+----------+----------+-------+-------------------+-------------------+----------+

dfCall.write.format("orc").mode("append").partitionBy("day").save("e:\\datasets\\callout")
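
The partitioned ORC output can be read back later and pruned by the partition column (a small sketch, assuming the same output path):

dfRead = spark.read.format("orc").load("e:\\datasets\\callout")
dfRead.where(col("day") == "2020-05-31").show(5)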

Call log Regular Expressions in Pyspark

import re

in1 = 'ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:332014-03-15 06:03:42DROPPED 80526900577757919463'
in2 = 'ec59cea2-5006-448f-a047-d5e53f33be232014-03-19 00:03:482014-03-19 05:02:33DROPPED 57554548979797979797'
in3 = 'ec59cea2-5006-448f-a039-d5e53f33be232014-03-17 00:04:452014-03-17 04:06:05FAILED  44554584848449644469'
in4 = 'ec59cea2-5006-448f-a045-d5e53f33be232014-03-19 00:05:482014-03-19 03:02:34SUCCESS 84645645605646064646'
in5 = 'ec59cea2-5006-448f-a050-d5e53f33be232014-03-20 00:06:282014-03-20 01:06:05SUCCESS 74894086489489489489'
inputs = list((in1,in2,in3,in4,in5))


def extract(str):
    pnos = re.search(r'\d{20}',str).group()
    fromno = pnos[0:10]
    tono = pnos[10:20]
    #status = re.search(r'[A-Z]{6,7}',str).group()

    status = re.search(r'SUCCESS|DROPPED|FAILED',str).group()
    timestamps = re.findall(r'(\d{4}-\d{2}-\d{2}\s{1}\d{2}:\d{2}:\d{2})',str)
    starttime = timestamps[0]
    endtime = timestamps[1]
    return (fromno,tono,status,starttime,endtime)


for logg in inputs:
    print(extract(logg))

Google Adsense data Analysis using Pyspark DataFrame

adsenseDF = spark.read.format("csv").option("header",True).option("inferSchema",True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)

dfAdsense = adsenseDF.withColumnRenamed("date",'fldDate').withColumnRenamed('Page views','fldPageViews').withColumnRenamed('Impressions','fldImpressions').withColumnRenamed('Clicks','fldClicks').withColumnRenamed('Page RPM (USD)','fldPageRPM').withColumnRenamed('Impression RPM (USD)','fldimpressionRPM').withColumnRenamed("Active View Viewable","fldActiveViewViewable").withColumnRenamed("Estimated earnings (USD)","fldEstimatedEarnings")
dfAdsense.printSchema()

root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)



#delete the records that have zero revenue
from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())

1511


#Find the total Earnings 
from pyspark.sql import functions as F 
df.select(F.sum(col("fldEstimatedEarnings")).alias("Estimated Earnings Total")).show()

+------------------------+
|Estimated Earnings Total|
+------------------------+
|      16995.739999999976|
+------------------------+

#Find the top 10 Earnings without Date
from pyspark.sql.functions import col
df.select(col("fldEstimatedEarnings")) .sort(col("fldEstimatedEarnings").desc()).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+

#Find the top 10 Earnings without Date
from pyspark.sql.functions import desc
df.select("fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+--------------------+
|fldEstimatedEarnings|
+--------------------+
|               63.71|
|               54.14|
|               53.77|
|               52.73|
|               52.14|
|               50.86|
|               49.63|
|               48.91|
|                48.8|
|               48.58|
+--------------------+


#Find the top 10 Earnings with Date

from pyspark.sql.functions import desc
df.select("fldDate","fldEstimatedEarnings").sort(desc("fldEstimatedEarnings")).show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-11-28 00:00:00|               54.14|
|2010-12-18 00:00:00|               53.77|
|2011-11-21 00:00:00|               52.73|
|2010-12-19 00:00:00|               52.14|
|2011-07-13 00:00:00|               50.86|
|2011-12-02 00:00:00|               49.63|
|2011-06-24 00:00:00|               48.91|
|2011-09-27 00:00:00|                48.8|
|2011-07-04 00:00:00|               48.58|
+-------------------+--------------------+


#Find the lowest 10 Earnings without Date
df.select("fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)
 
 +--------------------+
|fldEstimatedEarnings|
+--------------------+
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
|                0.01|
+--------------------+

#Find the lowest 10 Earnings with Date
df.select("fldDate","fldEstimatedEarnings").sort("fldEstimatedEarnings").show(10)

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2008-09-14 00:00:00|                0.01|
|2012-12-22 00:00:00|                0.01|
|2008-08-30 00:00:00|                0.01|
|2009-02-17 00:00:00|                0.01|
|2012-01-28 00:00:00|                0.01|
|2009-03-07 00:00:00|                0.01|
|2012-02-06 00:00:00|                0.01|
|2011-12-13 00:00:00|                0.01|
|2012-01-24 00:00:00|                0.01|
|2008-09-08 00:00:00|                0.01|
+-------------------+--------------------+

#On which dates did we get Estimated Earnings >= 40?
from pyspark.sql.functions import col
df.select("fldDate","fldEstimatedEarnings").where (col("fldEstimatedEarnings") >= 40).show()

+-------------------+--------------------+
|            fldDate|fldEstimatedEarnings|
+-------------------+--------------------+
|2010-02-25 00:00:00|               63.71|
|2011-07-04 00:00:00|               48.58|
|2011-11-21 00:00:00|               52.73|
|2011-11-28 00:00:00|               54.14|
|2011-07-08 00:00:00|               46.45|
|2011-11-20 00:00:00|               42.98|
|2010-12-18 00:00:00|               53.77|
|2011-11-29 00:00:00|               43.27|
|2011-11-17 00:00:00|               41.15|
|2011-11-22 00:00:00|               43.78|
|2011-11-30 00:00:00|               46.69|
|2011-12-02 00:00:00|               49.63|
|2011-11-15 00:00:00|               41.43|
|2011-09-17 00:00:00|               42.28|
|2011-07-12 00:00:00|               46.85|
|2011-07-11 00:00:00|               43.33|
|2011-07-09 00:00:00|               44.45|
|2011-07-05 00:00:00|               43.08|
|2011-09-18 00:00:00|               43.02|
|2011-06-22 00:00:00|               44.82|
+-------------------+--------------------+

#How many days had Estimated Earnings >= 40?
 df.where(col("fldEstimatedEarnings") >= 40).count()
 
 47
 
#Earnings >= 10 and <= 50 - count
df.filter( (col("fldEstimatedEarnings") >= 10) & (col("fldEstimatedEarnings") <= 50)).count()
 
 583
 
 #top 10 page views
from pyspark.sql.functions import desc
df.select("fldDate","fldPageViews").sort(desc("fldPageViews")).show(10)


+-------------------+------------+
|            fldDate|fldPageViews|
+-------------------+------------+
|2010-02-25 00:00:00|       61358|
|2011-07-03 00:00:00|       60066|
|2011-04-03 00:00:00|       59079|
|2011-07-04 00:00:00|       55221|
|2011-11-21 00:00:00|       50808|
|2011-07-08 00:00:00|       49545|
|2011-11-20 00:00:00|       49020|
|2011-11-13 00:00:00|       48613|
|2011-11-28 00:00:00|       48360|
|2011-11-22 00:00:00|       47939|
+-------------------+------------+

#top 10 clicks 
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").sort(desc("fldClicks")).show(10)

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
+-------------------+---------+


# no of clicks >= 800 - Descending order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort(desc("fldClicks")).show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-26 00:00:00|     1044|
|2011-11-25 00:00:00|     1026|
|2011-11-28 00:00:00|      996|
|2011-11-29 00:00:00|      968|
|2011-11-18 00:00:00|      924|
|2011-11-16 00:00:00|      920|
|2011-11-17 00:00:00|      917|
|2011-11-12 00:00:00|      912|
|2011-08-06 00:00:00|      878|
|2011-11-13 00:00:00|      873|
|2011-11-30 00:00:00|      849|
|2011-07-23 00:00:00|      845|
|2011-12-02 00:00:00|      842|
|2011-11-15 00:00:00|      829|
|2011-11-27 00:00:00|      823|
|2011-07-09 00:00:00|      812|
|2011-07-08 00:00:00|      806|
|2011-07-04 00:00:00|      802|
|2011-11-19 00:00:00|      801|
|2011-11-11 00:00:00|      800|
+-------------------+---------+

#no of clicks >= 800 - Ascending Order
from pyspark.sql.functions import desc
df.select("fldDate","fldClicks").where(col('fldClicks') >= 800).sort("fldClicks").show()

+-------------------+---------+
|            fldDate|fldClicks|
+-------------------+---------+
|2011-11-11 00:00:00|      800|
|2011-11-19 00:00:00|      801|
|2011-07-04 00:00:00|      802|
|2011-07-08 00:00:00|      806|
|2011-07-09 00:00:00|      812|
|2011-11-27 00:00:00|      823|
|2011-11-15 00:00:00|      829|
|2011-12-02 00:00:00|      842|
|2011-07-23 00:00:00|      845|
|2011-11-30 00:00:00|      849|
|2011-11-13 00:00:00|      873|
|2011-08-06 00:00:00|      878|
|2011-11-12 00:00:00|      912|
|2011-11-17 00:00:00|      917|
|2011-11-16 00:00:00|      920|
|2011-11-18 00:00:00|      924|
|2011-11-29 00:00:00|      968|
|2011-11-28 00:00:00|      996|
|2011-11-25 00:00:00|     1026|
|2011-11-26 00:00:00|     1044|
+-------------------+---------+


 
#How many days had clicks >= 800
#adsense_r4.filter(lambda x: x.fldClicks >= 800).count()
df.select("fldClicks").where(col("fldClicks") >= 800).count()

20


df.filter(col("fldClicks") >= 800).select(F.sum(col("fldClicks")).alias('Sum')).show()
+-----+
|  Sum|
+-----+
|17667|
+-----+

#Total number of clicks across all days
df.select(F.sum(col("fldClicks")).alias("All clicks count")).show()

+----------------+
|All clicks count|
+----------------+
|          248190|
+----------------+

#Total number of page views across all days
df.select(F.sum(col("fldPageViews")).alias("Total Page views")).show()

+----------------+
|Total Page views|
+----------------+
|        18626239|
+----------------+

#Total days when page views >= 50000
df.where(col("fldPageViews") >= 50000).count()
#answer : 5

#Total days when page views >= 10000
df.where(col("fldPageViews") >= 10000).count()
#answer : 649



from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
adsenseDF = spark.read.format("csv").option("header",True).option("inferSchema",True).load("E:\\DataSets\\adsense_data.csv")
adsenseDF.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Page views: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- Clicks: integer (nullable = true)
 |-- Page RPM (USD): double (nullable = true)
 |-- Impression RPM (USD): double (nullable = true)
 |-- Active View Viewable: string (nullable = true)
 |-- Estimated earnings (USD): double (nullable = true)
 
 
dfAdsense = adsenseDF.withColumnRenamed("date",'fldDate').withColumnRenamed('Page views','fldPageViews').withColumnRenamed('Impressions','fldImpressions').withColumnRenamed('Clicks','fldClicks').withColumnRenamed('Page RPM (USD)','fldPageRPM').withColumnRenamed('Impression RPM (USD)','fldimpressionRPM').withColumnRenamed("Active View Viewable","fldActiveViewViewable").withColumnRenamed("Estimated earnings (USD)","fldEstimatedEarnings")
dfAdsense.printSchema()

from pyspark.sql.functions import col
df = dfAdsense.filter(col("fldEstimatedEarnings") != 0)
print(df.count())



root
 |-- fldDate: timestamp (nullable = true)
 |-- fldPageViews: integer (nullable = true)
 |-- fldImpressions: integer (nullable = true)
 |-- fldClicks: integer (nullable = true)
 |-- fldPageRPM: double (nullable = true)
 |-- fldimpressionRPM: double (nullable = true)
 |-- fldActiveViewViewable: string (nullable = true)
 |-- fldEstimatedEarnings: double (nullable = true)

1511

from pyspark.sql.functions import hour, mean, year,month,dayofmonth
import pyspark.sql.functions as F

#df.select(year("fldDate"),month("fldDate")).show(5)
#f.select(year("fldDate").alias('year'), month("fldDate").alias('month'), dayofmonth("fldDate").alias('day')).show()
#df.groupBy(year("fldDate").alias("Year")).sum("fldEstimatedEarnings").orderBy("Year").show()

df.groupBy(year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year").show()


+----+--------+
|Year|Earnings|
+----+--------+
|2008|  374.73|
|2009| 1640.68|
|2010| 2883.28|
|2011| 9392.38|
|2012| 2702.84|
|2013|    1.28|
|2014|    0.03|
|2015|    0.21|
|2016|    0.25|
|2017|    0.06|
+----+--------+



from pyspark.sql.functions import hour, mean, year,month,dayofmonth
import pyspark.sql.functions as F
df_re1 = df.groupBy(month('fldDate').alias("Month"),year("fldDate").alias("Year")).agg(F.round(F.sum('fldEstimatedEarnings'),2).alias('Earnings')).orderBy("Year","Month")
df_re1.show(100)

+-----+----+--------+
|Month|Year|Earnings|
+-----+----+--------+
|    6|2008|   35.76|
|    7|2008|   82.05|
|    8|2008|   27.17|
|    9|2008|   21.12|
|   10|2008|   65.87|
|   11|2008|   78.38|
|   12|2008|   64.38|
|    1|2009|   130.1|
|    2|2009|   26.69|
|    3|2009|   59.68|
|    4|2009|   94.23|
|    5|2009|  186.15|
|    6|2009|  248.88|
|    7|2009|  190.95|
|    8|2009|    81.7|
|    9|2009|   60.99|
|   10|2009|   206.2|
|   11|2009|  198.68|
|   12|2009|  156.43|
|    1|2010|  101.46|
|    2|2010|  316.61|
|    3|2010|  179.72|
|    4|2010|  142.26|
|    5|2010|  120.05|
|    6|2010|  132.18|
|    7|2010|  134.75|
|    8|2010|  137.77|
|    9|2010|  265.05|
|   10|2010|  267.31|
|   11|2010|   372.2|
|   12|2010|  713.92|
|    1|2011|  478.87|
|    2|2011|  551.54|
|    3|2011|   549.9|
|    4|2011|  683.07|
|    5|2011|  620.94|
|    6|2011|  969.76|
|    7|2011| 1149.31|
|    8|2011| 1057.64|
|    9|2011| 1039.34|
|   10|2011|  966.37|
|   11|2011| 1138.69|
|   12|2011|  186.95|
|    1|2012|    0.48|
|    2|2012|  256.25|
|    3|2012|  564.13|
|    4|2012|  524.13|
|    5|2012|  504.83|
|    6|2012|  370.12|
|    7|2012|   261.1|
|    8|2012|  219.39|
|    9|2012|    2.38|
|   10|2012|    0.02|
|   12|2012|    0.01|
|    1|2013|    0.01|
|    3|2013|    0.25|
|    4|2013|    0.11|
|    5|2013|    0.09|
|    6|2013|    0.25|
|    7|2013|    0.03|
|    9|2013|    0.09|
|   10|2013|    0.32|
|   12|2013|    0.13|
|    2|2014|    0.03|
|    2|2015|    0.08|
|    4|2015|    0.03|
|    7|2015|     0.1|
|    2|2016|    0.01|
|    3|2016|     0.2|
|    9|2016|    0.03|
|   11|2016|    0.01|
|    6|2017|    0.06|
+-----+----+--------+


df_re1.groupBy("Year").agg(F.round(F.sum("Earnings"),2)).orderBy("Year").show()

+----+-----------------------+
|Year|round(sum(Earnings), 2)|
+----+-----------------------+
|2008|                 374.73|
|2009|                1640.68|
|2010|                2883.28|
|2011|                9392.38|
|2012|                2702.84|
|2013|                   1.28|
|2014|                   0.03|
|2015|                   0.21|
|2016|                   0.25|
|2017|                   0.06|
+----+-----------------------+

Friday, 29 May 2020

Google Adsense Data Analysis using Pyspark RDDs

import collections
from datetime import datetime 

Adsense = collections.namedtuple('Adsense',['fldDate','fldPageViews','fldImpressions','fldClicks','fldPageRPM','fldImpressionRPM','fldActiveViewViewable','fldEstimatedEarnings']) 

def parseAdsense(_row):
    fields = _row.split(",")
    _date = fields[0]
    _pageViews = (int) (fields[1])
    _impressions = (int) (fields[2])
    _clicks = (int) (fields[3])
    _pageRPM  =  fields[4]
    _impressionRPM =  fields[5]
    _activeviewviewable = fields[6]
    _estimatedEarnings = (float) (fields[7])
    _adsense = Adsense(_date,_pageViews,_impressions,_clicks,_pageRPM,_impressionRPM,_activeviewviewable,_estimatedEarnings)
    return _adsense

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
adsense_r1 = spark.sparkContext.textFile("E:\\DataSets\\adsense_data.csv")
#print(adsense_r1.count())
#for j in adsense_r1.collect():
#    print(j)
headerInfo  = adsense_r1.first()
#print(headerInfo)

adsense_r2 = adsense_r1.filter(lambda x: headerInfo not in x)
#print(adsense_2.first())
#print(adsense_2.count())
#for j in adsense_r2.collect():
#    print(j)


adsense_r3 = adsense_r2.map(lambda x:(str)(x)).map(parseAdsense)
print(adsense_r3.count())

adsense_r4 = adsense_r3.filter(lambda x: x.fldEstimatedEarnings != 0)
print(adsense_r4.count())


2452
1511
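
Note that the header row is removed with the filter 'headerInfo not in x', which drops any line that contains the header text; an exact comparison is slightly stricter (a small alternative sketch):

adsense_r2 = adsense_r1.filter(lambda x: x != headerInfo)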

#Find the total Earnings 
totalEarnings = adsense_r4.map(lambda x: x.fldEstimatedEarnings).reduce(lambda x,y : x+y)

print(totalEarnings) #16995.739999999994


#Find the top 10 Earnings without Date
adsense_r4.map(lambda x: x.fldEstimatedEarnings).sortBy(lambda x: x, False).take(10)

[63.71, 54.14, 53.77, 52.73, 52.14, 50.86, 49.63, 48.91, 48.8, 48.58]

#Find the top 10 Earnings with Date
adsense_r4.map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).take(10)

[('2010-02-25', 63.71),
 ('2011-11-28', 54.14),
 ('2010-12-18', 53.77),
 ('2011-11-21', 52.73),
 ('2010-12-19', 52.14),
 ('2011-07-13', 50.86),
 ('2011-12-02', 49.63),
 ('2011-06-24', 48.91),
 ('2011-09-27', 48.8),
 ('2011-07-04', 48.58)]
 
 
#Find the lowest 10 Earnings without Date
adsense_r4.map(lambda x: x.fldEstimatedEarnings).sortBy(lambda x: x, True).take(10)

[0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01]


#Find the lowest 10 Earnings with Date
adsense_r4.map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], True).take(10)

[('2009-02-09', 0.01),
 ('2009-02-17', 0.01),
 ('2011-12-10', 0.01),
 ('2009-03-07', 0.01),
 ('2011-12-13', 0.01),
 ('2008-09-11', 0.01),
 ('2008-09-08', 0.01),
 ('2008-09-03', 0.01),
 ('2008-09-05', 0.01),
 ('2011-12-06', 0.01)]

#On which dates did we get Estimated Earnings >= 40?
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 40).map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).collect()

[('2010-02-25', 63.71),
 ('2011-11-28', 54.14),
 ('2010-12-18', 53.77),
 ('2011-11-21', 52.73),
 ('2010-12-19', 52.14),
 ('2011-07-13', 50.86),
 ('2011-12-02', 49.63),
 ('2011-06-24', 48.91),
 ('2011-09-27', 48.8),
 ('2011-07-04', 48.58),
 ('2011-06-23', 47.93),
 ('2011-07-12', 46.85),
 ('2011-11-30', 46.69),
 ('2011-07-08', 46.45),
 ('2010-12-14', 45.68),
 ('2011-06-26', 45.5),
 ('2011-10-19', 45.22),
 ('2011-06-30', 45.02),
 ('2011-06-22', 44.82),
 ('2011-06-25', 44.58),
 ('2011-07-16', 44.56),
 ('2011-07-09', 44.45),
 ('2011-11-23', 44.23),
 ('2011-07-27', 44.0),
 ('2011-11-22', 43.78),
 ('2011-09-21', 43.59),
 ('2011-11-24', 43.48),
 ('2011-07-11', 43.33),
 ('2011-11-29', 43.27),
 ('2011-07-05', 43.08),
 ('2011-09-18', 43.02),
 ('2011-11-20', 42.98),
 ('2011-09-13', 42.29),
 ('2011-09-17', 42.28),
 ('2011-08-09', 42.17),
 ('2011-10-27', 42.08),
 ('2011-09-20', 41.96),
 ('2011-07-23', 41.6),
 ('2011-09-23', 41.5),
 ('2011-11-15', 41.43),
 ('2011-10-18', 41.4),
 ('2011-07-14', 41.36),
 ('2011-07-10', 41.21),
 ('2011-11-17', 41.15),
 ('2011-09-19', 40.69),
 ('2011-11-26', 40.49),
 ('2011-08-05', 40.05)]
 
 
How many days had Estimated Earnings >= 40?
 
 adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 40).count()


47


#Earnings >= 10 and <= 50, with dates
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 10 and x.fldEstimatedEarnings <= 50).map(lambda x: (x.fldDate, x.fldEstimatedEarnings)).sortBy(lambda x: x[1], False).collect()


#Earnings >= 10 and <= 50 - count
adsense_r4.filter(lambda x: x.fldEstimatedEarnings >= 10 and x.fldEstimatedEarnings <= 50).count()

583



#top 10 page views 

adsense_r4.map(lambda x: (x.fldDate, x.fldPageViews)).sortBy(lambda x:x[1],False).take(10)
[('2010-02-25', 61358),
 ('2011-07-03', 60066),
 ('2011-04-03', 59079),
 ('2011-07-04', 55221),
 ('2011-11-21', 50808),
 ('2011-07-08', 49545),
 ('2011-11-20', 49020),
 ('2011-11-13', 48613),
 ('2011-11-28', 48360),
 ('2011-11-22', 47939)]
 
 #top 10 clicks 
 adsense_r4.map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x:x[1],False).take(10)
 
 [('2011-11-26', 1044),
 ('2011-11-25', 1026),
 ('2011-11-28', 996),
 ('2011-11-29', 968),
 ('2011-11-18', 924),
 ('2011-11-16', 920),
 ('2011-11-17', 917),
 ('2011-11-12', 912),
 ('2011-08-06', 878),
 ('2011-11-13', 873)]
 
 # no of clicks >= 800 - Descending order
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x: x[1],False).collect()
 
[('2011-11-26', 1044),
 ('2011-11-25', 1026),
 ('2011-11-28', 996),
 ('2011-11-29', 968),
 ('2011-11-18', 924),
 ('2011-11-16', 920),
 ('2011-11-17', 917),
 ('2011-11-12', 912),
 ('2011-08-06', 878),
 ('2011-11-13', 873),
 ('2011-11-30', 849),
 ('2011-07-23', 845),
 ('2011-12-02', 842),
 ('2011-11-15', 829),
 ('2011-11-27', 823),
 ('2011-07-09', 812),
 ('2011-07-08', 806),
 ('2011-07-04', 802),
 ('2011-11-19', 801),
 ('2011-11-11', 800)]
 
 #no of clicks >= 800 - Ascending Order
 
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: (x.fldDate, x.fldClicks)).sortBy(lambda x: x[1],True).collect()
 
 [('2011-11-11', 800),
 ('2011-11-19', 801),
 ('2011-07-04', 802),
 ('2011-07-08', 806),
 ('2011-07-09', 812),
 ('2011-11-27', 823),
 ('2011-11-15', 829),
 ('2011-12-02', 842),
 ('2011-07-23', 845),
 ('2011-11-30', 849),
 ('2011-11-13', 873),
 ('2011-08-06', 878),
 ('2011-11-12', 912),
 ('2011-11-17', 917),
 ('2011-11-16', 920),
 ('2011-11-18', 924),
 ('2011-11-29', 968),
 ('2011-11-28', 996),
 ('2011-11-25', 1026),
 ('2011-11-26', 1044)]
 
#How many days had clicks >= 800
 adsense_r4.filter(lambda x: x.fldClicks >= 800).count()
 
 20
 
#Total number of clicks on days where the day's clicks >= 800
 adsense_r4.filter(lambda x: x.fldClicks >= 800).map(lambda x: x.fldClicks).reduce(lambda a,b:a+b)
 17667

#Total number of clicks across all days
adsense_r4.map(lambda x: x.fldClicks).reduce(lambda m,n: m+n)

248190



#Total number of page views across all days
adsense_r4.map(lambda x: x.fldPageViews).reduce(lambda m,n: m+n)

18626239

#Total days when page views >= 50000
adsense_r4.filter(lambda x: x.fldPageViews >= 50000).count()
5

#Total days when page views >= 10000
adsense_r4.filter(lambda x: x.fldPageViews >= 10000).count()

649

Wednesday, 27 May 2020

Repartition vs Coalesce

Repartition vs coalesce

Repartition - shuffling involved, so lower performance
    - can increase or decrease the number of partitions

Coalesce - increasing the number of partitions is not allowed
    - can only decrease the number of partitions
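
The same distinction applies to DataFrames; a minimal sketch (reusing the olympix file from the RDD examples below):

df = spark.read.text("E:\\DataSets\\olympix_data.csv")
print(df.rdd.getNumPartitions())                  # initial number of partitions
print(df.repartition(8).rdd.getNumPartitions())   # 8 - shuffle involved, can increase or decrease
print(df.coalesce(1).rdd.getNumPartitions())      # 1 - no shuffle, can only decrease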

e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
print(e1.getNumPartitions()) #2

e2 = e1.repartition(5)
print(e2.getNumPartitions()) #5

e3 = e1.repartition(2)
print(e3.getNumPartitions()) #2

c1 = e1.coalesce(2)
print(c1.getNumPartitions()) #2

c2 = e1.coalesce(10)
print(c2.getNumPartitions()) #2
 

2
5
2
2
2


e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
football = e1.repartition(4).filter(lambda x: 'Football' in x)
wrestling = e1.repartition(5).filter(lambda x: 'Wrestling' in x)
weightlifting = e1.repartition(6).filter(lambda x: 'Weightlifting' in x)

print(e1.getNumPartitions()) #2
print(football.getNumPartitions()) #4
print(wrestling.getNumPartitions()) #5
print(weightlifting.getNumPartitions()) #6

2
4
5
6

#Repartition with Union
unionResult = football.union(wrestling).union(weightlifting)
print(unionResult.getNumPartitions()) #15

#Repartition with Intersection
intersectionResult = football.intersection(wrestling).intersection(weightlifting)
print(intersectionResult.getNumPartitions()) #15


e1 = spark.sparkContext.textFile("E:\\DataSets\\olympix_data.csv")
football = e1.repartition(4).filter(lambda x: 'Football' in x)
wrestling = e1.repartition(5).filter(lambda x: 'Wrestling' in x)
weightlifting = e1.repartition(6).filter(lambda x: 'Weightlifting' in x)

subt = e1.subtract(football)
print(subt.getNumPartitions()) #6

cart = football.cartesian(wrestling)
print(cart.getNumPartitions()) #20





mapValues vs flatMapValues in Pyspark

d1 = spark.sparkContext.parallelize( [('a',(20,30,40,50)), ('b',(1,2,3)) ])
print(d1.count())  #2

print(d1.flatMapValues(lambda x:x).collect()) 
[('a', 20), ('a', 30), ('a', 40), ('a', 50), ('b', 1), ('b', 2), ('b', 3)]


print(d1.map(lambda x:( x[0], len(x[1]))).collect())
[('a', 4), ('b', 3)]


print(d1.mapValues(lambda x:len(x)).collect())
[('a', 4), ('b', 3)]


keyBy Example:

d1 = spark.sparkContext.parallelize([(101,'Vijay','BTech'),(102,'Balaji','Bsc'), (103,'Arun')] )
print(d1.collect())

#[(101, 'Vijay', 'BTech'), (102, 'Balaji', 'Bsc'), (103, 'Arun')]

d2 = d1.keyBy(lambda x:x[0])
print(d2.collect())

#[(101, (101, 'Vijay', 'BTech')), (102, (102, 'Balaji', 'Bsc')), (103, (103, 'Arun'))]


Join operation in RDD - Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',10),('b',5),('c',15),('d',12),('a',10),('b',30)])
r2 = spark.sparkContext.parallelize([ ('a',50),('b',15),('c',10),('d',15),('e',12),('c',10),('a',30)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (10, 50)), ('a', (10, 30)), ('a', (10, 50)), ('a', (10, 30)), ('b', (5, 15)), ('b', (30, 15)), ('c', (15, 10)), ('c', (15, 10)), ('d', (12, 15))] 
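
Note that the inner join drops key 'e', which exists only in r2; fullOuterJoin keeps it, pairing the missing side with None (a small sketch on the same r1 and r2):

re2 = r1.fullOuterJoin(r2)
print(re2.lookup('e'))   # expected: [(None, 12)]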







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('b',5),('c',4)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('b', (5, 7)), ('c', (4, 7))]







from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('a','6'),('b',5),('b',8),('c',4),('c',9)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())

[('a', (1, 2)), ('a', ('6', 2)), ('b', (5, 7)), ('b', (8, 7)), ('c', (4, 7)), ('c', (9, 7))]






from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()

r1 = spark.sparkContext.parallelize([ ('a',1),('a',2),('a',3),('b',5),('b',8),('b',9),('c',4),('c',9),('c',3)])
r2 = spark.sparkContext.parallelize([ ('a',2),('b',7),('c',7)])

re1 = r1.join(r2)
print(re1.collect())


[('a', (1, 2)), ('a', (2, 2)), ('a', (3, 2)), 
('b', (5, 7)), ('b', (8, 7)), ('b', (9, 7)), 
('c', (4, 7)), ('c', (9, 7)), ('c', (3, 7))]

Aggregation operations with Pair RDDs in Pyspark

Pair RDD Aggregations:

myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())

def f1(x) : yield list(x)
    
r2 = r1.mapPartitions(lambda x:f1(x)) 
print(r2.collect())

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]


re1 = r1.countByKey()
print(re1)


defaultdict(<class 'int'>, {'a': 3, 'b': 2, 'c': 2, 'd': 2})


re2 = r1.countByValue()
print(re2)

defaultdict(<class 'int'>, {('a', 10): 2, ('b', 20): 1, ('c', 15): 1, ('d', 25): 1, ('a', 30): 1, ('b', 26): 1, ('c', 10): 1, ('d', 10): 1})


re3 = r1.sortByKey()
print(re3.collect())

[('a', 10), ('a', 30), ('a', 10), ('b', 20), ('b', 26), ('c', 15), ('c', 10), ('d', 25), ('d', 10)]


re4  = r1.reduceByKey(lambda x,y : x+y)
print(re4.collect())

[('d', 35), ('b', 46), ('a', 50), ('c', 25)]




myList = [1,2,3,4,5,6,7,8]
r = spark.sparkContext.parallelize(myList,3)
re1 = r.count()
print(re1)

8


myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
re4  = r1.reduceByKey(lambda x,y : x+y)
print(re4.sortByKey().collect())

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]


myList=[('a',10),('b',20),('c',15),('d',25),('a',30),('b',26),('c',10),('a',10),('d',10)]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())

def f1(x) : yield list(x)
    
r2 = r1.mapPartitions(lambda x:f1(x)) 
print(r2.collect())

re1 = r1.reduceByKey(lambda x,y : x+y)
print(re1.collect())

re2 = re1.sortByKey().collect()
print(re2)

[('a', 10), ('b', 20), ('c', 15), ('d', 25), ('a', 30), ('b', 26), ('c', 10), ('a', 10), ('d', 10)]
[[('a', 10), ('b', 20), ('c', 15)], [('d', 25), ('a', 30), ('b', 26)], [('c', 10), ('a', 10), ('d', 10)]]
[('d', 35), ('b', 46), ('a', 50), ('c', 25)]
[('a', 50), ('b', 46), ('c', 25), ('d', 35)]


re3 = r1.foldByKey(2,lambda x,y : x+y)
print(re3.sortByKey().collect())

[('a', 56), ('b', 50), ('c', 29), ('d', 39)]


Aggregate operations with RDDs in Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("aggregations").getOrCreate()

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList,3)

print(r1.getNumPartitions())
#answer : 3

def f1(x): yield list(x)

def f2(x) : yield sum(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
#answer : [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]]


r3 = r1.mapPartitions(lambda x: f2(x))
print(r3.collect())
#answer : [6, 15, 45]


r4 = r1.coalesce(1)
print(r4.getNumPartitions())
#answer : 1

r4.reduce(lambda x,y : x+y)
#answer : 66

r1.reduce(lambda a,b: a-b)
#answer : 34




#find Min and Max of a given list

def findMax(x,y):
    if x > y:
        return x
    else:
        return y

def findMin(x,y):
    if x < y:
        return x
    else:
        return y

myList = [1,2,3,4,5,6,7,8,9,10,11]
r1 = spark.sparkContext.parallelize(myList)
a3 = r1.reduce(lambda x,y : findMax(x,y))
print(a3)
#answer : 11

a4 = r1.reduce(lambda x,y : findMin(x,y))
print(a4)
#answer : 1


Fold example:

myList = [5,10]
r1 = spark.sparkContext.parallelize(myList)

a1 = r1.reduce(lambda x,y : x+y)
print(a1)

a3 = r1.fold(5,lambda x,y : x+y)   # the zero value 5 is added once per partition and once more when the partition results are merged
print(a3)

a4 = r1.fold(0,lambda x,y : x+y)   # with zero value 0, fold gives the same result as reduce here
print(a4)



myList = ["Arun","Vijay","Kalai","Mani","Nila","Raji","Waran","Bill"]
r1 = spark.sparkContext.parallelize(myList,3)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer: [['Arun', 'Vijay'], ['Kalai', 'Mani'], ['Nila', 'Raji', 'Waran', 'Bill']]



myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(5000,lambda x,y:findMin(x,y))
print(rd3)

[[20000, 12000, 30000], [25000, 42000, 10000]]
5000

myList = [20000,12000,30000,25000,42000,10000]
r1 = spark.sparkContext.parallelize(myList,2)

def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

#answer : [[20000, 12000, 30000], [25000, 42000, 10000]]

def findMin(x,y):
    if x <= y:
        return x
    else:
        return y

rd3 = r1.fold(42005,lambda x,y:findMin(x,y))
print(rd3)

[[20000, 12000, 30000], [25000, 42000, 10000]]
10000


myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())

[1, 2, 3, 4, 5, 6, 7, 8]
[[1, 2], [3, 4], [5, 6], [7, 8]]


c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

-42





myList = [1,2,3,4,5,6,7,8]
r1 = spark.sparkContext.parallelize(myList,3)
print(r1.collect())
def f1(x): yield list(x)

r2 = r1.mapPartitions(lambda x: f1(x))
print(r2.collect())
[1, 2, 3, 4, 5, 6, 7, 8]
[[1, 2], [3, 4], [5, 6, 7, 8]]

c1 = r1.aggregate(2,(lambda x,y:x+y), (lambda x,y:x-y))
print(c1)

-40



reduce(f1)
    f1 is applied within each partition and then to the partition results.
fold(zv,f1)
    f1 is applied within each partition and to the partition results, with the zero value zv included each time.
aggregate(zv,f1,f2)
    f1 is applied within each partition starting from zv, and
    f2 is applied to the partition results, again starting from zv.
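
Putting the three side by side on the r1 from the last example (myList = [1..8] with the partition layout shown above, [[1, 2], [3, 4], [5, 6, 7, 8]]) - a small sketch:

print(r1.reduce(lambda x,y: x+y))                              # 36: sums within partitions, then combines the results
print(r1.fold(2, lambda x,y: x+y))                             # 44: 36 plus the zero value 2, once per partition and once when merging
print(r1.aggregate(2, (lambda x,y: x+y), (lambda x,y: x-y)))   # -40, as in the output above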


Tuesday, 26 May 2020

groupBy Example in Pyspark


from pyspark.sql.functions import count
df = spark.read.format("csv").option("header",True).\
            option("inferSchema",True).\
            load("E:\\vow\\CancerData10.csv")

#df.printSchema()
#df.show(5)

df1 = df.select("ID","Age","State","Sex")

df2 = df1.groupBy("Sex").agg(count('Sex').alias("Count"))

df2.show()

+---+-----+
|Sex|Count|
+---+-----+
|  F|    4|
|  M|    5|
+---+-----+
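
The same groupBy can carry several aggregations at once; a minimal sketch, assuming the ID/Age/State/Sex columns selected above:

from pyspark.sql.functions import count, avg, countDistinct

df3 = df1.groupBy("Sex").agg(
    count("Sex").alias("Count"),
    avg("Age").alias("AvgAge"),                # average age per gender
    countDistinct("State").alias("States"))    # distinct states per gender
df3.show()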

Call Log data program using Pyspark

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
r1 = spark.sparkContext.textFile("E:\\vow\\calllogdata.txt")
# cache the raw log: it feeds the two filter/count actions below
r1.persist(StorageLevel.MEMORY_ONLY)
r3 = r1.filter(lambda x: 'SUCCESS' in x)
print(r3.count())
r4 = r1.filter(lambda x: 'FAILED' in x)
print(r4.count())

21
6
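
Since the cached log is scanned once per status above, an alternative sketch (assuming every line carries either SUCCESS or FAILED) classifies each line once and tallies the statuses in a single pass with countByValue:

# one pass: map each line to its status, then count per status on the driver
status_counts = r1.map(lambda x: 'SUCCESS' if 'SUCCESS' in x else 'FAILED').countByValue()
print(status_counts)   # e.g. defaultdict(<class 'int'>, {'SUCCESS': 21, 'FAILED': 6})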

Word Count program using Pyspark

#Word count program using Pyspark
r1 = spark.sparkContext.textFile("E:\\vow\\email.txt")
# split each line into words, pair each word with 1, then sum the counts per word
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
for j in r2.collect():
    print(j)
r2.coalesce(1).saveAsTextFile("E:\\vow\\emailwordcount.txt")

('acebook', 1)
('Session', 1)
('Developers', 2)
('', 10)
('The', 1)
('world', 1)
('is', 3)
..... 
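
The same word count can also be expressed with the DataFrame API; a minimal sketch, assuming the same email.txt path:

from pyspark.sql.functions import explode, split, col

dfWords = (spark.read.text("E:\\vow\\email.txt")
           .select(explode(split(col("value"), " ")).alias("word"))   # one row per word
           .groupBy("word").count()
           .orderBy(col("count").desc()))
dfWords.show(10)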

Most Viewed, Most Rated Movies Analysis using Pyspark

Movie Data Analysis using Pyspark

movies.dat:
E:\\vow\\ml-1m\\movies.dat
MovieID,Title,Genres
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
ratings.dat:
E:\\vow\\ml-1m\\ratings.dat
UserID,MovieID,Rating,Timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
users.dat:
E:\\vow\\ml-1m\\users.dat
UserID,Gender,Age,Occupation,ZipCode
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
Linux location:
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/movies.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/ratings.dat
hdfs dfs -cat hdfs://localhost:9000/SparkFiles/users.dat
Requirement:
Top 10 most viewed movies with their movie names (Z-A, A-Z)
Top 20 rated movies (each movie should have been rated/viewed by at least 40 users)
How the genres rank by average rating for each profession and age group (a sketch for this appears after Solution 2 below)

import collections
from datetime import datetime

Movie = collections.namedtuple('Movie',['MovieID','Title','Genres']) 

def parseMovie(_row):
    fields = _row.split("::")
    movieid = int(fields[0])
    title = fields[1]
    genres = fields[2]
    return Movie(movieid, title, genres)

#print(parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy"))
movies_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/movies.dat")
movies_r2 = movies_r1.map(lambda x:(str)(x)).map(parseMovie)
#movies_r2.take(2)

#[Movie(MovieID=1, Title='Toy Story (1995)', Genres="Animation|Children's|Comedy"),
#Movie(MovieID=2, Title='Jumanji (1995)', Genres="Adventure|Children's|Fantasy")]
 
 


Rating = collections.namedtuple('Rating',['UserID','MovieID','Rating','Timestamp']) 

def parseRatingRecord(_row):
    fields = _row.split("::")
    userid = int(fields[0])
    movieid = int(fields[1])
    rating = int(fields[2])
    _timestamp = datetime.fromtimestamp(int(fields[3]))
    return Rating(userid, movieid, rating, _timestamp)

#print(parseRatingRecord("1::1193::5::978300760"))

ratings_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/ratings.dat")
ratings_r2 = ratings_r1.map(lambda x:(str)(x)).map(parseRatingRecord)
#ratings_r2.take(2)


User = collections.namedtuple("Users",["UserID","Gender","Age","Occupation","ZipCode"])

def parseUserRecord(_row):
    fields = _row.split("::")
    userid = int(fields[0])
    gender = fields[1]
    age = int(fields[2])
    occupation = int(fields[3])
    zipcode = int(fields[4])
    return User(userid, gender, age, occupation, zipcode)

#print(parseUserRecord("3::M::25::15::55117"))            

users_r1 = spark.sparkContext.textFile("hdfs://localhost:9000/SparkFiles/users.dat")
users_r2 = users_r1.map(lambda x:(str)(x)).map(parseUserRecord)
#users_r2.take(2)

#[Users(UserID=1, Gender='F', Age=1, Occupation=10, ZipCode=48067),
# Users(UserID=2, Gender='M', Age=56, Occupation=16, ZipCode=70072)]
 
 
dfMovies  = movies_r2.toDF()
dfRatings = ratings_r2.toDF()
dfUsers = users_r2.toDF()
 
dfMovies.printSchema()
dfRatings.printSchema()
dfUsers.printSchema()

dfMovies.printSchema()
root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 
 
dfRatings.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 
 
dfUsers.printSchema()
root
 |-- UserID: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: long (nullable = true)
 |-- ZipCode: long (nullable = true)
 
dfMovies.show(5)
+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+

dfRatings.show(5)
+------+-------+------+-------------------+
|UserID|MovieID|Rating|          Timestamp|
+------+-------+------+-------------------+
|     1|   1193|     5|2001-01-01 03:42:40|
|     1|    661|     3|2001-01-01 04:05:09|
|     1|    914|     3|2001-01-01 04:02:48|
|     1|   3408|     4|2001-01-01 03:34:35|
|     1|   2355|     5|2001-01-07 05:08:11|
+------+-------+------+-------------------+

dfUsers.show(5)
+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|   2460|
|     5|     M| 25|        20|  55455|
+------+------+---+----------+-------+



#Joining Movies and Ratings tables

from pyspark.sql.functions import col

dfMovies_Ratings = dfMovies.join(dfRatings,dfMovies["MovieID"] == dfRatings["MovieID"],"inner")
dfMovies_Ratings.printSchema()
dfMovies_Ratings.show(5)

root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Timestamp: timestamp (nullable = true)

+-------+--------------+------+------+-------+------+-------------------+
|MovieID|         Title|Genres|UserID|MovieID|Rating|          Timestamp|
+-------+--------------+------+------+-------+------+-------------------+
|     26|Othello (1995)| Drama|    18|     26|     4|2000-12-30 11:52:15|
|     26|Othello (1995)| Drama|    69|     26|     4|2000-12-27 07:24:10|
|     26|Othello (1995)| Drama|   229|     26|     4|2002-12-10 12:29:33|
|     26|Othello (1995)| Drama|   342|     26|     4|2000-12-09 10:41:17|
|     26|Othello (1995)| Drama|   524|     26|     3|2000-12-07 11:33:32|
+-------+--------------+------+------+-------+------+-------------------+
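 
Note that joining on an expression keeps both MovieID columns (visible in the schema above), which makes a later reference to "MovieID" ambiguous. A sketch of the same join keyed on the column name, which keeps a single copy:

# joining on the column name de-duplicates the join key
dfMovies_Ratings_dedup = dfMovies.join(dfRatings, "MovieID", "inner")
dfMovies_Ratings_dedup.printSchema()   # only one MovieID column in the result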
 
 
#Solution : 1
from pyspark.sql.functions import col,count,desc
dfMovies_Ratings2 = dfMovies_Ratings.select("Title","UserID").groupBy("Title").\
                    agg(count("UserID").alias("Views")).\
                    orderBy(desc("Views")).limit(20)
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+


dfMovies_Ratings2.printSchema()
root
 |-- Title: string (nullable = true)
 |-- Views: long (nullable = false)


#SparkSQL approach

dfMovies_Ratings.createOrReplaceTempView("MovieRatingsView")
dfMovies_Ratings2 = spark.sql("select Title,count(UserID) as Views from MovieRatingsView group by Title order by Views desc limit 20")
dfMovies_Ratings2.show()

+--------------------+-----+
|               Title|Views|
+--------------------+-----+
|American Beauty (...| 3428|
|Star Wars: Episod...| 2991|
|Star Wars: Episod...| 2990|
|Star Wars: Episod...| 2883|
|Jurassic Park (1993)| 2672|
|Saving Private Ry...| 2653|
|Terminator 2: Jud...| 2649|
|  Matrix, The (1999)| 2590|
|Back to the Futur...| 2583|
|Silence of the La...| 2578|
| Men in Black (1997)| 2538|
|Raiders of the Lo...| 2514|
|        Fargo (1996)| 2513|
|Sixth Sense, The ...| 2459|
|   Braveheart (1995)| 2443|
|Shakespeare in Lo...| 2369|
|Princess Bride, T...| 2318|
|Schindler's List ...| 2304|
|L.A. Confidential...| 2288|
|Groundhog Day (1993)| 2278|
+--------------------+-----+

#Write the output into hdfs
dfMovies_Ratings2.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")

hadoop@hadoop:~$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/dfMoviesRatings/part-00000-3035084e-127a-4227-93f0-1acfc66b97ec-c000.csv

American Beauty (1999),3428
Star Wars: Episode IV - A New Hope (1977),2991
Star Wars: Episode V - The Empire Strikes Back (1980),2990
Star Wars: Episode VI - Return of the Jedi (1983),2883
Jurassic Park (1993),2672
Saving Private Ryan (1998),2653
Terminator 2: Judgment Day (1991),2649
"Matrix, The (1999)",2590
Back to the Future (1985),2583
"Silence of the Lambs, The (1991)",2578
Men in Black (1997),2538
Raiders of the Lost Ark (1981),2514
Fargo (1996),2513
"Sixth Sense, The (1999)",2459
Braveheart (1995),2443
Shakespeare in Love (1998),2369
"Princess Bride, The (1987)",2318
Schindler's List (1993),2304
L.A. Confidential (1997),2288
Groundhog Day (1993),2278
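
Re-running the save above fails because the output directory already exists; a minimal sketch, assuming the same path, that overwrites it instead:

dfMovies_Ratings2.coalesce(1).write.mode("overwrite").\
            format("csv").option("header",True).\
            save("hdfs://localhost:9000/SparkFiles/dfMoviesRatings")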

#Solution : 2
#Top 10 Rated Movies
from pyspark.sql.functions import avg

result2 = dfMovies_Ratings.select("Title","Rating","UserID").groupBy("Title").agg(count("UserID").alias("Views"),avg("Rating").alias("AvgRating"))
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
result3.show(10)

+--------------------+-----+-----------------+
|               Title|Views|        AvgRating|
+--------------------+-----+-----------------+
|      Sanjuro (1962)|   69|4.608695652173913|
|Seven Samurai (Th...|  628|4.560509554140127|
|Shawshank Redempt...| 2227|4.554557700942973|
|Godfather, The (1...| 2223|4.524966261808367|
|Close Shave, A (1...|  657| 4.52054794520548|
|Usual Suspects, T...| 1783|4.517106001121705|
|Schindler's List ...| 2304|4.510416666666667|
|Wrong Trousers, T...|  882|4.507936507936508|
|Sunset Blvd. (a.k...|  470|4.491489361702127|
|Raiders of the Lo...| 2514|4.477724741447892|
+--------------------+-----+-----------------+


from pyspark.sql.functions import avg

result2 = dfMovies_Ratings.select("Title","Rating","UserID").groupBy("Title").agg(count("UserID").alias("Views"),avg("Rating").alias("AvgRating"))
result3 = result2.filter("Views >= 40").orderBy(desc("AvgRating"))
#result3.show(10)

#write the output into hdfs
result3.coalesce(1).write.format("csv").option("header",True).save("hdfs://localhost:9000/SparkFiles/top10Movies/")
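
The third requirement (genres ranked by average rating for each profession and age group) is not worked out above. A minimal sketch, assuming the dfMovies_Ratings and dfUsers DataFrames built earlier (Occupation and Age stay as the raw MovieLens codes):

from pyspark.sql.functions import explode, split, avg, count, desc

# one row per (user, movie, genre): explode the pipe-separated Genres column
dfFull = dfMovies_Ratings.join(dfUsers, "UserID", "inner")\
            .withColumn("Genre", explode(split("Genres", "\\|")))

dfGenreRank = dfFull.groupBy("Occupation", "Age", "Genre")\
            .agg(avg("Rating").alias("AvgRating"), count("*").alias("Ratings"))\
            .orderBy("Occupation", "Age", desc("AvgRating"))
dfGenreRank.show(20)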




Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...