Spark - MongoDB Data Processing (Scala)

Processing data from MongoDB in a distributed environment with Apache Spark

Pavan Kulkarni

7 minute read

We will look at the basics of processing data from MongoDB using Apache Spark.

The MongoDB Connector for Spark provides integration between MongoDB and Apache Spark.

With the connector, you have access to all Spark libraries for use with MongoDB datasets: Datasets for analysis with SQL (benefiting from automatic schema inference), streaming, machine learning, and graph APIs. You can also use the connector with the Spark Shell.

Why use Apache Spark?

Rich Operators and Algorithms: Spark supports many operators that are not natively available in the mongo shell. Along with these operators, Spark also brings optimized algorithms for processing huge volumes of data.

Distributed Platform: Although MongoDB provides a distributed environment for storing documents, processing large amounts of data in it becomes complex. Introducing Apache Spark at this point eases the computation. Also, the Mongo Spark connector is available for most of the commonly used programming languages, viz. Scala, Java, Python and R.

Together MongoDB and Apache Spark are enabling success by turning analytics into real-time action.

Let’s look at some examples of basic read and write operations to and from MongoDB in Scala. The full code is available on my GitHub Repo

Set up Mongo-Spark Drivers

I will be using the latest version of mongo-spark-connector. We have already seen How to run Spark in Eclipse using Gradle. Add the mongo-spark-connector driver to build.gradle as shown below. You can check for the latest version here.

build.gradle

dependencies{

	provided 'org.apache.spark:spark-core_2.11:2.2.1'
	provided 'org.apache.spark:spark-sql_2.11:2.2.1'
	provided 'org.apache.spark:spark-catalyst_2.11:2.2.1'
	compile group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: '2.2.2'

}

I’m using Scala 2.11 and Spark 2.2.1, as these were the latest versions available at the time this post was published.
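
If you just want to experiment interactively before setting up a project, the connector can also be pulled in when launching the Spark shell with --packages (a sketch; adjust the coordinates to your Spark and Scala versions):

    spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 \
        --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/super_hero_db.students" \
        --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/super_hero_db.students"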

Initialize SparkSession

Starting with Apache Spark 2.0.0, the entry point to a Spark job changed from SparkContext to SparkSession. SparkSession provides the combined configuration options of SparkContext and Spark SQL.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark_Mongo")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/super_hero_db.students")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/super_hero_db.students")
      .getOrCreate()

Here I have:

  1. Initialized the Spark job entry point
  2. Set the Spark master
  3. Initialized the connection to MongoDB
  4. Set a name for my Spark job.

Read From MongoDB

Let’s use the SparkSession (spark) created above and read values from Mongo. I have one collection - students - in MongoDB, and the sample data can be seen in this post.

    import com.mongodb.spark.MongoSpark

    val studentsDF = MongoSpark.load(spark)
    studentsDF.show(false)


This returns a DataFrame with the students data.

studentsDF.show(false)

+--------------------------+-------------------------------------------------------------+----+----------------+--------------+
|_id                       |courses_registered                                           |id  |name            |year_graduated|
+--------------------------+-------------------------------------------------------------+----+----------------+--------------+
|[5afc9b45ef01bb656bfe5fd3]|[[CS001,Spring_2001], [CS002,Summer_2001], [CS001,Fall_2001]]|1.0 |Tom Riddle      |2001          |
|[5afc9b45ef01bb656bfe5fd4]|[[CS003,Spring_2002], [CS004,Summer_2002], [CS005,Fall_2002]]|3.0 |Haan Solo       |2002          |
|[5afc9b45ef01bb656bfe5fd5]|[[CS004,Spring_2004], [CS005,Summer_2004], [CS003,Fall_2004]]|5.0 |Sheldon Cooper  |2004          |
|[5afc9b45ef01bb656bfe5fd6]|[[CS009,Spring_2005], [CS006,Summer_2005], [CS004,Fall_2005]]|6.0 |Iron Man        |2005          |
|[5afc9b45ef01bb656bfe5fd7]|[[CS004,Spring_2006], [CS005,Summer_2006], [CS003,Fall_2006]]|7.0 |Stephan Hawkings|2006          |
|[5afc9b45ef01bb656bfe5fd8]|[[CS001,Spring_2007], [CS003,Summer_2007], [CS009,Fall_2007]]|8.0 |Cerci Lannister |2007          |
|[5afc9b45ef01bb656bfe5fd9]|[[CS006,Spring_2008], [CS007,Summer_2008], [CS009,Fall_2008]]|9.0 |Wonder Woman    |2008          |
|[5afc9b45ef01bb656bfe5fda]|[[CS009,Spring_2003], [CS010,Summer_2003], [CS004,Fall_2003]]|4.0 |Frodo Baggins   |2003          |
|[5afc9b45ef01bb656bfe5fdb]|[[CS001,Spring_2010], [CS002,Summer_2010], [CS005,Fall_2010]]|11.0|Peter Parker    |2010          |
|[5afc9b45ef01bb656bfe5fdc]|[[CS010,Spring_2009], [CS002,Summer_2009], [CS007,Fall_2009]]|10.0|Hermione Granger|2009          |
|[5afc9b45ef01bb656bfe5fdd]|[[CS003,Spring_2002], [CS004,Summer_2002], [CS005,Fall_2002]]|2.0 |Ned Stark       |2002          |
+--------------------------+-------------------------------------------------------------+----+----------------+--------------+

studentsDF.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- courses_registered: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- CID: string (nullable = true)
 |    |    |-- cid: string (nullable = true)
 |    |    |-- sem: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- year_graduated: string (nullable = true)

N.B.: The _id field is different from the id field. _id is automatically generated by MongoDB when a new document is inserted.
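
Since the connector infers the schema automatically, the loaded DataFrame can be queried with regular Spark SQL. Here is a small sketch (the column names come from the schema above):

    // Register the DataFrame as a temporary view and query it with Spark SQL.
    studentsDF.createOrReplaceTempView("students")
    spark.sql("SELECT name, year_graduated FROM students WHERE year_graduated >= '2005'").show(false)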

Write Data to MongoDB

Writing to MongoDB using the mongo-spark connector is very simple. I have used SaveMode.Overwrite here; with Spark’s save modes, Overwrite replaces the existing contents of the target collection, while SaveMode.Append only adds new documents.

Here, I will create a DataFrame and use it to insert data into Mongo.

Steps to follow are:

  1. Create case classes for the required UDT.

    case class students_cc(id : Int, year_graduated : String, courses_registered : List[cid_sem], name : String)
    case class cid_sem(cid : String, sem : String)
    
    
  2. Create a DataFrame from the case classes

    val listOfCoursesSem = List(cid_sem("CS003", "Spring_2011"),
                cid_sem("CS006", "Summer_2011"),
                cid_sem("CS009", "Fall_2011")
            )
    
    val newStudents = Seq(students_cc(12, "2011", listOfCoursesSem, "Black Panther")).toDF()
    
    

    N.B.: To use toDF(), we need our old friend import spark.implicits._ .

  3. Write the DataFrame to MongoDB.

    import org.apache.spark.sql.SaveMode
    MongoSpark.save(newStudents.write.mode(SaveMode.Overwrite))
    
    
  4. Verify that the insert was successful

    val studentsData = MongoSpark.load(spark)
    studentsData.show(false)
    
    +--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
    |_id                       |courses_registered                                                          |id |name            |year_graduated|
    +--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
    |[5afcc577aebca2bc98a7135e]|[[null,CS003,Spring_2011], [null,CS006,Summer_2011], [null,CS009,Fall_2011]]|12 |Black Panther   |2011          |
    |[5afcc674ef01bb656bfe6066]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|3  |Haan Solo       |2002          |
    |[5afcc674ef01bb656bfe6067]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|2  |Ned Stark       |2002          |
    |[5afcc674ef01bb656bfe6068]|[[CS001,null,Spring_2001], [CS002,null,Summer_2001], [CS001,null,Fall_2001]]|1  |Tom Riddle      |2001          |
    |[5afcc674ef01bb656bfe6069]|[[CS004,null,Spring_2004], [CS005,null,Summer_2004], [CS003,null,Fall_2004]]|5  |Sheldon Cooper  |2004          |
    |[5afcc674ef01bb656bfe606a]|[[CS004,null,Spring_2006], [CS005,null,Summer_2006], [CS003,null,Fall_2006]]|7  |Stephan Hawkings|2006          |
    |[5afcc674ef01bb656bfe606b]|[[CS009,null,Spring_2003], [CS010,null,Summer_2003], [CS004,null,Fall_2003]]|4  |Frodo Baggins   |2003          |
    |[5afcc674ef01bb656bfe606c]|[[CS009,null,Spring_2005], [CS006,null,Summer_2005], [CS004,null,Fall_2005]]|6  |Tony Stark      |2005          |
    |[5afcc674ef01bb656bfe606d]|[[CS001,null,Spring_2007], [CS003,null,Summer_2007], [CS009,null,Fall_2007]]|8  |Cerci Lannister |2007          |
    |[5afcc674ef01bb656bfe606e]|[[CS006,null,Spring_2008], [CS007,null,Summer_2008], [CS009,null,Fall_2008]]|9  |Wonder Woman    |2008          |
    |[5afcc674ef01bb656bfe606f]|[[CS010,null,Spring_2009], [CS002,null,Summer_2009], [CS007,null,Fall_2009]]|10 |Hermione Granger|2009          |
    |[5afcc674ef01bb656bfe6070]|[[CS001,null,Spring_2010], [CS002,null,Summer_2010], [CS005,null,Fall_2010]]|11 |Peter Parker    |2010          |
    +--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
    

There…!! Black Panther is now part of the students collection.
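
If you only want to add new documents and leave the existing ones untouched, the same write can be done with SaveMode.Append instead of SaveMode.Overwrite. A small sketch reusing the newStudents DataFrame from above:

    // Append the new documents without replacing what is already in the collection.
    import org.apache.spark.sql.SaveMode
    MongoSpark.save(newStudents.write.mode(SaveMode.Append))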

More Options

We can use additional configuration options to process larger datasets more efficiently. The complete documentation is available here.
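
For instance, read preference, write concern, and the target collection can be overridden per operation through ReadConfig and WriteConfig rather than the session-level URIs. A rough sketch based on the connector documentation (the option values below are illustrative, not recommendations):

    // Per-operation overrides via ReadConfig / WriteConfig.
    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.{ReadConfig, WriteConfig}

    val readConfig = ReadConfig(
      Map("collection" -> "students", "readPreference.name" -> "secondaryPreferred"),
      Some(ReadConfig(spark.sparkContext)))
    val studentsFromSecondary = MongoSpark.load(spark, readConfig)

    val writeConfig = WriteConfig(
      Map("collection" -> "students", "writeConcern.w" -> "majority"),
      Some(WriteConfig(spark.sparkContext)))
    MongoSpark.save(newStudents, writeConfig)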

Run with spark-submit

You can run the same job using spark-submit.

First, let’s go ahead and build the jar.

Pavans-MacBook-Pro:Spark_Mongo_Example pavanpkulkarni$ gradle clean build
Starting a Gradle Daemon (subsequent builds will be faster)

> Task :compileScala 
Pruning sources from previous analysis, due to incompatible CompileSetup.


BUILD SUCCESSFUL in 25s
3 actionable tasks: 3 executed

We now have build/libs/Spark_Mongo_Example-1.0.jar ready for deployment.

Run the spark-submit command as below.

Pavans-MacBook-Pro:Spark_Mongo_Example pavanpkulkarni$ spark-submit --master local[4] --verbose --class com.pavanpkulkarni.mongo.SparkScalaMongo build/libs/Spark_Mongo_Example-1.0.jar 

Using properties file: null
Parsed arguments:
  master                  local[4]
  deployMode              null
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          null
  driverMemory            null
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               com.pavanpkulkarni.mongo.SparkScalaMongo
  primaryResource         file:/Users/pavanpkulkarni/Documents/workspace/Spark_Mongo_Example/build/libs/Spark_Mongo_Example-1.0.jar
  name                    com.pavanpkulkarni.mongo.SparkScalaMongo
  childArgs               []
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through
 --conf and those from the properties file null:
  

    
Main class:
com.pavanpkulkarni.mongo.SparkScalaMongo
Arguments:

System properties:
(SPARK_SUBMIT,true)
(spark.app.name,com.pavanpkulkarni.mongo.SparkScalaMongo)
(spark.jars,*********(redacted))
(spark.submit.deployMode,client)
(spark.master,local[4])
Classpath elements:
file:/Users/pavanpkulkarni/Documents/workspace/Spark_Mongo_Example/build/libs/Spark_Mongo_Example-1.0.jar


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/16 21:37:55 INFO SparkContext: Running Spark version 2.2.1
18/05/16 21:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/16 21:37:56 INFO SparkContext: Submitted application: Spark_Mongo
18/05/16 21:37:56 INFO SecurityManager: Changing view acls to: pavanpkulkarni
18/05/16 21:37:56 INFO SecurityManager: Changing modify acls to: pavanpkulkarni
18/05/16 21:37:56 INFO SecurityManager: Changing view acls groups to: 
18/05/16 21:37:56 INFO SecurityManager: Changing modify acls groups to: 
18/05/16 21:37:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(pavanpkulkarni); groups with view permissions: Set(); users  with modify permissions: Set(pavanpkulkarni); groups with modify permissions: Set()
18/05/16 21:37:56 INFO Utils: Successfully started service 'sparkDriver' on port 51168.
18/05/16 21:37:56 INFO SparkEnv: Registering MapOutputTracker

.
.
.
.
.

18/05/16 21:38:03 INFO DAGScheduler: ResultStage 3 (show at SparkScalaMongo.scala:47) finished in 0.029 s
18/05/16 21:38:03 INFO DAGScheduler: Job 3 finished: show at SparkScalaMongo.scala:47, took 0.042215 s
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|_id                       |courses_registered                                                          |id |name            |year_graduated|
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|[5afcc577aebca2bc98a7135e]|[[null,CS003,Spring_2011], [null,CS006,Summer_2011], [null,CS009,Fall_2011]]|12 |Black Panther   |2011          |
|[5afcc674ef01bb656bfe6066]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|3  |Haan Solo       |2002          |
|[5afcc674ef01bb656bfe6067]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|2  |Ned Stark       |2002          |
|[5afcc674ef01bb656bfe6068]|[[CS001,null,Spring_2001], [CS002,null,Summer_2001], [CS001,null,Fall_2001]]|1  |Tom Riddle      |2001          |
|[5afcc674ef01bb656bfe6069]|[[CS004,null,Spring_2004], [CS005,null,Summer_2004], [CS003,null,Fall_2004]]|5  |Sheldon Cooper  |2004          |
|[5afcc674ef01bb656bfe606a]|[[CS004,null,Spring_2006], [CS005,null,Summer_2006], [CS003,null,Fall_2006]]|7  |Stephan Hawkings|2006          |
|[5afcc674ef01bb656bfe606b]|[[CS009,null,Spring_2003], [CS010,null,Summer_2003], [CS004,null,Fall_2003]]|4  |Frodo Baggins   |2003          |
|[5afcc674ef01bb656bfe606c]|[[CS009,null,Spring_2005], [CS006,null,Summer_2005], [CS004,null,Fall_2005]]|6  |Tony Stark      |2005          |
|[5afcc674ef01bb656bfe606d]|[[CS001,null,Spring_2007], [CS003,null,Summer_2007], [CS009,null,Fall_2007]]|8  |Cerci Lannister |2007          |
|[5afcc674ef01bb656bfe606e]|[[CS006,null,Spring_2008], [CS007,null,Summer_2008], [CS009,null,Fall_2008]]|9  |Wonder Woman    |2008          |
|[5afcc674ef01bb656bfe606f]|[[CS010,null,Spring_2009], [CS002,null,Summer_2009], [CS007,null,Fall_2009]]|10 |Hermione Granger|2009          |
|[5afcc674ef01bb656bfe6070]|[[CS001,null,Spring_2010], [CS002,null,Summer_2010], [CS005,null,Fall_2010]]|11 |Peter Parker    |2010          |
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+

18/05/16 21:38:03 INFO ContextCleaner: Cleaned accumulator 99
18/05/16 21:38:03 INFO SparkContext: Invoking stop() from shutdown hook
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_6_piece0 on 10.0.0.67:51169 in memory (size: 411.0 B, free: 366.3 MB)
18/05/16 21:38:03 INFO MongoClientCache: Closing MongoClient: [127.0.0.1:27017]
18/05/16 21:38:03 INFO connection: Closed connection [connectionId{localValue:2, serverValue:57}] to 127.0.0.1:27017 because the pool has been closed.
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.0.0.67:51169 in memory (size: 6.0 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 10.0.0.67:51169 in memory (size: 411.0 B, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 10.0.0.67:51169 in memory (size: 6.0 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_5_piece0 on 10.0.0.67:51169 in memory (size: 2.7 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO ContextCleaner: Cleaned accumulator 98
18/05/16 21:38:03 INFO SparkUI: Stopped Spark web UI at http://10.0.0.67:4040
18/05/16 21:38:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/16 21:38:03 INFO MemoryStore: MemoryStore cleared
18/05/16 21:38:03 INFO BlockManager: BlockManager stopped
18/05/16 21:38:03 INFO BlockManagerMaster: BlockManagerMaster stopped
18/05/16 21:38:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/16 21:38:03 INFO SparkContext: Successfully stopped SparkContext
18/05/16 21:38:03 INFO ShutdownHookManager: Shutdown hook called
18/05/16 21:38:03 INFO ShutdownHookManager: Deleting directory /private/var/folders/nb/ygmwx13x6y1_9pyzg1_82w440000gn/T/spark-7172f78f-0852-43f6-887d-a2dce687af3f

Being an avid Eclipse fan, I was experimenting with IntelliJ for this project. I seem to like IntelliJ over Eclipse for its ease of importing Gradle dependencies and of setting up projects. Having said that, I will continue using IntelliJ until such time as I feel .. “nah.. Eclipse could’ve done this better !!!”

References

  1. https://docs.mongodb.com/spark-connector/master/
  2. https://docs.mongodb.com/spark-connector/current/scala-api/
  3. https://spark.apache.org/docs/latest/sql-programming-guide.html