Sunday, 16 August 2020

London Crime Analysis with DataFrames in Pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LondonCrimes").getOrCreate()
data = spark.read.format("csv").option("header","true").load(r"D:\\Ex\\london_crime_by_lsoa.csv")

data.printSchema() 

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 
 
 data.show()
 
 +---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|2013|    4|
|E01003496|              Newham|     Criminal Damage|Criminal Damage T...|    0|2013|    9|
|E01004177|              Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|E01001985|            Haringey|  Theft and Handling|Motor Vehicle Int...|    0|2013|   12|
|E01003076|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|E01003852|Richmond upon Thames|             Robbery|   Personal Property|    0|2014|    1|
|E01004547|          Wandsworth|Violence Against ...|    Offensive Weapon|    0|2011|   10|
|E01002398|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|E01002358|            Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
|E01000086|Barking and Dagenham|  Theft and Handling|  Other Theft Person|    1|2009|    5|
|E01003708|           Redbridge|Violence Against ...|      Common Assault|    0|2009|    6|
|E01002945|Kingston upon Thames|  Theft and Handling|    Theft From Shops|    0|2016|   11|
|E01004195|              Sutton|               Drugs| Possession Of Drugs|    0|2009|   10|
+---------+--------------------+--------------------+--------------------+-----+----+-----+


data.count()
13490604 ==> 13.5 M rows

data.limit(5).show()
data.show(5)

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+


data.limit(5).show(truncate=False)

+---------+----------+---------------------------+---------------------------+-----+----+-----+
|lsoa_code|borough   |major_category             |minor_category             |value|year|month|
+---------+----------+---------------------------+---------------------------+-----+----+-----+
|E01001116|Croydon   |Burglary                   |Burglary in Other Buildings|0    |2016|11   |
|E01001646|Greenwich |Violence Against the Person|Other violence             |0    |2016|11   |
|E01000677|Bromley   |Violence Against the Person|Other violence             |0    |2015|5    |
|E01003774|Redbridge |Burglary                   |Burglary in Other Buildings|0    |2016|3    |
|E01004563|Wandsworth|Robbery                    |Personal Property          |0    |2008|6    |
+---------+----------+---------------------------+---------------------------+-----+----+-----+


// Remove a column
data = data.drop('lsoa_code')
data.show(5,truncate=False)

+----------+---------------------------+---------------------------+-----+----+-----+
|borough   |major_category             |minor_category             |value|year|month|
+----------+---------------------------+---------------------------+-----+----+-----+
|Croydon   |Burglary                   |Burglary in Other Buildings|0    |2016|11   |
|Greenwich |Violence Against the Person|Other violence             |0    |2016|11   |
|Bromley   |Violence Against the Person|Other violence             |0    |2015|5    |
|Redbridge |Burglary                   |Burglary in Other Buildings|0    |2016|3    |
|Wandsworth|Robbery                    |Personal Property          |0    |2008|6    |
+----------+---------------------------+---------------------------+-----+----+-----+
only showing top 5 rows


// Unique boroughs
total_borough = data.select('borough').distinct()
total_borough.show(truncate=False)


total_borough.count()
33

// Display all unique boroughs
total_borough.show(total_borough.count(),truncate=False)

+----------------------+
|borough               |
+----------------------+
|Croydon               |
|Wandsworth            |
|Bexley                |
|Lambeth               |
|Barking and Dagenham  |
|Camden                |
|Greenwich             |
|Newham                |
|Tower Hamlets         |
|Hounslow              |
|Barnet                |
|Harrow                |
|Kensington and Chelsea|
|Islington             |
|Brent                 |
|Haringey              |
|Bromley               |
|Merton                |
|Westminster           |
|Hackney               |
|Southwark             |
|Enfield               |
|Ealing                |
|Sutton                |
|Hammersmith and Fulham|
|Kingston upon Thames  |
|Havering              |
|Hillingdon            |
|Waltham Forest        |
|Richmond upon Thames  |
|Redbridge             |
|City of London        |
|Lewisham              |
+----------------------+


// Filter only Havering rows
import pyspark.sql.functions as F
havering_data = data.filter(F.col("borough") == 'Havering' )
havering_data.show(5)


+--------+--------------------+--------------------+-----+----+-----+
| borough|      major_category|      minor_category|value|year|month|
+--------+--------------------+--------------------+-----+----+-----+
|Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
|Havering|Violence Against ...|          Harassment|    0|2008|    1|
|Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|Havering|             Robbery|   Personal Property|    0|2009|    8|
|Havering|            Burglary|Burglary in a Dwe...|    1|2016|    8|
+--------+--------------------+--------------------+-----+----+-----+

// Filter either Havering or Hackney 
import pyspark.sql.functions as F
havering_or_hackney_data = data.filter( (F.col("borough") == 'Havering') | (F.col("borough") == 'Hackney') )
havering_or_hackney_data.show(5)

+--------+--------------------+--------------------+-----+----+-----+
| borough|      major_category|      minor_category|value|year|month|
+--------+--------------------+--------------------+-----+----+-----+
|Havering|Violence Against ...|        Wounding/GBH|    0|2012|    2|
| Hackney|     Criminal Damage|Criminal Damage T...|    0|2011|    6|
| Hackney|Violence Against ...|          Harassment|    1|2013|    2|
|Havering|Violence Against ...|          Harassment|    0|2008|    1|
|Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
+--------+--------------------+--------------------+-----+----+-----+


// Display the records for 2015 and 2016 as Year
import pyspark.sql.functions as F
data_2k15_2k16 = data.filter( F.col("year").isin(["2015","2016"]))
data_2k15_2k16.show()


+--------------------+--------------------+--------------------+-----+----+-----+
|             borough|      major_category|      minor_category|value|year|month|
+--------------------+--------------------+--------------------+-----+----+-----+
|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|              Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|Kingston upon Thames|  Theft and Handling|    Theft From Shops|    0|2016|   11|
|            Haringey|Violence Against ...|        Wounding/GBH|    0|2015|   12|
|            Lewisham|Violence Against ...|      Common Assault|    0|2016|    2|
|            Hounslow|     Criminal Damage|Criminal Damage T...|    0|2015|    2|
|             Bromley|     Criminal Damage|Criminal Damage T...|    1|2016|    4|
|            Haringey|     Criminal Damage|Criminal Damage T...|    0|2016|   12|
|           Southwark|               Drugs| Possession Of Drugs|    0|2015|    3|
|            Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|      Waltham Forest|Other Notifiable ...|      Going Equipped|    0|2015|    2|
|              Ealing|             Robbery|   Personal Property|    0|2015|    7|
|               Brent|  Theft and Handling|Motor Vehicle Int...|    0|2015|    9|
|            Hounslow|Violence Against ...|        Wounding/GBH|    2|2015|    8|
|           Southwark|  Theft and Handling|    Theft From Shops|    4|2016|    8|
+--------------------+--------------------+--------------------+-----+----+-----+



import pyspark.sql.functions as F
data_greater2k16 = data.filter( F.col("year") > 2015)
data_greater2k16.show(5)

+----------+--------------------+--------------------+-----+----+-----+
|   borough|      major_category|      minor_category|value|year|month|
+----------+--------------------+--------------------+-----+----+-----+
|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|    Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
+----------+--------------------+--------------------+-----+----+-----+


// Grouping and counting of borough

import pyspark.sql.functions as F
borough_crime_count = data.groupBy("borough").agg(F.count("borough"))
borough_crime_count.show()
+--------------------+--------------+
|             borough|count(borough)|
+--------------------+--------------+
|             Croydon|        602100|
|          Wandsworth|        498636|
|              Bexley|        385668|
|             Lambeth|        519048|
|Barking and Dagenham|        311040|
|              Camden|        378432|
|           Greenwich|        421200|
|              Newham|        471420|
|       Tower Hamlets|        412128|
|            Hounslow|        395928|
|              Barnet|        572832|
|              Harrow|        365688|
|Kensington and Ch...|        296784|
|           Islington|        359208|
|               Brent|        490644|
|            Haringey|        413856|
|             Bromley|        523908|
|              Merton|        339876|
|         Westminster|        366660|
|             Hackney|        417744|
+--------------------+--------------+

// With Alias 
import pyspark.sql.functions as F
borough_crime_count = data.groupBy("borough").agg(F.count("borough").alias("Borough_Count"))

borough_crime_count.show()
+--------------------+-------------+
|             borough|Borough_Count|
+--------------------+-------------+
|             Croydon|       602100|
|          Wandsworth|       498636|
|              Bexley|       385668|
|             Lambeth|       519048|
|Barking and Dagenham|       311040|
|              Camden|       378432|
|           Greenwich|       421200|
|              Newham|       471420|
|       Tower Hamlets|       412128|
|            Hounslow|       395928|
|              Barnet|       572832|
|              Harrow|       365688|
|Kensington and Ch...|       296784|
|           Islington|       359208|
|               Brent|       490644|
|            Haringey|       413856|
|             Bromley|       523908|
|              Merton|       339876|
|         Westminster|       366660|
|             Hackney|       417744|
+--------------------+-------------+


borough_conviction_sum = data.groupBy("borough").agg({"value":"sum"})
borough_conviction_sum.show(5)

+--------------------+----------+
|             borough|sum(value)|
+--------------------+----------+
|             Croydon|  260294.0|
|          Wandsworth|  204741.0|
|              Bexley|  114136.0|
|             Lambeth|  292178.0|
|Barking and Dagenham|  149447.0|
+--------------------+----------+


// Renaming a column
borough_conviction_sum = data.groupBy("borough").agg({"value":"sum"}).withColumnRenamed("sum(value)","Convictions")
borough_conviction_sum.show(5)

+--------------------+-----------+
|             borough|Convictions|
+--------------------+-----------+
|             Croydon|   260294.0|
|          Wandsworth|   204741.0|
|              Bexley|   114136.0|
|             Lambeth|   292178.0|
|Barking and Dagenham|   149447.0|
+--------------------+-----------+


total_borough_convictions = borough_conviction_sum.agg({"convictions":"sum"})
total_borough_convictions.show()

+----------------+
|sum(convictions)|
+----------------+
|       6447758.0|
+----------------+


otal_borough_convictions = borough_conviction_sum.agg({"convictions":"sum"}).withColumnRenamed("sum(convictions)","TotalSum")
total_borough_convictions.show()

+---------+
| TotalSum|
+---------+
|6447758.0|
+---------+

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...