def extract(str):
    """Parse one raw call-log line into a tuple of five string fields.

    A raw line looks like this (the first line of the data file)::

        ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463

    It contains a UUID-like id, then two timestamps glued together with no
    separator, then a status word. Last comes one 20-digit run holding the
    "from" number (first 10 digits) followed by the "to" number (last 10).

    Args:
        str: One raw log line. The name shadows the builtin ``str``; it is
            kept unchanged so existing keyword callers keep working.

    Returns:
        ``(fromno, tono, status, starttime, endtime)``, all strings. The
        timestamps are in ``YYYY-MM-DD HH:MM:SS`` form.

    Raises:
        ValueError: if the line has no 20-digit number run, no
            SUCCESS/DROPPED/FAILED status, or fewer than two timestamps.
            The message includes the offending line, so a bad record can
            be found quickly.
    """
    import re  # imported here so the function is self-contained

    # Raw-string patterns: "\d" in a plain literal is an invalid escape
    # sequence (SyntaxWarning on Python 3.12+).
    numbers = re.search(r"\d{20}", str)
    if numbers is None:
        raise ValueError(f"no 20-digit phone-number pair in log line: {str!r}")
    pnos = numbers.group()
    fromno = pnos[:10]
    tono = pnos[10:20]

    status_match = re.search(r"SUCCESS|DROPPED|FAILED", str)
    if status_match is None:
        raise ValueError(f"no SUCCESS/DROPPED/FAILED status in log line: {str!r}")
    status = status_match.group()

    # The start and end timestamps are concatenated; findall returns them
    # in order of appearance.
    timestamps = re.findall(r"\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}", str)
    if len(timestamps) < 2:
        raise ValueError(f"expected two timestamps in log line: {str!r}")
    starttime, endtime = timestamps[0], timestamps[1]

    return (fromno, tono, status, starttime, endtime)
# Quick local check of extract(): print the parsed tuple for each raw line.
# NOTE(review): `inputs` is not defined in this view; presumably a list of raw
# sample log lines from an earlier cell — confirm.
for logg in inputs:
    print(extract(logg))
from pyspark.sql import SparkSession
# Start (or reuse) a local Spark session for this demo.
spark = (
    SparkSession.builder
    .master("local")
    .appName("demoApp")
    .getOrCreate()
)

# Load the raw call log as an RDD with one element per text line.
data = spark.sparkContext.textFile("E:\\vow\\calllogdata.txt")

# Peek at the first raw line to confirm the input format.
data.first()
'ec59cea2-5006-448f-a031-d5e53f33be232014-03-15 00:02:482014-03-15 00:06:05DROPPED 80526900577757919463'
# Parse every raw line into a (fromno, tono, status, starttime, endtime) tuple.
# The function is passed directly; wrapping it in `lambda x: extract(x)` added
# nothing.
data2 = data.map(extract)

# Spot-check the first parsed record.
data2.take(1)
[('8052690057',
'7757919463',
'DROPPED',
'2014-03-15 00:02:48',
'2014-03-15 00:06:05')]
# Turn the parsed tuples into a DataFrame, naming each tuple position.
# extract() yields only strings, so every column is inferred as a string;
# starttime/endtime still need casting to timestamps.
df = data2.toDF(
    schema=["fromno", "tono", "status", "starttime", "endtime"],
)
df.printSchema()
root
|-- fromno: string (nullable = true)
|-- tono: string (nullable = true)
|-- status: string (nullable = true)
|-- starttime: string (nullable = true) #starttime should be timestamp
|-- endtime: string (nullable = true) #endtime should be timestamp
from pyspark.sql.functions import col, current_date
# Cast the string start/end times to timestamps, and add a "day" column
# holding the current date. The "day" column is the partition column used
# when the data is written out.
# The chain is wrapped in parentheses: a line ending in a bare "." outside
# brackets is a SyntaxError.
dfCall = (
    df.withColumn("starttime", col("starttime").cast("timestamp"))
    .withColumn("endtime", col("endtime").cast("timestamp"))
    .withColumn("day", current_date())
)
dfCall.printSchema()
# starttime and endtime are now timestamp columns, as the schema below shows.
root
|-- fromno: string (nullable = true)
|-- tono: string (nullable = true)
|-- status: string (nullable = true)
|-- starttime: timestamp (nullable = true)
|-- endtime: timestamp (nullable = true)
|-- day: date (nullable = false)
# Display the first five rows of the typed DataFrame.
dfCall.show(n=5)
+----------+----------+-------+-------------------+-------------------+----------+
| fromno| tono| status| starttime| endtime| day|
+----------+----------+-------+-------------------+-------------------+----------+
|8052690057|7757919463|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:05|2020-05-31|
|9886177375|9916790556|DROPPED|2014-03-15 00:02:48|2014-03-15 00:06:07|2020-05-31|
|8618627996|9886177375|SUCCESS|2014-03-16 00:02:48|2014-03-16 00:06:45|2020-05-31|
|9876515616|4894949494|DROPPED|2014-03-16 00:02:48|2014-03-16 00:06:53|2020-05-31|
|5454545454|6469496477| FAILED|2014-03-16 00:02:48|2014-03-16 00:06:12|2020-05-31|
+----------+----------+-------+-------------------+-------------------+----------+
# Append the call records as ORC files, partitioned by the "day" column.
# Fixes: the method is `partitionBy` (the misspelled `partionBy` raised
# AttributeError), and the path's "\c" is now an explicit "\\c". The old
# literal relied on an invalid escape sequence; the runtime path is unchanged.
(
    dfCall.write.format("orc")
    .mode("append")
    .partitionBy("day")
    .save("e:\\datasets\\callout")
)