
Friday, 22 May 2020

Read online JSON and create a local file using PySpark

hadoop@hadoop:~/Downloads$ hdfs dfs -copyFromLocal orgs.json /SparkFiles/

hadoop@hadoop:~/Downloads$ hdfs dfs -cat hdfs://localhost:9000/SparkFiles/orgs.json

#We downloaded the .json file and copied it into HDFS. Reading it as-is fails, because the file is a multi-line JSON document:
df_json = spark.read.format("json").load("hdfs://localhost:9000/SparkFiles/orgs.json")
df_json.printSchema()

root
 |-- _corrupt_record: string (nullable = true)

#multiline = True tells Spark to parse the whole file as a single JSON document

df_json = spark.read.format("json").option("multiline",True).load("hdfs://localhost:9000/SparkFiles/orgs.json")
df_json.printSchema()

root
 |-- avatar_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- events_url: string (nullable = true)
 |-- hooks_url: string (nullable = true)
 |-- id: long (nullable = true)
 |-- issues_url: string (nullable = true)
 |-- login: string (nullable = true)
 |-- members_url: string (nullable = true)
 |-- node_id: string (nullable = true)
 |-- public_members_url: string (nullable = true)
 |-- repos_url: string (nullable = true)
 |-- url: string (nullable = true)

df_json.select("avatar_url","id","login").show(5,False)

+----------------------------------------------------+-------+--------+
|avatar_url                                          |id     |login   |
+----------------------------------------------------+-------+--------+
|https://avatars1.githubusercontent.com/u/423638?v=4 |423638 |ggobi   |
|https://avatars1.githubusercontent.com/u/513560?v=4 |513560 |rstudio |
|https://avatars1.githubusercontent.com/u/722735?v=4 |722735 |rstats  |
|https://avatars3.githubusercontent.com/u/1200269?v=4|1200269|ropensci|
|https://avatars2.githubusercontent.com/u/3330561?v=4|3330561|rjournal|
+----------------------------------------------------+-------+--------+

#Now write a program to read JSON directly from a URL

URL for the JSON: https://api.github.com/users/hadley/orgs

#Reading from a REST API (Representational State Transfer)



import requests
import json

#The JSON is available at the URL below
jsonapidata = requests.request("GET","https://api.github.com/users/hadley/orgs")
jsondata = jsonapidata.json()



#jsondata is a list of org records
print(type(jsondata))
print(len(jsondata))

#We create this file locally, one JSON object per line (JSON Lines)
file = open("/home/hadoop/Downloads/test.json", 'w')

#writing data into the file; json.dumps keeps each record valid JSON
for record in jsondata:
    file.write("%s\n" % json.dumps(record))
file.close()

#reading the JSON Lines file back; note the file:// prefix, since the
#default filesystem here is HDFS. No multiline option is needed now.
df_json = spark.read.format("json").load("file:///home/hadoop/Downloads/test.json")
df_json.count()
df_json.show(5)
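
A hedged alternative sketch: skip the temp file entirely and hand the JSON strings straight to Spark (assumes the spark session and jsondata from above).

import json

#one JSON string per record; spark.read.json accepts an RDD of strings
jsonrdd = spark.sparkContext.parallelize([json.dumps(record) for record in jsondata])
df_api = spark.read.json(jsonrdd)
df_api.select("login", "id").show(5, False)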

Tuesday, 19 May 2020

Read JSON and Write into a MySQL table using PySpark

#Read from JSON

df_local = sqlContext.read.format("json").load("E:\\DataSets\\olympic.json")

df_local.printSchema()

root
 |-- age: long (nullable = true)
 |-- athelete: string (nullable = true)
 |-- bronze: long (nullable = true)
 |-- closing: string (nullable = true)
 |-- country: string (nullable = true)
 |-- gold: long (nullable = true)
 |-- silver: long (nullable = true)
 |-- sport: string (nullable = true)
 |-- total: long (nullable = true)
 |-- year: string (nullable = true)

df_local.show()

+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
|age|            athelete|bronze| closing|      country|gold|silver|               sport|total|year|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+
| 30|      Inge de Bruijn|     2|08-29-04|  Netherlands|   1|     1|            Swimming|    4|2004|
| 24|         Ryan Lochte|     2|08-24-08|United States|   2|     0|            Swimming|    4|2008|
| 23|Libby Lenton-Tric...|     1|08-24-08|    Australia|   2|     1|            Swimming|    4|2008|
| 24|     Kirsty Coventry|     0|08-24-08|     Zimbabwe|   1|     3|            Swimming|    4|2008|
| 20|            Sun Yang|     1|08-12-12|        China|   2|     1|            Swimming|    4|2012|
| 29|       Marit Bjørgen|     1|02-28-10|       Norway|   3|     1|Cross Country Skiing|    5|2010|
| 18|       Nastia Liukin|     1|08-24-08|United States|   1|     3|          Gymnastics|    5|2008|
| 26|       Cindy Klassen|     2|02-26-06|       Canada|   1|     2|       Speed Skating|    5|2006|
| 33|         Dara Torres|     3|10-01-00|United States|   2|     0|            Swimming|    5|2000|
| 17|          Ian Thorpe|     0|10-01-00|    Australia|   3|     2|            Swimming|    5|2000|
| 21|    Natalie Coughlin|     1|08-29-04|United States|   2|     2|            Swimming|    5|2004|
| 22|     Allison Schmitt|     1|08-12-12|United States|   3|     1|            Swimming|    5|2012|
| 27|         Ryan Lochte|     1|08-12-12|United States|   2|     2|            Swimming|    5|2012|
| 17|      Missy Franklin|     1|08-12-12|United States|   4|     0|            Swimming|    5|2012|
| 24|       Alicia Coutts|     1|08-12-12|    Australia|   1|     3|            Swimming|    5|2012|
| 24|       Aleksey Nemov|     3|10-01-00|       Russia|   2|     1|          Gymnastics|    6|2000|
| 25|    Natalie Coughlin|     3|08-24-08|United States|   1|     2|            Swimming|    6|2008|
| 27|      Michael Phelps|     0|08-12-12|United States|   4|     2|            Swimming|    6|2012|
| 19|      Michael Phelps|     2|08-29-04|United States|   6|     0|            Swimming|    8|2004|
| 23|      Michael Phelps|     0|08-24-08|United States|   8|     0|            Swimming|    8|2008|
+---+--------------------+------+--------+-------------+----+------+--------------------+-----+----+

#Save the DataFrame into a MySQL table
df_local.write.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "olympic").\
option("user", "root").\
option("password", "Studi0Plus").save()
#Create a DataFrame by reading from MySQL
df_Olympic = sqlContext.read.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "olympic").\
option("user", "root").\
option("password", "Studi0Plus").load()
df_Olympic.show(5)

+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
|age|            athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
| 30|      Inge de Bruijn|     2|08-29-04|  Netherlands|   1|     1|Swimming|    4|2004|
| 24|         Ryan Lochte|     2|08-24-08|United States|   2|     0|Swimming|    4|2008|
| 23|Libby Lenton-Tric...|     1|08-24-08|    Australia|   2|     1|Swimming|    4|2008|
| 24|     Kirsty Coventry|     0|08-24-08|     Zimbabwe|   1|     3|Swimming|    4|2008|
| 20|            Sun Yang|     1|08-12-12|        China|   2|     1|Swimming|    4|2012|
+---+--------------------+------+--------+-------------+----+------+--------+-----+----+
only showing top 5 rows


df_Olympic.filter("country = 'United States' and sport='Swimming' and year=2008").show()


+---+----------------+------+--------+-------------+----+------+--------+-----+----+
|age|        athelete|bronze| closing|      country|gold|silver|   sport|total|year|
+---+----------------+------+--------+-------------+----+------+--------+-----+----+
| 24|     Ryan Lochte|     2|08-24-08|United States|   2|     0|Swimming|    4|2008|
| 25|Natalie Coughlin|     3|08-24-08|United States|   1|     2|Swimming|    6|2008|
| 23|  Michael Phelps|     0|08-24-08|United States|   8|     0|Swimming|    8|2008|
+---+----------------+------+--------+-------------+----+------+--------+-----+----+
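
The JDBC source also accepts a subquery in place of a table name, so MySQL can do the filtering itself. A hedged sketch (the alias t after the subquery is required):

df_us = sqlContext.read.format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/school").\
option("driver", "com.mysql.jdbc.Driver").\
option("dbtable", "(select * from olympic where country = 'United States' and sport = 'Swimming' and year = 2008) t").\
option("user", "root").\
option("password", "Studi0Plus").load()
df_us.show()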

Tuesday, 19 February 2019

Multi-line JSON parser using Spark DataFrame

Input file
tags_sample.json:
-----------------------

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}


scala> import spark.sqlContext.implicits._
import spark.sqlContext.implicits._

scala> val tagsDF = spark.read.option("multiLine", true).option("inferSchema", true).format("json").load("E:\\DataSets\\tags_sample.json")
tagsDF: org.apache.spark.sql.DataFrame = [stackoverflow: array<struct<tag:struct<author:string,frameworks:array<struct<id:bigint,name:string>>,id:bigint,name:string>>>]


scala>

scala> tagsDF.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)

scala> tagsDF.show  // not readable yet: the whole array lands in a single column
+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+


scala> val df = tagsDF.select(explode($"stackoverflow") as "stags")
df: org.apache.spark.sql.DataFrame = [stags: struct<tag: struct<author: string, frameworks: ...

scala> df.printSchema
root
 |-- stags: struct (nullable = true)
 |    |-- tag: struct (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- frameworks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)




scala> val df1 = df.select(
     |               $"stags.tag.id" as "id",
     |               $"stags.tag.author" as "author",
     |               $"stags.tag.name" as "tag_name",
     |               $"stags.tag.frameworks.id" as "frameworks_id",
     |               $"stags.tag.frameworks.name" as "frameworks_name"
     |             )
df1: org.apache.spark.sql.DataFrame = [id: bigint, author: string ... 3 more fields]

scala> df1.show
+---+--------------+--------+-------------+--------------------+
| id|        author|tag_name|frameworks_id|     frameworks_name|
+---+--------------+--------+-------------+--------------------+
|  1|Martin Odersky|   scala|       [1, 2]|[Play Framework, ...|
|  2| James Gosling|    java|       [1, 2]|[Apache Tomcat, S...|
+---+--------------+--------+-------------+--------------------+
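
Each row above still carries the frameworks as arrays. For one row per framework, the array can be exploded a second time; a PySpark equivalent of the whole flattening, as a sketch (same file path assumed):

from pyspark.sql.functions import col, explode

tags = spark.read.option("multiLine", True).json("E:\\DataSets\\tags_sample.json")
flat = tags.select(explode(col("stackoverflow")).alias("stags")) \
           .select(col("stags.tag.id").alias("id"),
                   col("stags.tag.author").alias("author"),
                   col("stags.tag.name").alias("tag_name"),
                   explode(col("stags.tag.frameworks")).alias("fw")) \
           .select("id", "author", "tag_name",
                   col("fw.id").alias("framework_id"),
                   col("fw.name").alias("framework_name"))
flat.show()   # one row per (tag, framework) pair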


Sunday, 3 February 2019

Multiline JSON parser in Spark with Scala

hdfs dfs -copyFromLocal myInput.json /user/

hdfs dfs -cat /user/myInput.json
[{
"Year": "2013",
"First Name": "JANE",
"County": "A",
"Sex": "F",
"Count": "27"
}, {
"Year": "2013",
"First Name": "JADE",
"County": "B",
"Sex": "M",
"Count": "26"
}, {
"Year": "2013",
"First Name": "JAMES",
"County": "C",
"Sex": "M",
"Count": "21"
}]


scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3318d5ab

scala> val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/myInput.json").map(x => x._2)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[168] at map at <console>:28


scala> val namesJson = sqlContext.read.json(rdd1)
warning: there was one deprecation warning; re-run with -deprecation for details
namesJson: org.apache.spark.sql.DataFrame = [Count: string, County: string ... 3 more fields]



scala> namesJson.printSchema
root
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)


scala> namesJson.show
+-----+------+----------+---+----+
|Count|County|First Name|Sex|Year|
+-----+------+----------+---+----+
|   27|     A|      JANE|  F|2013|
|   26|     B|      JADE|  M|2013|
|   21|     C|     JAMES|  M|2013|
+-----+------+----------+---+----+
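
Since Spark 2.2 the wholeTextFiles workaround is unnecessary: the reader's multiLine option parses a JSON array that spans many lines. A PySpark sketch of the same read:

namesJson = spark.read.option("multiLine", True).json("hdfs://localhost:9000/user/myInput.json")
namesJson.show()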



Input
server.json:
------------

[
     {
      "id": "59761c23b30d971669fb42ff",
      "isActive": true,
      "age": 36,
      "name": "Dunlap Hubbard",
      "gender": "male",
      "company": "CEDWARD",
      "email": "dunlaphubbard@cedward.com",
      "phone": "+1 (890) 543-2508",
      "address": "169 Rutledge Street, Konterra, Northern Mariana Islands, 8551"
    },
    {
      "id": "59761c233d8d0f92a6b0570d",
      "isActive": true,
      "age": 24,
      "name": "Kirsten Sellers",
      "gender": "female",
      "company": "EMERGENT",
      "email": "kirstensellers@emergent.com",
      "phone": "+1 (831) 564-2190",
      "address": "886 Gallatin Place, Fannett, Arkansas, 4656"
    },
    {
      "id": "59761c23fcb6254b1a06dad5",
      "isActive": true,
      "age": 30,
      "name": "Acosta Robbins",
      "gender": "male",
      "company": "ORGANICA",
      "email": "acostarobbins@organica.com",
      "phone": "+1 (882) 441-3367",
      "address": "697 Linden Boulevard, Sattley, Idaho, 1035"
    }
 ]

// wholeTextFiles returns an RDD of pairs: filePath as the 1st field, JSON content as the 2nd field
val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/server.json")


//Here we extract the 2nd field (JSON content) only and ignore the filePath (1st field)
scala> val rdd1 = sc.wholeTextFiles("hdfs://localhost:9000/user/server.json").map(x => x._2)


scala> val df = spark.sqlContext.read.json(rdd1)
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [address: string, age: bigint ... 7 more fields]

scala> df.show
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+
|             address|age| company|               email|gender|                  id|isActive|           name|            phone|
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+
|169 Rutledge Stre...| 36| CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true| Dunlap Hubbard|+1 (890) 543-2508|
|886 Gallatin Plac...| 24|EMERGENT|kirstensellers@em...|female|59761c233d8d0f92a...|    true|Kirsten Sellers|+1 (831) 564-2190|
|697 Linden Boulev...| 30|ORGANICA|acostarobbins@org...|  male|59761c23fcb6254b1...|    true| Acosta Robbins|+1 (882) 441-3367|
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+

scala> df.show(true)   // truncate = true is the default, so this matches df.show
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+
|             address|age| company|               email|gender|                  id|isActive|           name|            phone|
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+
|169 Rutledge Stre...| 36| CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true| Dunlap Hubbard|+1 (890) 543-2508|
|886 Gallatin Plac...| 24|EMERGENT|kirstensellers@em...|female|59761c233d8d0f92a...|    true|Kirsten Sellers|+1 (831) 564-2190|
|697 Linden Boulev...| 30|ORGANICA|acostarobbins@org...|  male|59761c23fcb6254b1...|    true| Acosta Robbins|+1 (882) 441-3367|
+--------------------+---+--------+--------------------+------+--------------------+--------+---------------+-----------------+

scala> df.filter($"age">30).show
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+
|             address|age|company|               email|gender|                  id|isActive|          name|            phone|
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+
|169 Rutledge Stre...| 36|CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+

scala> df.filter($"gender"==="male").show
+--------------------+---+--------+--------------------+------+--------------------+--------+--------------+-----------------+
|             address|age| company|               email|gender|                  id|isActive|          name|            phone|
+--------------------+---+--------+--------------------+------+--------------------+--------+--------------+-----------------+
|169 Rutledge Stre...| 36| CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|
|697 Linden Boulev...| 30|ORGANICA|acostarobbins@org...|  male|59761c23fcb6254b1...|    true|Acosta Robbins|+1 (882) 441-3367|
+--------------------+---+--------+--------------------+------+--------------------+--------+--------------+-----------------+

scala> df.where($"age">30 and $"gender" ==="male").show
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+
|             address|age|company|               email|gender|                  id|isActive|          name|            phone|
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+
|169 Rutledge Stre...| 36|CEDWARD|dunlaphubbard@ced...|  male|59761c23b30d97166...|    true|Dunlap Hubbard|+1 (890) 543-2508|
+--------------------+---+-------+--------------------+------+--------------------+--------+--------------+-----------------+



Input
ad_events.txt:
---------------
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"50.46.171.77","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"http://www.rantsports.com/clubhouse/wp-content/slideshow/2013/09/15-pictures-of-nfl-cheerleaders-who-should-put-on-more-clothes/medium/cheerleadersss-Kirby-Lee-USA-TODAY-Sports1.jpg","pu":"http://www.rantsports.com/clubhouse/2013/09/22/15-nfl-cheerleaders-who-should-put-on-more-clothes/?utm_medium=referral&utm_source=nRelate","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"38.69.50.179","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"http://cdn01.cdnwp.celebuzz.com/wp-content/uploads/legacy/celebuzz/default/msg-127310264361-3.jpg","pu":"http://www.celebuzz.com/photos/miley-cyrus-10-most-inappropriate-moments/9-saying-she-d-do-10/","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36","ip":"23.241.185.32","cc":"US","rg":"CA","ct":"Los Angeles","pc":"90025","mc":803,"bf":"b519456e03072cc4c58fa60908bca87e8a14fda3","vst":"2a1b62ff-78f5-4731-bc0d-4be3c1e1c69d","lt":"Sun Jan 19 23:59:58 -0800 2014"}, {"v":"1.1","e":"view","t":"0a196892","ab":20722,"u":"http://www.rantlifestyle.com/wp-content/uploads/2014/01/bad-mexican-food.jpg","seq":3,"tr":0.7,"af":true,"pv":"6d85dcf3-6f27-43a9-aa21-91fa46ee8908","pu":"http://www.rantlifestyle.com/2014/01/18/20-things-nobody-tells-moving-southern-california/","rpm":1.76,"pc":0.264,"nc":0.1,"cid":389}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB7.5; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; McAfee)","ip":"74.178.210.131","cc":"US","rg":"FL","ct":"Lake City","pc":"32024","mc":561,"bf":"c7b60dc38c3512c45dbb57af3cac175d4099d083","vst":"2eaa64ca-1700-499b-bd0a-4216a37b50af","lt":"Mon Jan 20 03:00:13 -0500 2014"}, {"v":"1.1","e":"view","t":"05c113f6","ab":18667,"u":"http://www.goodchews.com/wp-content/uploads/2013/12/2318-290x240.jpg","seq":4,"tr":0.1,"af":true,"pv":"407dfa23-d2bc-4469-88cc-4c0a1b4c45d2","pu":"http://www.goodchews.com/recipes/appetizers","rpm":0,"pc":0.2,"nc":0.15,"cid":44}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"181.33.3.250","cc":"CO","rg":"34","ct":"Bogotá","mc":0,"bf":"454f55805932783a937972bcc648d3b8ef7d67a6","vst":"bbd8af36-4e42-4ee6-934d-c8610266e934","lt":"Mon Jan 20 02:59:56 -0500 2014"}, {"v":"1.1","e":"block","et":"user","t":"5cc3fdcf","u":"http://pmchollywoodlife.files.wordpress.com/2013/12/meadow-walker-mourning-paul-walker-gsi-lead.jpg?w=600","pu":"http://hollywoodlife.com/2013/12/04/meadow-walker-after-paul-walker-death-daughter-spotted-first-photo/","uid":154,"pv":"1bf1c548-1c4e-40e1-9c0c-9b19fb71d484"}


scala> val rdd1 = sc.textFile("hdfs://localhost:9000/user/ads/ad_events.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/ads/ad_events.txt MapPartitionsRDD[45] at textFile at <console>:24

scala> rdd1.count
res22: Long = 29238

scala> rdd1.take(2).foreach(println)
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"50.46.171.77","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"http://www.rantsports.com/clubhouse/wp-content/slideshow/2013/09/15-pictures-of-nfl-cheerleaders-who-should-put-on-more-clothes/medium/cheerleadersss-Kirby-Lee-USA-TODAY-Sports1.jpg","pu":"http://www.rantsports.com/clubhouse/2013/09/22/15-nfl-cheerleaders-who-should-put-on-more-clothes/?utm_medium=referral&utm_source=nRelate","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
Mon Jan 20 00:00:00 -0800 2014, {"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"38.69.50.179","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"http://cdn01.cdnwp.celebuzz.com/wp-content/uploads/legacy/celebuzz/default/msg-127310264361-3.jpg","pu":"http://www.celebuzz.com/photos/miley-cyrus-10-most-inappropriate-moments/9-saying-she-d-do-10/","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}

scala> val pattern = "[0-9]{4}, ".r
pattern: scala.util.matching.Regex = [0-9]{4},

scala> val rdd2 = rdd1.map(x => x.split(pattern.findFirstIn(x).get))
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[46] at map at <console>:27

// rdd2 has 2 fields
scala> rdd2.take(2)
res25: Array[Array[String]] = Array(Array("Mon Jan 20 00:00:00 -0800 ", {"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"50.46.171.77","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"http://www.rantsports.com/clubhouse/wp-content/slideshow/2013/09/15-pictures-of-nfl-cheerleaders-who-should-put-on-more-clothes/medium/cheerleadersss-Kirby-Lee-USA-TODAY-Sports1.jpg","pu":"http://www.rantsports.com/clubhouse/2013/09/22/15-nfl-cheerleaders-who-should-put-on-more-clothes/?utm_medium=referral&utm_source=nRelate","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e2...
scala>

// we extract the 2nd field
scala> val rdd3 = rdd2.map(x => (x(1)))
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[48] at map at <console>:25

scala> rdd3.take(2).foreach(println)
{"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"50.46.171.77","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"10957643","u":"http://www.rantsports.com/clubhouse/wp-content/slideshow/2013/09/15-pictures-of-nfl-cheerleaders-who-should-put-on-more-clothes/medium/cheerleadersss-Kirby-Lee-USA-TODAY-Sports1.jpg","pu":"http://www.rantsports.com/clubhouse/2013/09/22/15-nfl-cheerleaders-who-should-put-on-more-clothes/?utm_medium=referral&utm_source=nRelate","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
{"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"38.69.50.179","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014"}, {"v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"http://cdn01.cdnwp.celebuzz.com/wp-content/uploads/legacy/celebuzz/default/msg-127310264361-3.jpg","pu":"http://www.celebuzz.com/photos/miley-cyrus-10-most-inappropriate-moments/9-saying-she-d-do-10/","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}


// each line holds two JSON objects, {...}, {...}; spark.read.json parses only the first object and ignores the second
scala> val df1 = spark.sqlContext.read.json(rdd3)
warning: there was one deprecation warning; re-run with -deprecation for details
df1: org.apache.spark.sql.DataFrame = [bf: string, cc: string ... 9 more fields]

// this schema comes only from the 1st object
scala> df1.printSchema
root
 |-- bf: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cl: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- pc: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- vst: string (nullable = true)

// partial data
scala> df1.columns.size
res31: Int = 11

// Here we merge the two objects into one by replacing "}, {" with ","
scala> val rdd3 = rdd2.map(x => x(1)).map(x => x.replace("}, {",","))
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at map at <console>:25

scala> rdd3.take(2).foreach(println)
{"cl":"js","ua":"Mozilla/5.0 (Windows NT 6.1; rv:26.0) Gecko/20100101 Firefox/26.0","ip":"50.46.171.77","cc":"US","rg":"WA","ct":"Everett","pc":"98208","mc":819,"bf":"cd21df8fdaa6eee6b8af906ab1345fe6ce797ad1","vst":"56941a03-b0bd-4f93-912c-f90ba0dc4159","lt":"Sun Jan 19 23:59:39 -0800 2014","v":"1.1","e":"block","et":"keyword","t":"10957643","u":"http://www.rantsports.com/clubhouse/wp-content/slideshow/2013/09/15-pictures-of-nfl-cheerleaders-who-should-put-on-more-clothes/medium/cheerleadersss-Kirby-Lee-USA-TODAY-Sports1.jpg","pu":"http://www.rantsports.com/clubhouse/2013/09/22/15-nfl-cheerleaders-who-should-put-on-more-clothes/?utm_medium=referral&utm_source=nRelate","bk":"nude","pv":"98580188-4abd-4a1e-ae98-6e21be2d2c5d"}
{"cl":"js","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36","ip":"38.69.50.179","cc":"US","rg":"IL","ct":"Chicago","pc":"60616","mc":602,"bf":"ec8498f87549a798e738fce5b312a791f1a4d683","vst":"761df6d3-8747-4939-b5f9-f2ecdb95b419","lt":"Mon Jan 20 02:00:00 -0600 2014","v":"1.1","e":"block","et":"keyword","t":"e0b5822c","u":"http://cdn01.cdnwp.celebuzz.com/wp-content/uploads/legacy/celebuzz/default/msg-127310264361-3.jpg","pu":"http://www.celebuzz.com/photos/miley-cyrus-10-most-inappropriate-moments/9-saying-she-d-do-10/","bk":"sex photos","pv":"8a529597-a4f0-45e8-9e7e-ef5ccb1aa169"}

// Here both objects are included
scala> val df1 = spark.sqlContext.read.json(rdd3)
warning: there was one deprecation warning; re-run with -deprecation for details
df1: org.apache.spark.sql.DataFrame = [ab: bigint, af: boolean ... 32 more fields]

// this is correct: we now get all the columns
scala> df1.printSchema
root
 |-- ab: long (nullable = true)
 |-- af: boolean (nullable = true)
 |-- ai: string (nullable = true)
 |-- bf: string (nullable = true)
 |-- bk: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cid: long (nullable = true)
 |-- cl: string (nullable = true)
 |-- cre: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- du: string (nullable = true)
 |-- e: string (nullable = true)
 |-- et: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- nc: double (nullable = true)
 |-- pc: string (nullable = true)
 |-- pc: double (nullable = true)
 |-- pu: string (nullable = true)
 |-- pv: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- rpc: double (nullable = true)
 |-- rpm: double (nullable = true)
 |-- rpv: double (nullable = true)
 |-- seq: long (nullable = true)
 |-- t: string (nullable = true)
 |-- tr: double (nullable = true)
 |-- u: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: long (nullable = true)
 |-- v: string (nullable = true)
 |-- vst: string (nullable = true)
 |-- vtw: long (nullable = true)

scala> df1.columns.size
res35: Int = 34
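
The same prefix-stripping and object-merging, as a PySpark sketch (same input path assumed):

raw = spark.sparkContext.textFile("hdfs://localhost:9000/user/ads/ad_events.txt")
# split on the first ", " (the end of the timestamp), keep the JSON part,
# then merge the two adjacent objects into one
merged = raw.map(lambda line: line.split(", ", 1)[1].replace("}, {", ","))
df1 = spark.read.json(merged)
df1.printSchema()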

// drop the column named "pc" (the schema has two "pc" columns, string and double; drop removes both)
scala> val df2 = df1.drop("pc")
df2: org.apache.spark.sql.DataFrame = [ab: bigint, af: boolean ... 30 more fields]

scala> df2.printSchema
root
 |-- ab: long (nullable = true)
 |-- af: boolean (nullable = true)
 |-- ai: string (nullable = true)
 |-- bf: string (nullable = true)
 |-- bk: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- cid: long (nullable = true)
 |-- cl: string (nullable = true)
 |-- cre: string (nullable = true)
 |-- ct: string (nullable = true)
 |-- du: string (nullable = true)
 |-- e: string (nullable = true)
 |-- et: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- mc: long (nullable = true)
 |-- nc: double (nullable = true)
 |-- pu: string (nullable = true)
 |-- pv: string (nullable = true)
 |-- rg: string (nullable = true)
 |-- rpc: double (nullable = true)
 |-- rpm: double (nullable = true)
 |-- rpv: double (nullable = true)
 |-- seq: long (nullable = true)
 |-- t: string (nullable = true)
 |-- tr: double (nullable = true)
 |-- u: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: long (nullable = true)
 |-- v: string (nullable = true)
 |-- vst: string (nullable = true)
 |-- vtw: long (nullable = true)


scala> df2.write.mode("append").saveAsTable("default.jsondf")
                                                                               
scala> df2.createOrReplaceTempView("test")

scala> spark.sql("select * from test").show(10)

scala> spark.sql("select * from default.jsonexa").show


Saturday, 2 February 2019

Experimenting JSON file with Spark SQL

Input file
employee1.json:
---------------
{"empid":1, "empname":"Siya", "dept":"IT", "address":{"city":"Pune","state":"Maharashtra"}, "salary":150000}
{"empid":2, "empname":"Swayam", "dept":"HR", "address":{"city":"Bangalore", "state":"Karnataka"}, "salary":130000}
{"empid":3, "empname":"Jeet", "dept":"IT", "address":{"city":"Mumbai","state":"Maharashtra"}, "salary":170000}
{"empid":4, "empname":"Priya", "dept":"HR", "address":{"city":"Chennai", "state":"Tamilnadu"}, "salary":145000}



hdfs dfs -copyFromLocal employee.json /user/employee1.json

scala> val employeeDF = spark.read.format("json").option("inferSchema","true").load("hdfs://localhost:9000/user/employee1.json")
employeeDF: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, dept: string ... 3 more fields]

scala> employeeDF.printSchema
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- empid: long (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: long (nullable = true)


scala> employeeDF.show
+--------------------+----+-----+-------+------+
|             address|dept|empid|empname|salary|
+--------------------+----+-----+-------+------+
| [Pune, Maharashtra]|  IT|    1|   Siya|150000|
|[Bangalore, Karna...|  HR|    2| Swayam|130000|
|[Mumbai, Maharash...|  IT|    3|   Jeet|170000|
|[Chennai, Tamilnadu]|  HR|    4|  Priya|145000|
+--------------------+----+-----+-------+------+

scala> employeeDF.select($"empname",$"address.city").show
+-------+---------+
|empname|     city|
+-------+---------+
|   Siya|     Pune|
| Swayam|Bangalore|
|   Jeet|   Mumbai|
|  Priya|  Chennai|
+-------+---------+


scala> employeeDF.select($"empname",$"address.city",$"address.state",$"dept",$"empid",$"empname").show
+-------+---------+-----------+----+-----+-------+
|empname|     city|      state|dept|empid|empname|
+-------+---------+-----------+----+-----+-------+
|   Siya|     Pune|Maharashtra|  IT|    1|   Siya|
| Swayam|Bangalore|  Karnataka|  HR|    2| Swayam|
|   Jeet|   Mumbai|Maharashtra|  IT|    3|   Jeet|
|  Priya|  Chennai|  Tamilnadu|  HR|    4|  Priya|
+-------+---------+-----------+----+-----+-------+

scala> employeeDF.createOrReplaceTempView("emp")

scala> spark.sql("select * from emp")
res46: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, dept: string ... 3 more fields]

scala> spark.sql("select * from emp").show
+--------------------+----+-----+-------+------+
|             address|dept|empid|empname|salary|
+--------------------+----+-----+-------+------+
| [Pune, Maharashtra]|  IT|    1|   Siya|150000|
|[Bangalore, Karna...|  HR|    2| Swayam|130000|
|[Mumbai, Maharash...|  IT|    3|   Jeet|170000|
|[Chennai, Tamilnadu]|  HR|    4|  Priya|145000|
+--------------------+----+-----+-------+------+

scala> spark.sql("select empname,salary,dept,empid,address.city,address.state from emp").show
+-------+------+----+-----+---------+-----------+
|empname|salary|dept|empid|     city|      state|
+-------+------+----+-----+---------+-----------+
|   Siya|150000|  IT|    1|     Pune|Maharashtra|
| Swayam|130000|  HR|    2|Bangalore|  Karnataka|
|   Jeet|170000|  IT|    3|   Mumbai|Maharashtra|
|  Priya|145000|  HR|    4|  Chennai|  Tamilnadu|
+-------+------+----+-----+---------+-----------+
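
Nested fields also work in WHERE clauses; a PySpark sketch against the same temp view:

spark.sql("select empname, salary from emp where address.state = 'Maharashtra'").show()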
