Spark Structured Streaming - Socket Word Count (2/3)

Socket Word Count demo for Spark Structured Streaming

Pavan Kulkarni

2 minute read

Structured Streaming is a new way of looking at real-time stream processing. In this post we will see how to build our very first Structured Streaming app to perform a word count over a network socket.

Socket Word Count

  1. Initialize the SparkSession:

            val spark = SparkSession
                .builder()
                .master("local")
                .appName("Socket_Streaming")
                .getOrCreate()
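
    A note on master(“local”): this runs Spark with a single local thread, which is enough for this demo. If you want to use all cores on your machine instead (my own suggestion, not something the demo requires), you can change that one line:

            .master("local[*]") // use all available local cores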
    
  2. Next, we need to initialize a streaming read:

            val socketStreamDf = spark.readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load()
    

    Let’s look at the details. Here,

    • readStream : returns a DataStreamReader (org.apache.spark.sql.streaming.DataStreamReader), which is used to load streaming data from external storage systems
    • format(“socket”) : the source of our streaming data is a socket
    • .option(“host”, “localhost”).option(“port”, 9999) : these options specify the hostname and port number of the socket

    The DataFrame socketStreamDf is now a streaming DataFrame, and we can perform all the usual DataFrame operations on it.
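
    For instance, a couple of ordinary DataFrame operations applied to the streaming DataFrame might look like the sketch below (the filter is purely illustrative; col comes from org.apache.spark.sql.functions):

            import org.apache.spark.sql.functions.col

            // true, confirming that socketStreamDf is backed by a streaming source
            println(socketStreamDf.isStreaming)

            // ordinary transformations work as usual, e.g. keeping
            // only the lines that mention "spark"
            val sparkLinesDf = socketStreamDf.filter(col("value").contains("spark"))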

  3. Now, we go ahead and perform a word count on this DataFrame:

            import spark.implicits._
            val words = socketStreamDf.as[String].flatMap(_.split(" "))
            val wordCounts = words.groupBy("value").count()
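
    A small optional refinement (my own variation, not part of the original demo) is to normalize case and drop the empty tokens that repeated spaces produce:

            // lower-case each token and discard empty strings before counting
            val cleanedCounts = socketStreamDf.as[String]
                .flatMap(_.toLowerCase.split("\\s+"))
                .filter(_.nonEmpty)
                .groupBy("value")
                .count()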
    
  4. We then go ahead and create a streaming query that will output the word counts to the console sink.

            val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
    

    Let’s look at the details:

    • writeStream : returns a DataStreamWriter (org.apache.spark.sql.streaming.DataStreamWriter), which is used to write streaming data out to external storage systems
    • format(“console”) : the sink to output the results to; here, the console
    • .outputMode(“complete”) : this specifies that the entire updated result table will be output to the sink after every trigger (an alternative mode is sketched after this list)
    • start() : finally, we start the streaming process.
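
    If you only want to print the rows that changed in the last trigger, update mode is an alternative. A minimal sketch (the 10-second micro-batch trigger is my own choice, not part of the original demo):

            import org.apache.spark.sql.streaming.Trigger

            val updateQuery = wordCounts.writeStream
                .outputMode("update") // emit only rows updated since the last trigger
                .format("console")
                .trigger(Trigger.ProcessingTime("10 seconds")) // fire a micro-batch every 10 seconds
                .start()
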
  5. The StreamingQuery object returned when a query is started can be used to monitor and manage the query. We call awaitTermination() to block the application until the query is terminated, either with stop() or by an error. A short monitoring sketch follows the snippet.

    query.awaitTermination()
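
    Beyond blocking, the same StreamingQuery handle exposes a few monitoring hooks, for example:

    // a few monitoring hooks on the StreamingQuery handle
    println(query.id)           // unique identifier of this query
    println(query.status)       // current status of the query
    println(query.lastProgress) // metrics of the most recent completed trigger
    // query.stop()             // stops the query gracefully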
    

Full code is available in my GitHub Repo

TO RUN

  1. Open Terminal and type

    Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ nc -lk 9999
    
  2. Run the Spark application. Make sure you run the above nc -lk command before running the streaming application.

  3. Start typing in the terminal

    Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ nc -lk 9999
    hi welcome to spark streaming
    welcme welcome again
    
  4. You will see the output in the console as:

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +---------+-----+
    |    value|count|
    +---------+-----+
    |  welcome|    2|
    |streaming|    1|
    |    again|    1|
    |    spark|    1|
    |       hi|    1|
    |   welcme|    1|
    |       to|    1|
    +---------+-----+
    

Since we use complete output mode, the full, updated word-count table is re-printed in real time as you keep typing in the terminal.
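
For instance, if you then typed “spark again” into the nc session, complete mode would re-print the whole table with those two counts incremented. Illustrative (not captured) output:

    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +---------+-----+
    |    value|count|
    +---------+-----+
    |  welcome|    2|
    |streaming|    1|
    |    again|    2|
    |    spark|    2|
    |       hi|    1|
    |   welcme|    1|
    |       to|    1|
    +---------+-----+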

References:

  1. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html