Input data (hdfs://localhost:9000/SparkFiles/calllogdata.txt):
ec59cea2-5006-448f-a031-d5e53f33be23$2014-03-15$2014-03-15$DROPPED$8052690057$7757919463
ec59cea2-5006-448f-a032-d5e53f33be23$2014-03-15$2014-03-15$DROPPED$9886177375$9916790556
ec59cea2-5006-448f-a033-d5e53f33be23$2014-03-16$2014-03-16$SUCCESS$8618627996$9886177375
ec59cea2-5006-448f-a034-d5e53f33be23$2014-03-16$2014-03-16$DROPPED$9876515616$4894949494
ec59cea2-5006-448f-a035-d5e53f33be23$2014-03-16$2014-03-16$FAILED $5454545454$6469496477
ec59cea2-5006-448f-a036-d5e53f33be23$2014-03-16$2014-03-16$SUCCESS$1235467890$2153698431
df = spark.read.format("csv").option("delimiter","$").load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df.printSchema()
# Auto-generated schema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
df.show(5)
+--------------------+----------+----------+-------+----------+----------+
|                 _c0|       _c1|       _c2|    _c3|       _c4|       _c5|
+--------------------+----------+----------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|
+--------------------+----------+----------+-------+----------+----------+
only showing top 5 rows
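By default every column comes back as a string. If you want Spark to guess the types instead, the CSV reader's built-in inferSchema option can be turned on (a quick sketch; note it costs an extra pass over the data):
df = spark.read.format("csv") \
    .option("delimiter", "$") \
    .option("inferSchema", "true") \
    .load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")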
# Our own custom schema
from pyspark.sql.types import *
sch = StructType([
    StructField("id", StringType()),
    StructField("starttime", StringType()),
    StructField("endtime", StringType()),
    StructField("status", StringType()),
    StructField("fromno", StringType()),
    StructField("tono", StringType()),
])
# Applying the schema here (note the delimiter is "$", matching the data)
df = spark.read.format("csv").option("delimiter","$").schema(sch).load("hdfs://localhost:9000/SparkFiles/calllogdata.txt")
df.printSchema()
root
|-- id: string (nullable = true)
|-- starttime: string (nullable = true)
|-- endtime: string (nullable = true)
|-- status: string (nullable = true)
|-- fromno: string (nullable = true)
|-- tono: string (nullable = true)
df.show(5)
+--------------------+----------+----------+-------+----------+----------+
| id| starttime| endtime| status| fromno| tono|
+--------------------+----------+----------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|
+--------------------+----------+----------+-------+----------+----------+
only showing top 5 rows
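Note that starttime and endtime are still plain strings under this schema. If real date semantics are needed (sorting, date arithmetic), they can be converted with to_date — a minimal sketch, assuming the yyyy-MM-dd format seen in the data (dfdates is just an illustrative variable name):
from pyspark.sql.functions import to_date
# Convert the string date columns into proper DateType columns
dfdates = df.withColumn("starttime", to_date("starttime", "yyyy-MM-dd")) \
            .withColumn("endtime", to_date("endtime", "yyyy-MM-dd"))
dfdates.printSchema()  # starttime and endtime now show as date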
df.where("status='SUCCESS'").show(5)
# or, equivalently:
df.filter('status="SUCCESS"').show(5)
+--------------------+----------+----------+-------+----------+----------+
| id| starttime| endtime| status| fromno| tono|
+--------------------+----------+----------+-------+----------+----------+
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|1235467890|2153698431|
|ec59cea2-5006-448...|2014-03-18|2014-03-18|SUCCESS|4545454545|7978978979|
|ec59cea2-5006-448...|2014-03-19|2014-03-19|SUCCESS|8464564560|5646064646|
|ec59cea2-5006-448...|2014-03-20|2014-03-20|SUCCESS|8789864098|9489089409|
+--------------------+----------+----------+-------+----------+----------+
only showing top 5 rows
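The same filter can also be written as a column expression instead of a SQL string, which catches column-name typos at analysis time rather than at parse time:
from pyspark.sql.functions import col
df.filter(col("status") == "SUCCESS").show(5)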
from pyspark.sql.functions import *
# Typecasting: df.withColumn("fromno", col("fromno").cast("long"))
# Creating a new column from existing column data: .withColumn("fromnonew", col("fromno")/10)
df.withColumn("fromno",col("fromno").cast("long")).withColumn("fromnonew",col("fromno")/10).show(5)
+--------------------+----------+----------+-------+----------+----------+-------------+
| id| starttime| endtime| status| fromno| tono| fromnonew|
+--------------------+----------+----------+-------+----------+----------+-------------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463|8.052690057E8|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556|9.886177375E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375|8.618627996E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494|9.876515616E8|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|5.454545454E8|
+--------------------+----------+----------+-------+----------+----------+-------------+
only showing top 5 rows
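For reference, the same cast-and-derive step can also be expressed in SQL syntax with selectExpr. This sketch is broadly equivalent, except the original fromno column keeps its string type here:
df.selectExpr("*", "cast(fromno as long)/10 as fromnonew").show(5)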
# Create a new column whose value is based on a condition
# (rtrim handles the trailing space in the "FAILED " status seen in the raw data)
df1 = df.withColumn("severity",when(col("status")=="DROPPED","2").when(rtrim(col("status"))=="FAILED","1").otherwise("0"))
df1.show(5)
+--------------------+----------+----------+-------+----------+----------+--------+
| id| starttime| endtime| status| fromno| tono|severity|
+--------------------+----------+----------+-------+----------+----------+--------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463| 2|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556| 2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375| 0|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494| 2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477| 1|
+--------------------+----------+----------+-------+----------+----------+--------+
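If you prefer SQL syntax, the same conditional column can be built as a CASE expression via expr — a sketch equivalent to the when/otherwise chain above:
from pyspark.sql.functions import expr
df1 = df.withColumn("severity", expr(
    "CASE WHEN status = 'DROPPED' THEN '2' "
    "WHEN rtrim(status) = 'FAILED' THEN '1' ELSE '0' END"))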
# Create columns with constant values
df2 = df1.withColumn("city",lit("Bangalore")).withColumn("pin",lit("560093"))
df2.show(5)
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
| id| starttime| endtime| status| fromno| tono|severity| city| pin|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375| 0|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477|       1|Bangalore|560093|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
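Note that lit accepts any Python literal, so the pin could just as well be stored as an integer instead of a string — a minor variation:
df2 = df1.withColumn("city",lit("Bangalore")).withColumn("pin",lit(560093))  # pin becomes an integer column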
# Renaming existing columns of a dataframe
df3 = df2.withColumnRenamed('fromno','dialingno').withColumnRenamed("tono","receiverno")
df3.show(5)
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
| id| starttime| endtime| status| dialingno|receiverno|severity| city| pin|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375| 0|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494| 2|Bangalore|560093|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|FAILED |5454545454|6469496477| 1|Bangalore|560093|
+--------------------+----------+----------+-------+----------+----------+--------+---------+------+
only showing top 5 rows
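When several columns need renaming, chaining withColumnRenamed gets verbose. A small loop over a mapping does the same job — a sketch, where old_new is just an illustrative dict of old-to-new names:
old_new = {"fromno": "dialingno", "tono": "receiverno"}
df3 = df2
for old, new in old_new.items():
    df3 = df3.withColumnRenamed(old, new)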
# Drop columns
df.drop("id","tono").show(5)
+----------+----------+-------+----------+
| starttime| endtime| status| fromno|
+----------+----------+-------+----------+
|2014-03-15|2014-03-15|DROPPED|8052690057|
|2014-03-15|2014-03-15|DROPPED|9886177375|
|2014-03-16|2014-03-16|SUCCESS|8618627996|
|2014-03-16|2014-03-16|DROPPED|9876515616|
|2014-03-16|2014-03-16|FAILED |5454545454|
+----------+----------+-------+----------+
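The inverse approach is to select only the columns you want to keep, which is sometimes clearer than listing the ones to drop:
df.select("starttime","endtime","status","fromno").show(5)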
#SparkSQL
df1.createOrReplaceTempView("mytable")
spark.sql("select * from mytable").show()
+--------------------+----------+----------+-------+----------+----------+--------+
| id| starttime| endtime| status| fromno| tono|severity|
+--------------------+----------+----------+-------+----------+----------+--------+
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|8052690057|7757919463| 2|
|ec59cea2-5006-448...|2014-03-15|2014-03-15|DROPPED|9886177375|9916790556| 2|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|SUCCESS|8618627996|9886177375| 0|
|ec59cea2-5006-448...|2014-03-16|2014-03-16|DROPPED|9876515616|4894949494| 2|
+--------------------+----------+----------+-------+----------+----------+--------+
spark.sql("show tables in default").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| | mytable| true|
+--------+---------+-----------+
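Once the view is registered, any SQL works against it. For example, a quick call count per status — a sketch whose output depends on the full file contents (rtrim normalizes the trailing space in "FAILED "):
spark.sql("select rtrim(status) as status, count(*) as calls from mytable group by rtrim(status)").show()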