Sunday, 17 May 2020

NYPD Pyspark program

# Load the Apple stock CSV from local disk as an RDD of text lines.
# NOTE(review): `sc` is not defined in this listing; presumably the
# SparkContext provided by the PySpark shell/notebook - confirm.
filePath = "file:///E:/DataSets/AAPL.csv"
apple = sc.textFile(filePath)
# Printing the RDD shows its description (path and RDD type), not the file contents.
print(apple)

file:///E:/DataSets/AAPL.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

# Fetch only the first line of the file, which is the CSV header.
apple.take(1)

['Date,Open,High,Low,Close,Adj Close,Volume']


# Fetch the first five lines: the header followed by four data rows.
apple.take(5)

['Date,Open,High,Low,Close,Adj Close,Volume',
 '1980-12-12,0.513393,0.515625,0.513393,0.513393,0.405683,117258400',
 '1980-12-15,0.488839,0.488839,0.486607,0.486607,0.384517,43971200',
 '1980-12-16,0.453125,0.453125,0.450893,0.450893,0.356296,26432000',
 '1980-12-17,0.462054,0.464286,0.462054,0.462054,0.365115,21610400']
 
 
# Re-point filePath at the NYPD crime CSV and load it as an RDD of text lines.
filePath = "file:///E:/DataSets/Swetha/nypd.csv"
 
data = sc.textFile(filePath)
# Peek at the first ten lines (the header plus nine records).
data.take(10)

['OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1',
 '1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"',
 '5,6e410287,05/21/1987 12:01:00 AM,Thursday,May,21,1987,0,5,28,2009,GRAND LARCENY,FELONY,K,75,BROOKLYN,N.Y. POLICE DEPT,1017958,182266,"(40.6668988440001, -73.878495425)"',
 '6,7eebfe3c,02/01/1990 09:00:00 AM,Thursday,Feb,1,1990,9,9,17,2014,GRAND LARCENY,FELONY,K,105,QUEENS,N.Y. POLICE DEPT,1058407,204788,"(40.7284698170001, -73.7324430589999)"',
 '7,da21f94f,11/13/1990 12:01:00 AM,Tuesday,Nov,13,1990,0,6,7,2007,GRAND LARCENY,FELONY,,73,BROOKLYN,N.Y. HOUSING POLICE,1010272,183760,"(40.671025464, -73.906195082)"',
 '8,87c99e8c,02/02/1992 04:00:00 PM,Sunday,Feb,2,1992,16,3,27,2012,GRAND LARCENY,FELONY,,101,QUEENS,N.Y. POLICE DEPT,1053678,159044,"(40.6029515910001, -73.749976261)"',
 '9,495f57e1,08/08/1994 06:00:00 PM,Monday,Aug,8,1994,18,7,31,2008,RAPE,FELONY,A,103,QUEENS,N.Y. POLICE DEPT,1041749,196938,"(40.707047475, -73.792611904)"']

Cleaning Data:
Filtering the header
Missing values
Anomalous data
# The first line of the file holds the column names.
header = data.first()
print(header)
OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1
# Drop every line that is equal to the header so only data records remain.
dataWithoutHeader = data.filter(lambda x: x != header)
# The first remaining line is now the first crime record.
dataWithoutHeader.take(1)
['1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"']
OBJECTID
Identifier
Occurrence Date
Day of Week
Occurrence Month
Occurrence Day
Occurrence Year
Occurrence Hour
CompStat Month
CompStat Day
CompStat Year
Offense
Offense Classification
Sector
Precinct
Borough
Jurisdiction
XCoordinate
YCoordinate
Location 1



import csv
from io import StringIO
# Turn the column names into valid Python identifiers: spaces and slashes
# become underscores so the names can be used as namedtuple fields.
fields = [
    column.replace(" ", "_").replace("/", "_")
    for column in header.split(",")
]

print(fields)

['OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1']


from collections import namedtuple
# Record type with one attribute per CSV column, named after the sanitised header fields.
Crime = namedtuple("Crime",fields)


def parseData(row):
    """Parse one raw CSV line into a ``Crime`` record.

    The ``csv`` module is used rather than ``str.split`` so that quoted
    values containing commas (such as the ``"(lat, lon)"`` location column)
    stay in a single field.

    Args:
        row: One line of the CSV file, without the header.

    Returns:
        A ``Crime`` namedtuple whose attributes are all strings.
    """
    line_buffer = StringIO(row)
    values = next(csv.reader(line_buffer))
    return Crime(*values)
# Turn every raw CSV line into a Crime record.
crimes=dataWithoutHeader.map(parseData)

# Show the first parsed record.
crimes.first()

Crime(OBJECTID='1', Identifier='f070032d', Occurrence_Date='09/06/1940 07:30:00 PM', Day_of_Week='Friday', Occurrence_Month='Sep', Occurrence_Day='6', Occurrence_Year='1940', Occurrence_Hour='19', CompStat_Month='9', CompStat_Day='7', CompStat_Year='2010', Offense='BURGLARY', Offense_Classification='FELONY', Sector='D', Precinct='66', Borough='BROOKLYN', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='987478', YCoordinate='166141', Location_1='(40.6227027620001, -73.9883732929999)')


# Count how many records there are for each offense type.
crimes.map(lambda x:x.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'RAPE': 13779,
             'ROBBERY': 198744,
             'FELONY ASSAULT': 184042,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1})  #will be filtering it later


# Count how many records there are for each occurrence year.
crimes.map(lambda x:x.Occurrence_Year).countByValue()


defaultdict(int,
            {'1940': 1,
             '1968': 1,
             '1970': 2,
             '1972': 2,
             '1987': 6,
             '1990': 17,
             '1992': 12,
             '1994': 19,
             '1995': 27,
             '1996': 34,
             '1998': 74,
             '1999': 124,
             '2000': 282,
             '2001': 343,
             '2002': 368,
             '2003': 490,
             '2004': 692,
             '2005': 3272,
             '2006': 127887,
             '1910': 3,
             '1913': 4,
             '1945': 2,
             '1981': 1,
             '1985': 8,
             '1988': 6,
             '1991': 12,
             '1905': 2,
             '1971': 1,
             '1997': 40,
             '1914': 2,
             '1956': 1,
             '1989': 12,
             '1993': 23,
             '2015': 102657,
             '1954': 1,
             '1982': 5,
             '1950': 1,
             '1959': 1,
             '1966': 7,
             '1980': 5,
             '1912': 1,
             '1955': 1,
             '1920': 1,
             '1964': 1,
             '1978': 2,
             '1986': 2,
             '1915': 3,
             '1908': 1,
             '1911': 1,
             '1965': 2,
             '1946': 1,
             '1973': 5,
             '1975': 2,
             '': 244,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '1976': 2,
             '1983': 1,
             '1977': 3,
             '1984': 4,
             '1960': 1,
             '1979': 6,
             '1974': 3,
             '1958': 1,
             '1969': 1})
 
244 records have an empty year: '': 244.
Sudden increase after 2005 - because of digitization after 2005.
Data before 2005 is largely missing - open data initiatives.
Filter out those records:
# Keep a record only when it has a real offense, a non-empty year, and
# occurred in 2006 or later. The checks short-circuit left to right, so
# int() is never applied to an empty year string.
crimesFiltered = crimes.filter(
    lambda x: x.Offense != "NA"
    and x.Occurrence_Year != ''
    and int(x.Occurrence_Year) >= 2006
)

# Re-check the per-year counts after filtering.
crimesFiltered.map(lambda crime: crime.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2015': 102657,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849})

def getCoords(loc):
    """Parse a ``"(lat, lon)"`` location string into a tuple of floats.

    The first and last characters of ``loc`` are discarded (the parentheses
    in the sample rows) and the remainder is split at the first comma.

    Args:
        loc: Location text such as ``"(40.62, -73.98)"``.

    Returns:
        ``(latitude, longitude)`` as floats.

    Raises:
        ValueError: If ``loc`` has no comma or a part is not a number.
    """
    comma = loc.index(",")
    lat_text = loc[1:comma]
    lon_text = loc[comma + 1:-1]
    return (float(lat_text), float(lon_text))
# Smallest latitude and smallest longitude across the filtered crimes; each
# component is minimised independently, so the pair need not be a real point.
crimesFiltered.map(lambda crime: getCoords(crime.Location_1)) \
    .reduce(lambda a, b: tuple(map(min, a, b)))

(40.112709974, -77.519206334)

# Largest latitude and largest longitude across the filtered crimes; each
# component is maximised independently, so the pair need not be a real point.
crimesFiltered.map(lambda crime: getCoords(crime.Location_1)) \
    .reduce(lambda a, b: tuple(map(max, a, b)))


(59.5805088160001, -73.700716685)

# Keep only the crimes located inside New York City's bounding box.
# The longitudes in this data set are negative (around -73.9), so the bounds
# must be negative too; the earlier version compared against +74.25909 and
# +73.700009 and therefore never matched a single row.
NYC_MIN_LAT, NYC_MAX_LAT = 40.477399, 40.917577
NYC_MIN_LON, NYC_MAX_LON = -74.25909, -73.700009


def isInsideNyc(loc):
    """Return True when a ``"(lat, lon)"`` string lies inside the NYC box."""
    lat, lon = getCoords(loc)  # parse once instead of four times per row
    return (NYC_MIN_LAT <= lat <= NYC_MAX_LAT
            and NYC_MIN_LON <= lon <= NYC_MAX_LON)


crimesFinal = crimesFiltered.filter(lambda x: isInsideNyc(x.Location_1))
   
import gmplot
# Centre the map on New York City. The previous centre (37.428, -122.145) is
# far from the data's 40.x / -73.x coordinates, so the plotted points were
# outside the visible area; zoom 11 is intended to show the whole city.
gmap = gmplot.GoogleMapPlotter(40.7128, -74.0060, 11)


# Collect the 2015 burglary coordinates in a single Spark job so that the
# latitudes and longitudes come from the same pass and stay aligned.
b_coords = crimesFiltered.filter(lambda x: x.Offense=="BURGLARY" and x.Occurrence_Year=="2015") \
    .map(lambda x: getCoords(x.Location_1)) \
    .collect()

b_lats = [lat for lat, _ in b_coords]
b_lons = [lon for _, lon in b_coords]

# Draw one red dot per burglary and write the map to an HTML file.
gmap.scatter(b_lats,b_lons,"#DE1515",size=40,marker=False)
gmap.draw("mymap.html")

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...