Fill missing entries with 'nodata' programmatically
Given a text file where each line has a varying number of space-separated fields, we pad every row out to the width of the longest row with the placeholder 'nodata', then load the result into a DataFrame.
Input file:
/home/hadoop/scratch/fillmissingdata.txt
SPARK SPARK SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK
SPARK
SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK SPARK SPARK
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("sparkApp").getOrCreate()

# Read the file; each element of r1 is one line of text.
r1 = spark.sparkContext.textFile("/home/hadoop/scratch/fillmissingdata.txt")
#print(r1.collect())
#print(r1.count())

# Split each line on spaces; rows now have varying lengths.
r2 = r1.map(lambda x: x.split(" "))

# The widest row determines the column count.
r3 = r2.map(lambda x: len(x))
columnsize = max(r3.collect())

def fillmissing(x):
    """Pad a row to columnsize entries with the placeholder 'nodata'."""
    result = []
    for i in range(columnsize):
        try:
            result.append(x[i])
        except IndexError:
            result.append('nodata')
    return result

resultdata = r2.map(fillmissing)
#print(resultdata.collect())

df = resultdata.toDF(['First', 'Second', 'Third', 'Fourth'])
df.show()
output:
+-----+------+------+------+
|First|Second| Third|Fourth|
+-----+------+------+------+
|SPARK| SPARK| SPARK| SPARK|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK|nodata|nodata|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK| SPARK| SPARK|
+-----+------+------+------+
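
The version above hardcodes four column names, which breaks if the input grows wider. A minimal variation that generates the names from columnsize instead (the names col1..colN and the variable names rows/padded are illustrative, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("sparkApp").getOrCreate()

rows = spark.sparkContext.textFile("/home/hadoop/scratch/fillmissingdata.txt") \
            .map(lambda line: line.split(" "))

# Widest row determines the column count.
columnsize = rows.map(len).max()

# Pad each row with 'nodata' by list concatenation instead of try/except.
padded = rows.map(lambda x: x + ['nodata'] * (columnsize - len(x)))

# Generate column names col1..colN rather than hardcoding four of them.
df = padded.toDF(['col' + str(i + 1) for i in range(columnsize)])
df.show()

Note that the DataFrame API's own df.na.fill('nodata') only replaces nulls in an already-defined schema, so ragged rows like these still have to be padded before the DataFrame is built.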