
Sunday, 24 May 2020

Reading CSV using PySpark and Exporting into Hive - PyCharm Example

Start PyCharm:
hadoop@hadoop:~/pycharm-community-2020.1.1/bin$ ./pycharm.sh 
Reading a CSV file and exporting the data into Hive using PyCharm

demo.py:
from pyspark.sql import SparkSession


def createsparkdriver():
    # build a local SparkSession with Hive support enabled;
    # spark.sql.warehouse.dir points at the Hive warehouse on HDFS
    spark = SparkSession.builder.master("local").appName("demoApp").\
            config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse").\
            enableHiveSupport().getOrCreate()
    return spark
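
As a quick sanity check (a hypothetical snippet, not part of the original post), you can confirm that Hive support is active by listing the databases Spark can see:

from demo import createsparkdriver

spark = createsparkdriver()
spark.sql("SHOW DATABASES").show()  # a Hive-enabled session should list databases such as rocks
spark.stop()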

programming.py:

from demo import createsparkdriver
import logging

if __name__ == "__main__":
    logging.basicConfig(filename="/home/hadoop/logginghere/mylog.log", level=logging.INFO)
    spark = createsparkdriver()
    logging.info("Spark Driver Created Successfully")

    logging.info("Reading file") #hard coded the file path
    df = spark.read.format("csv").option("header",True).load("hdfs://localhost:9000/SparkFiles/person.csv")
    df.show()

#write the dataframe into hive table (db : rocks, table : emp )
    df.write.saveAsTable("rocks.emp")
    spark.stop()
    logging.info("Program complete")


Run this program. df.show() prints the CSV contents:

+-----+---+------+------+
| Name|Age|Gender|Salary|
+-----+---+------+------+
| Ravi| 23|     M|  5000|
|Rahul| 24|     M|  6300|
| Siva| 22|     M|  3200|
+-----+---+------+------+


hive> use rocks;
 

hive> show tables;

emp
employee
person


hive> select * from emp;

Ravi 23 M 5000
Rahul 24 M 6300
Siva 22 M 3200
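
Note that saveAsTable raises an error if rocks.emp already exists. A minimal variation (my sketch, not from the original post) that makes the job re-runnable by creating the database if needed and overwriting the table:

spark.sql("CREATE DATABASE IF NOT EXISTS rocks")     # assumption: the database may not exist yet
df.write.mode("overwrite").saveAsTable("rocks.emp")  # replace the table contents on every run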



Saturday, 23 May 2020

Logging INFO to a file using PySpark with PyCharm - Log configuration

Log INFO messages to a file on Linux

demo.py:

from pyspark.sql import SparkSession


def createsparkdriver():
    spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
    return spark

programming.py:

from demo import createsparkdriver
import logging

if __name__ == "__main__":
    logging.basicConfig(filename="/home/hadoop/logginghere/mylog.log", level=logging.INFO)
    spark = createsparkdriver()
    logging.info("Spark Driver Created Successfully")

    logging.info("Reading input paramaters")
    file_format = input("Enter the file format\t : ")
    file_path = input("Enter the input file path\t : ")

    logging.info("Reading file")
    df = spark.read.format(file_format).option("multiline", True).load(file_path)
    df.show()

    # write the output as a Parquet file
    df.write.format("parquet").mode("overwrite").save("/home/hadoop/logginghere/dfout")
   
    spark.stop()
    logging.info("Program complete")


Enter the file format : csv
Enter the input file path : hdfs://localhost:9000/SparkFiles/person.csv
+-----+---+---+----+
|  _c0|_c1|_c2| _c3|
+-----+---+---+----+
| Ravi| 23|  M|5000|
|Rahul| 24|  M|6300|
| Siva| 22|  M|3200|
+-----+---+---+----+
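
The default column names (_c0, _c1, ...) appear because this read never sets the header option. Since person.csv does carry a header row (the first post reads it with header=True), a small change keeps the real column names:

df = spark.read.format(file_format).option("header", True).option("multiline", True).load(file_path)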



hadoop@hadoop:~/logginghere$ cat mylog.log
INFO:root:Spark Driver Created Successfully
INFO:root:Reading input parameters
INFO:root:Reading file
INFO:root:Program complete
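
By default, basicConfig appends to the file and writes bare INFO:root: prefixes. A variation (my sketch, not from the post) that adds timestamps via the standard format parameter:

import logging

logging.basicConfig(
    filename="/home/hadoop/logginghere/mylog.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",  # timestamped entries
)
logging.info("Spark Driver Created Successfully")
# entries then look like: 2020-05-23 10:15:02,123 INFO root: Spark Driver Created Successfully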

PyCharm with PySpark - sample program

PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip
SPARK_HOME = /home/hadoop/spark-3.0.0-preview2-bin-hadoop3.2

Environment variables (entered as a single line in the PyCharm run configuration):

PYTHONUNBUFFERED=1;SPARK_HOME=/home/hadoop/spark-3.0.0-preview2-bin-hadoop3.2;PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip
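
Alternatively (an option not covered in the post, assuming the findspark package is installed via pip install findspark), the same wiring can be done in code instead of in the run configuration:

import findspark

# point findspark at the Spark installation; it fixes up sys.path for pyspark and py4j
findspark.init("/home/hadoop/spark-3.0.0-preview2-bin-hadoop3.2")

from pyspark.sql import SparkSession  # this import works once findspark.init has run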


File - New - Project
File - New - Python Package
go inside the package:
New - Python file  (demo.py, programming.py)
demo.py:

from pyspark.sql import SparkSession


def createsparkdriver():
    spark = SparkSession.builder.master("local").appName("demoApp").getOrCreate()
    return spark


programming.py:

from demo import createsparkdriver

if __name__ == "__main__":
    spark = createsparkdriver()
    df = spark.read.format("json").option("multiline", True).load("hdfs://localhost:9000/SparkFiles/orgs.json")
    df.show()
    spark.stop()
Right click - Run
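
To see how Spark inferred the structure of the multiline JSON, a hypothetical follow-up in the same session (before spark.stop()):

df.printSchema()  # prints the inferred schema, including any nested fields in orgs.json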



An interactive version, which prompts for the file format and path at run time:

from demo import createsparkdriver

if __name__ == "__main__":
    spark = createsparkdriver()

    file_format = input("Enter the file format\t : ")
    file_path = input("Enter the input file path\t : ")

    df = spark.read.format(file_format).option("multiline", True).load(file_path)
    df.show()
    spark.stop()

Run it:

Enter the file format : json
Enter the input file path : hdfs://localhost:9000/SparkFiles/orgs.json
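
Because input() blocks for keyboard input, this version only works from an attached console. A sketch (my variation, not from the post) that takes the same two values as command-line arguments instead, so it also runs non-interactively, e.g. under spark-submit:

import sys

from demo import createsparkdriver

if __name__ == "__main__":
    # e.g.: programming.py csv hdfs://localhost:9000/SparkFiles/person.csv
    file_format, file_path = sys.argv[1], sys.argv[2]
    spark = createsparkdriver()
    df = spark.read.format(file_format).option("multiline", True).load(file_path)
    df.show()
    spark.stop()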

