Spark Structured Streaming - Introduction (1/3)

A brief introduction to Spark Structured Streaming

Pavan Kulkarni

10 minute read

Structured Streaming is a new way of looking at real-time streaming. With abstractions built on DataFrames and Datasets, Structured Streaming provides an alternative to the well-known Spark Streaming. Structured Streaming is built on top of the Spark SQL engine. Some of its main features are -

  1. Reads streams as an infinite table.
  2. Fault-tolerance guarantees through checkpointing and Write-Ahead Logs (WAL)
  3. Ensures end-to-end exactly-once semantics.
  4. Streaming aggregations, event-time windows, stream-to-batch joins
  5. Continuous Processing (experimental, available from Spark 2.3) and many more

We will see each of these features in detail in the coming posts. The streams are computed on the highly optimized Spark SQL engine. Queries are processed in micro-batches, which allows batch-like processing on streams and helps achieve low end-to-end latencies.

Spark Streaming (D-Streams) vs Spark Structured Streaming

Most of us have heard of Spark Streaming and often mistake Structured Streaming for Spark Streaming (D-Streams). Let's look at some major differences between the two.

| Spark Streaming (D-Streams) | Spark Structured Streaming |
| --- | --- |
| RDD-based streaming | DataFrame/Dataset-based streaming |
| Batch time has to be specified inside the application | Uses micro-batches |
| Higher latency if the batch size is not properly chosen | Low latency achieved through micro-batches |
| Triggers are often not customizable and are bound by batch and window time | Highly customizable triggers that can handle processing time and event time separately |
| Update mode not available | Sink can be treated as an infinite table where new data is updated as it arrives |



Buzzwords in Structured Streaming

Let’s look at some of the core concepts of Structured Streaming.

  1. Output Modes
  2. Fault Tolerance
  3. List of Sources
  4. List of Sinks
  5. Schema Inference and Partitioning Data
  6. Watermarks
  7. Joins
  8. Triggers


Output Modes :

Structured Streaming provides output modes to control how data is written to the sink. These modes are:

  1. Append Mode - This is the default mode, where only the new rows added to the Result Table since the last trigger are written to the sink.

    Usage :  df.writeStream.outputMode("append")

  2. Update Mode - Only the rows in the Result Table that were updated since the last trigger are written to the sink.

    Usage :  df.writeStream.outputMode("update")

  3. Complete Mode - The whole Result Table is written to the sink after every trigger. This is supported only for aggregation queries.

    Usage :  df.writeStream.outputMode("complete")
    
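To make this concrete, below is a minimal word-count sketch. It assumes a local SparkSession and a socket source on localhost:9999 (illustrative choices, not from the original example) and shows that the output mode is set on the DataStreamWriter, not on the DataFrame itself:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("OutputModesExample")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Read lines from a socket (testing source)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Running word count (a streaming aggregation)
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Complete mode: the whole Result Table is emitted after every trigger.
// Append mode would be rejected here because the aggregation has no watermark.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```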

Matrix of Supported Queries :

| Query Type | Watermark | Append Mode | Update Mode | Complete Mode |
| --- | --- | --- | --- | --- |
| Aggregation | Yes | Yes | Yes | Yes |
| Aggregation | No | No | Yes | Yes |
| Queries with joins | N/A | Yes | No | No |

Fault Tolerance (Checkpoint Directory) :

Structured Streaming provides robust fault tolerance by maintaining a checkpoint directory and write-ahead logs. The checkpoint directory stores the query progress and running aggregates at the specified path.

   Usage:  df.writeStream.option("checkpointLocation", "path/to/HDFS/dir")
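
A short sketch (the path is just a placeholder), reusing the word-count query from the output-modes example: the checkpoint location is set on the DataStreamWriter, and on restart the query resumes from the offsets and running aggregates stored there.

```scala
// Restart-safe variant of the earlier word-count query.
// "path/to/HDFS/dir" is a placeholder; any reliable storage (HDFS, S3, ...) will do.
val checkpointedQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .start()
```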

List of Sources :

Streaming DataFrames are created using the DataStreamReader interface. Below is a matrix of the supported sources.

Matrix of Supported Sources :

| Source | Description | Options | Fault Tolerant | Notes |
| --- | --- | --- | --- | --- |
| File | Reads files written to a directory as a stream of data. Supported formats: text, csv, json, orc, parquet | path: path to the input directory, common to all file formats<br>maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)<br>latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)<br>fileNameOnly: whether to check new files based on only the filename instead of the full path (default: false) | Yes | Does not support multiple comma-separated paths |
| Socket | Reads UTF8 text data from a socket connection | host: host to connect to, must be specified<br>port: port to connect to, must be specified | No | Used only for testing |
| Rate | Generates data at the specified number of rows per second | rowsPerSecond (default: 1): how many rows should be generated per second<br>rampUpTime (e.g. 5s, default: 0s): how long to ramp up before the generation speed becomes rowsPerSecond; granularities finer than seconds are truncated to integer seconds<br>numPartitions (e.g. 10, default: Spark's default parallelism): the partition number for the generated rows | Yes | Intended for testing and benchmarking |
| Kafka | Reads data from Kafka topics | Refer to the Kafka integration docs | Yes | |
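
As a sketch (reusing the SparkSession from the earlier example; schema, paths and option values are illustrative), streaming DataFrames are created from these sources via DataStreamReader:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}

// File source: streaming file sources normally require an explicit schema.
val csvSchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("event_time", TimestampType)

val fileStreamDf = spark.readStream
  .schema(csvSchema)
  .option("maxFilesPerTrigger", "10")   // limit the number of files picked up per trigger
  .csv("path/to/input/dir")

// Rate source: generates (timestamp, value) rows, handy for testing and benchmarking.
val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
```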

List of Sinks :

Sinks are created using the DataStreamWriter interface.

Matrix of Supported Sinks

| Sink | Supported Output Modes | Options | Fault Tolerant | Notes |
| --- | --- | --- | --- | --- |
| File | Append | path: path to the output directory, must be specified | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful. |
| Kafka | Append, Update, Complete | Refer to the Kafka integration docs | Yes (at-least-once) | |
| Foreach | Append, Update, Complete | None. This is a custom sink | Depends on the ForeachWriter implementation | See the ForeachWriter implementation details in the docs |
| Console | Append, Update, Complete | numRows: number of rows to print every trigger (default: 20)<br>truncate: whether to truncate the output if too long (default: true) | No | |
| Memory | Append, Complete | - | No | |
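
Continuing the sketch (reusing rateDf and fileStreamDf from the sources example; paths are placeholders), sinks are configured on the DataStreamWriter:

```scala
// Console sink: prints each micro-batch to stdout, useful for debugging.
rateDf.writeStream
  .format("console")
  .outputMode("append")
  .option("numRows", "50")
  .start()

// File sink: append-only with exactly-once guarantees; requires a checkpoint location.
fileStreamDf.writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "path/to/output/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()
```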

Schema Inference and Partitioning Data :

Structured Streaming provides a clean way of partition discovery, which lets us organize our dataset in a more logical way. By partitioning the data, we can further improve the fetch performance of the SQL engine, especially when the data is loaded into Hive tables or any other partitioned schema.

To achieve this, we need to name the file landing directories as path/to/data/key=value/. An example is shown in my GitHub Repo. The output directories under path/to/data/key=value/ are created when writing output, provided you specify key as part of the schema (a sketch follows below). We will see this in a live demo in the upcoming post.

N.B: Make sure you have at least one partition created before running the query.
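
A sketch of such a partitioned write (column and path names are illustrative, reusing fileStreamDf from the sources example): partitionBy on the DataStreamWriter produces the path/to/data/key=value/ layout described above.

```scala
// Writing the stream partitioned by the "name" column produces directories such as
// path/to/data/name=<value>/, which downstream readers can discover as partitions.
fileStreamDf.writeStream
  .format("parquet")
  .partitionBy("name")                  // must be a column present in the schema
  .option("path", "path/to/data")
  .option("checkpointLocation", "path/to/checkpoint/partitioned")
  .outputMode("append")
  .start()
```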

Watermarks :

Watermarking is an awesome way to capture late data. By specifying a maximum threshold on the timestamp field, we can ensure that late data is still accounted for in our processing. At this point, I would like to introduce the concepts of Processing Time and Event Time.

  1. Processing Time - This is the time at which the data reaches the Spark engine for processing.
  2. Event Time - This is the time at which the event occurred.

E.g. : An IoT sensor captures an event at 13:30 CST and this event reaches the Spark engine within the 13:30 - 13:45 window. Here the event time is 13:30. Now say, for some reason, an event occurring at 13:32 is seen by the Spark engine only at 13:50. This data could be rejected by Spark because it is out-of-order and has arrived late, outside its window. We do not want to lose this delayed data. From Spark 2.1 onwards, watermarking was introduced to account for such late-data scenarios. You specify a threshold, relative to the latest event time seen by the engine, within which late data should still be captured. Let's say the engine is processing the 13:45 - 14:00 window and a watermark of "15 minutes" is specified on the event_time column; Spark will still consider the data coming in at 13:32 despite the fact that the 13:30 - 13:45 window has elapsed. The 13:32 record, although processed in a later micro-batch, still updates its own 13:30 - 13:45 window, thus ensuring no data loss.

From Spark Docs :
For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T). In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped

While watermarks appear tempting as a way to avoid data loss, we need to be careful while using them. Watermarks can be used only in Append or Update mode. There should be an event-time column in the schema. When using an aggregation, withWatermark must be called on the same timestamp column as the one used in the aggregation, and it must be called before the aggregation for the watermark details to be used.
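
A minimal sketch of these rules, reusing fileStreamDf and its (illustrative) event_time column: the watermark is declared on the same event-time column that drives the window, and before the aggregation.

```scala
import org.apache.spark.sql.functions.{col, window}

// Count events per id in 15-minute event-time windows, tolerating data up to 15 minutes late.
// Note: withWatermark is called on "event_time" and before the groupBy.
val windowedCounts = fileStreamDf
  .withWatermark("event_time", "15 minutes")
  .groupBy(window(col("event_time"), "15 minutes"), col("id"))
  .count()

// Watermarking takes effect only in append or update output mode.
windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```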

Joins :

Structured Streaming supports both static-stream joins and stream-stream joins. The result of the streaming join is generated incrementally, similar to the results of streaming aggregations in the previous section.

Pointers on stream-stream joins : Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both input streams, Spark buffers past input as streaming state, so that every future input can be matched with past input and joined results generated accordingly. The challenge of stream-stream joins is that, at any given point in time, the data from either of the sources may still be incomplete; this can cause delays when using left, right, or full outer joins.

  1. In inner joins, to avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and can therefore be cleared from the state.
  2. In other words, define a constraint on event time across the two inputs such that the engine can figure out when old rows of one input will no longer be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways: a time-range join condition (e.g. …JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR) or a join on event-time windows (e.g. …JOIN ON leftTimeWindow = rightTimeWindow).
  3. Watermarks + event-time constraints are mandatory for left and right outer joins. This is because, in order to generate the NULL results in an outer join, the engine must know when an input row is not going to match with anything in the future. A sketch of such a join is shown after this list.

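Here is the sketch referenced above: a left outer stream-stream join with watermarks on both sides and a time-range condition. The impressions/clicks inputs and their column names are hypothetical, derived from the rate source purely for illustration.

```scala
import org.apache.spark.sql.functions.expr

// Hypothetical inputs; in practice these would come from Kafka, files, etc.
val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")

// Watermarks bound how long each side's state is kept.
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Time-range join condition: a click must occur within 1 hour of its impression.
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter")
```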
The matrix below explains the different types of joins.

| Left Input | Right Input | Type of Join | Supported | Notes |
| --- | --- | --- | --- | --- |
| Static | Static | All types | Yes | |
| Static | Stream | Inner | Yes | Not stateful |
| Static | Stream | Left Outer | No | |
| Static | Stream | Right Outer | Yes | Not stateful |
| Static | Stream | Full Outer | No | |
| Stream | Static | Inner | Yes | Not stateful |
| Stream | Static | Left Outer | Yes | Not stateful |
| Stream | Static | Right Outer | No | |
| Stream | Static | Full Outer | No | |
| Stream | Stream | Inner | Yes | Optionally specify a watermark on both sides + time constraints for state cleanup |
| Stream | Stream | Left Outer | Yes | Must specify a watermark on the right + time constraints for correct results; optionally specify a watermark on the left for all state cleanup. Specify a time-range join condition or a join on event-time windows, e.g. leftDfWithWatermark.join(rightDfWithWatermark, expr("""leftDfId = rightDfId AND leftDfTime >= rightDfTime AND leftDfTime <= rightDfTime + interval 1 hour"""), joinType = "leftOuter") |
| Stream | Stream | Right Outer | Yes | Must specify a watermark on the left + time constraints for correct results; optionally specify a watermark on the right for all state cleanup. Specify a time-range join condition or a join on event-time windows, e.g. leftDfWithWatermark.join(rightDfWithWatermark, expr("""leftDfId = rightDfId AND leftDfTime >= rightDfTime AND leftDfTime <= rightDfTime + interval 1 hour"""), joinType = "rightOuter") |
| Stream | Stream | Full Outer | No | |

Halt !!!! ✋ .. There are a few caveats we need to keep in mind before proceeding:

  1. A watermark delay of "2 hours" guarantees that the engine will never drop any data that is less than 2 hours late. But data delayed by more than 2 hours may or may not get processed.
  2. The NULL results in outer joins will be generated with a delay. This is because the engine has to wait long enough to make sure there will be no matches in the future.
  3. The generation of outer-join results may be delayed further because the watermark is advanced only at the end of a micro-batch, and the next micro-batch uses the updated watermark to clean up state and produce the outer results. If either of the two input streams being joined does not receive data for a while, the outer (left or right) output may get delayed.

Triggers :

Triggers can be considered an alternative approach to specifying windows and batch time. They provide options to control the timing of streaming data processing: whether the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.

| Trigger Type | Description | Example |
| --- | --- | --- |
| Unspecified (default) | Micro-batches are executed as soon as possible, i.e. each one starts as soon as the previous micro-batch completes | df.writeStream.format("console").start() |
| Fixed Interval | Micro-batches are kicked off at the specified interval. Three scenarios: if the previous micro-batch completes within the interval, the engine waits for the interval to elapse before starting the next one; if the previous micro-batch takes longer than the interval, the next one starts as soon as the previous one completes; if no new data arrives, no micro-batch is started | df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start() (see the Trigger API doc) |
| One Time | Only one micro-batch is executed to process all the available data, after which the query stops | df.writeStream.format("console").trigger(Trigger.Once()).start() |
| Continuous Processing (Experimental) | The query is executed in the new low-latency, continuous processing mode | df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start() (more on continuous processing in the Spark docs) |
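
As a small sketch (reusing rateDf; the intervals are arbitrary), note that Trigger lives in org.apache.spark.sql.streaming:

```scala
import org.apache.spark.sql.streaming.Trigger

// Fixed-interval micro-batches every 2 seconds
rateDf.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger: process whatever data is available, then stop
rateDf.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()
```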


Where to go from here

Let's look at some demo examples. These examples are available in my GitHub Repo:

  1. Word Count over socket (network)
  2. File to File Real-time streaming.

References

  1. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  2. https://github.com/jaceklaskowski/spark-structured-streaming-book