Thursday, 21 May 2020

Experimenting with DataFrames in PySpark

Input file: hdfs://localhost:9000/SparkFiles/calllogdata.txt

Sample input data ("$"-delimited):
ec59cea2-5006-448f-a031-d5e53f33be23$2014-03-15$2014-03-15$DROPPED$8052690057$7757919463
ec59cea2-5006-448f-a032-d5e53f33be23$2014-03-15$2014-03-15$DROPPED$9886177375$9916790556
ec59cea2-5006-448f-a033-d5e53f33be23$2014-03-16$2014-03-16$SUCCESS$8618627996$9886177375
ec59cea2-5006-448f-a034-d5e53f33be23$2014-03-16$2014-03-16$DROPPED$9876515616$4894949494
ec59cea2-5006-448f-a035-d5e53f33be23$2014-03-16$2014-03-16$FAILED $5454545454$6469496477
ec59cea2-5006-448f-a036-d5e53f33be23$2014-03-16$2014-03-16$SUCCESS$1235467890$2153698431
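
Before parsing, it can help to peek at the raw lines with the plain text reader (a minimal sketch; the text source puts each line into a single column named "value"):

# Sketch: inspect the first few raw lines to confirm the "$" delimiter
spark.read.text("hdfs://localhost:9000/SparkFiles/calllogdata.txt").show(3, truncate=False)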


df = spark.read.format("csv").option("delimiter","$").load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df.printSchema()

#Auto generated schema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)




df.show(5)

+--------------------+----------+------------------+--------+-------+----------+----------+
|                 _c0|       _c1|               _c2|     _c3|    _c4|       _c5|       _c6|
+--------------------+----------+------------------+--------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-15|00:02:482014-03-15|00:06:05|DROPPED|8052690057|7757919463|
|ec59cea2-5006-448...|2014-03-15|00:02:482014-03-15|00:06:07|DROPPED|9886177375|9916790556|
|ec59cea2-5006-448...|2014-03-16|00:02:482014-03-16|00:06:45|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|00:02:482014-03-16|00:06:53|DROPPED|9876515616|4894949494|
|ec59cea2-5006-448...|2014-03-16|00:02:482014-03-16|00:06:12| FAILED|      null|5454545454|
+--------------------+----------+------------------+--------+-------+----------+----------+
only showing top 5 rows
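
With no schema supplied, every column comes back as a string. If you only need Spark to guess the types, the CSV reader's inferSchema option is an alternative (a sketch; note it costs an extra pass over the data):

# Sketch: let Spark infer column types instead of defaulting everything to string
df_inferred = spark.read.format("csv") \
    .option("delimiter","$") \
    .option("inferSchema","true") \
    .load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df_inferred.printSchema()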


#our own custom schema
from pyspark.sql.types import *

sch = StructType(
    [
        StructField("id", StringType()),
        StructField("starttime", StringType()),
        StructField("endtime", StringType()),
        StructField("status", StringType()),
        StructField("fromno", StringType()),
        StructField("tono", StringType()),
    ]
)
#Applying schema here
df = spark.read.format("csv").option("delimiter","$").schema(sch).load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- fromno: string (nullable = true)
 |-- tono: string (nullable = true)
 
 
df.show(5)

+--------------------+----------+----------+-------+----------+----------+
|                  id| starttime|   endtime| status|    fromno|      tono|
+--------------------+----------+----------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|
+--------------------+----------+----------+-------+----------+----------+
only showing top 5 rows
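
All fields above are kept as strings. A sketch of a more strongly typed variant of the same schema, assuming the dates are always yyyy-MM-dd and the phone numbers fit in a long:

# Sketch (assumption: yyyy-MM-dd dates, numeric phone numbers)
typed_sch = StructType([
    StructField("id", StringType()),
    StructField("starttime", DateType()),
    StructField("endtime", DateType()),
    StructField("status", StringType()),
    StructField("fromno", LongType()),
    StructField("tono", LongType()),
])
df_typed = spark.read.format("csv").option("delimiter","$").schema(typed_sch).load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df_typed.printSchema()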

df.where("status='SUCCESS'").show(5)
# or, equivalently:
df.filter('status="SUCCESS"').show(5)

+--------------------+----------+----------+-------+----------+----------+
|                  id| starttime|   endtime| status|    fromno|      tono|
+--------------------+----------+----------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|1235467890|2153698431|
|ec59cea2-5006-448...|2014-03-18|2014-03-18|SUCCESS|4545454545|7978978979|
|ec59cea2-5006-448...|2014-03-19|2014-03-19|SUCCESS|8464564560|5646064646|
|ec59cea2-5006-448...|2014-03-20|2014-03-20|SUCCESS|8789864098|9489089409|
+--------------------+----------+----------+-------+----------+----------+
only showing top 5 rows
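
The same filter can also be written with Column objects instead of a SQL string, which avoids nesting quotes (a minimal sketch):

# Sketch: Column-based filtering
from pyspark.sql.functions import col
df.where(col("status") == "SUCCESS").show(5)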


from pyspark.sql.functions import *

# Typecasting: df.withColumn("fromno", col("fromno").cast("long"))
# Creating a new column from an existing column's data: .withColumn("fromnonew", col("fromno")/10)

df.withColumn("fromno",col("fromno").cast("long")).withColumn("fromnonew",col("fromno")/10).show(5)

+--------------------+----------+----------+-------+----------+----------+-------------+
|                  id| starttime|   endtime| status|    fromno|      tono|    fromnonew|
+--------------------+----------+----------+-------+----------+----------+-------------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|8.052690057E8|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|9.886177375E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|8.618627996E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|9.876515616E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|5.454545454E8|
+--------------------+----------+----------+-------+----------+----------+-------------+
only showing top 5 rows
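
The cast and the derived column can also be written as SQL expressions with selectExpr (a sketch equivalent to the withColumn chain above):

# Sketch: same result using selectExpr
df.selectExpr("id", "starttime", "endtime", "status",
              "cast(fromno as long) as fromno", "tono",
              "cast(fromno as long)/10 as fromnonew").show(5)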


#Create a new column whose value is based on a condition
#rtrim() strips the trailing space present in "FAILED " in the source data
df1 = df.withColumn("severity",when(col("status")=="DROPPED","2").when(rtrim(col("status"))=="FAILED","1").otherwise("0"))
df1.show(5)

+--------------------+----------+----------+-------+----------+----------+--------+
|                  id| starttime|   endtime| status|    fromno|      tono|severity|
+--------------------+----------+----------+-------+----------+----------+--------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|       2|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|       2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|       0|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|       2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|       1|
+--------------------+----------+----------+-------+----------+----------+--------+
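
The same conditional column can be expressed as a SQL CASE inside expr (a sketch; trim plays the role of rtrim for the trailing space in "FAILED "):

# Sketch: CASE expression instead of when/otherwise
from pyspark.sql.functions import expr
df.withColumn("severity",
    expr("CASE WHEN status = 'DROPPED' THEN '2' "
         "WHEN trim(status) = 'FAILED' THEN '1' ELSE '0' END")).show(5)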

#Create columns with a constant value
df2 = df1.withColumn("city",lit("Bangalore")).withColumn("pin",lit("560093"))
df2.show(5)

+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|                  id| starttime|   endtime| status|    fromno|      tono|severity|     city|   pin|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|       0|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|       1|Bangalore|560093|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
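
lit() also accepts non-string literals, and the new column's type follows the literal (a small sketch, storing the pin as an integer instead of a string):

# Sketch: non-string constant column
df2_typed = df1.withColumn("city", lit("Bangalore")).withColumn("pin", lit(560093))
df2_typed.printSchema()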

#Renaming existing columns of a dataframe
df3 = df2.withColumnRenamed('fromno','dialingno').withColumnRenamed("tono","receiverno")
df3.show(5)

+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|                  id| starttime|   endtime| status| dialingno|receiverno|severity|     city|   pin|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|       0|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|       2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|       1|Bangalore|560093|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
only showing top 5 rows
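
When several columns need new names at once, toDF renames them all in one call (a sketch; the names must be listed in the DataFrame's current column order):

# Sketch: rename every column in one shot
df3 = df2.toDF("id", "starttime", "endtime", "status",
               "dialingno", "receiverno", "severity", "city", "pin")
df3.printSchema()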


#drop columns

df.drop("id","tono").show(5)

+----------+----------+-------+----------+
| starttime|   endtime| status|    fromno|
+----------+----------+-------+----------+
|2014-03-15|2014-03-15|DROPPED|8052690057|
|2014-03-15|2014-03-15|DROPPED|9886177375|
|2014-03-16|2014-03-16|SUCCESS|8618627996|
|2014-03-16|2014-03-16|DROPPED|9876515616|
|2014-03-16|2014-03-16|FAILED |5454545454|
+----------+----------+-------+----------+
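
The inverse of drop is to select only the columns you want to keep (a minimal sketch):

# Sketch: keep a subset of columns explicitly
df.select("starttime", "endtime", "status", "fromno").show(5)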

#SparkSQL
df1.createOrReplaceTempView("mytable")
spark.sql("select * from mytable").show()

+--------------------+----------+----------+-------+----------+----------+--------+
|                  id| starttime|   endtime| status|    fromno|      tono|severity|
+--------------------+----------+----------+-------+----------+----------+--------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|       2|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|       2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|       0|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|       2|
+--------------------+----------+----------+-------+----------+----------+--------+

spark.sql("show tables in default").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |  mytable|       true|
+--------+---------+-----------+
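
Once the view is registered, any SQL works against it; a sketch of a simple aggregation over the temp view defined above:

# Sketch: count calls per status via Spark SQL
spark.sql("select status, count(*) as calls from mytable group by status").show()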
