Wednesday, 16 January 2019

Broadcasting an object in Spark

u.item content:
-----------------
1|Toy Story (1995)
2|GoldenEye (1995)
3|Four Rooms (1995)
4|Get Shorty (1995)


scala> import scala.io
import scala.io

// Open u.item and get an iterator over its lines; the iterator is consumed by the for loop below
scala> val lines = scala.io.Source.fromFile("E:\\POCs\\ml-100k\\u.item").getLines()
lines: Iterator[String] = non-empty iterator

// Map of movie id -> movie title, filled in by the loop below
var movieNames:Map[Int,String] = Map()

// Each u.item record is '|'-separated (id first, title second);
// lines that do not split into at least two fields are skipped
scala> for (line <- lines) {
         val fields = line.split('|')
         if (fields.length > 1) {
           movieNames += (fields(0).toInt -> fields(1))
         }
       }

scala> movieNames.take(5).foreach(println)
(645,Paris Is Burning (1990))
(892,Flubber (1997))
(69,Forrest Gump (1994))
(1322,Metisse (Café au Lait) (1993))
(1665,Brother's Kiss, A (1997))


scala> movieNames(1665)
res10: String = Brother's Kiss, A (1997)

scala> movieNames(100)
res11: String = Fargo (1996)

scala> import scala.io
import scala.io

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.log4j._
import org.apache.log4j._

scala> import scala.io.Source
import scala.io.Source

scala> import java.nio.charset.CodingErrorAction
import java.nio.charset.CodingErrorAction

scala> import scala.io.Codec
import scala.io.Codec

// Reads u.item and returns a Map of movie id -> movie title.
// Throws NumberFormatException if the first field of a record is not an integer.
scala> def loadMovieNames() : Map[Int, String] = {
          // Handle character encoding issues: decode as UTF-8 and replace bad bytes
          // instead of failing the whole load on the first malformed character.
          implicit val codec = Codec("UTF-8")
          codec.onMalformedInput(CodingErrorAction.REPLACE)
          codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
          // Keep a handle on the Source so the underlying file can be closed after reading.
          val source = Source.fromFile("E:\\POCs\\ml-100k\\u.item")
          try {
            // Records are '|'-separated (id first, title second); lines with fewer than
            // two fields are skipped. toMap forces the lazy iterator before the file is closed.
            source.getLines()
              .map(_.split('|'))
              .collect { case fields if fields.length > 1 => fields(0).toInt -> fields(1) }
              .toMap
          } finally {
            source.close()
          }
        }
loadMovieNames: ()Map[Int,String]

// Broadcast the Map object returned by loadMovieNames(); the handle is never reassigned, so it is a val
scala> val nameDict = sc.broadcast(loadMovieNames())
nameDict: org.apache.spark.broadcast.Broadcast[Map[Int,String]] = Broadcast(0)


// Get data from the broadcast variable
scala> nameDict.value(100)
res17: String = Fargo (1996)

scala> nameDict.value(1)
res18: String = Toy Story (1995)

No comments:

Post a Comment

Flume - Simple Demo

// create a folder in hdfs : $ hdfs dfs -mkdir /user/flumeExa // Create a shell script which generates : Hadoop in real world <n>...