Sunday, 24 May 2020

Fill missing entries with NODATA programmatically : PySpark RDD programming


input file:
/home/hadoop/scratch/fillmissingdata.txt

SPARK SPARK SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK
SPARK
SPARK SPARK
SPARK SPARK SPARK
SPARK SPARK SPARK SPARK


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("sparkApp").getOrCreate()

r1 = spark.sparkContext.textFile("/home/hadoop/scratch/fillmissingdata.txt")
#print(r1.collect())
#print(r1.count())

# Each line from textFile is already a string; split it into tokens
r2 = r1.map(lambda x: x.split(" "))
# The longest row decides how many columns the result needs
r3 = r2.map(lambda x: len(x))
columnsize = max(r3.collect())

def fillmissing(x):
    """Pad a token list with 'nodata' up to columnsize entries."""
    result = []
    for i in range(columnsize):
        try:
            result.append(x[i])
        except IndexError:
            result.append('nodata')
    return result
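# Example (for this input the longest row has 4 tokens, so columnsize is 4):
#   fillmissing(['SPARK', 'SPARK'])  ->  ['SPARK', 'SPARK', 'nodata', 'nodata']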
resultdata = r2.map(fillmissing)
#print(resultdata.collect())

df = resultdata.toDF(['First','Second','Third','Fourth'])
df.show()


output:
+-----+------+------+------+
|First|Second| Third|Fourth|
+-----+------+------+------+
|SPARK| SPARK| SPARK| SPARK|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK|nodata|nodata|nodata|
|SPARK| SPARK|nodata|nodata|
|SPARK| SPARK| SPARK|nodata|
|SPARK| SPARK| SPARK| SPARK|
+-----+------+------+------+
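
The same padding can also be done without dropping to the RDD layer. The sketch below is a minimal alternative, assuming the same input path and column names: Spark's CSV reader, given a space separator and an explicit four-column schema, returns null for the missing fields of short rows, and na.fill then replaces those nulls with the same 'nodata' marker. It should produce the same table as above.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master("local").appName("sparkApp").getOrCreate()

# Explicit schema so short rows come back padded with null
schema = StructType([StructField(name, StringType(), True)
                     for name in ['First', 'Second', 'Third', 'Fourth']])

df = spark.read.schema(schema).option("sep", " ") \
    .csv("/home/hadoop/scratch/fillmissingdata.txt")

# Replace the nulls with the 'nodata' marker
df = df.na.fill('nodata')
df.show()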
