Spark Structured Streaming - File-to-File Real-time Streaming (3/3)

CSV File to JSON File Real Time Streaming Example

Pavan Kulkarni

6 minute read

In this post we will see how to build a simple application that streams data from files to files in real time.

Most of the clients I have worked with so far still rely on files - either CSV, TSV or JSON - and these applications are usually batch processed. In this new age of real-time processing, it would be great to move these batch jobs to some kind of streaming process that can provide real-time data processing.

Specifically, we will see how to process data from CSV files into JSON files in real time.

The setup is similar to all our previous Spark examples.
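
If you are setting up the project from scratch rather than cloning the repo, a minimal sbt build might look like the sketch below. The Scala and Spark versions here are assumptions from around the time of writing, not taken from the repo; use whatever the repo specifies.

    // build.sbt - minimal sketch; the versions below are assumptions, not from the repo
    name := "Spark_Streaming_Examples"
    version := "0.1"
    scalaVersion := "2.11.12"

    // Structured Streaming, the DataFrame API and the file source/sink all ship in spark-sql
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"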

Let’s build a use case

We have 2 directories,

  • src/main/resources/input/cutomer_info, which contains a static file with customer information
  • src/main/resources/input/orders, into which CSV files with order details are dropped periodically. This directory is partitioned date-wise as shown below

    Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ cd src/main/resources/input/
    Pavans-MacBook-Pro:input pavanpkulkarni$ ls -ltr *
    cutomer_info:
    total 8
    -rw-r--r--  1 pavanpkulkarni  staff  58 Jun 12 11:30 customer.csv
    
    orders:
    total 0
    drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:30 date=2018-06-02
    drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:54 date=2018-06-04
    drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:57 date=2018-06-03
    drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 13:00 date=2018-06-05
    drwxr-xr-x  3 pavanpkulkarni  staff  96 Jul  2 12:08 date=2018-06-01
    Pavans-MacBook-Pro:input pavanpkulkarni$
    

The objective is to join the order details from the CSV files with the customer information file and write the resulting data to JSON files in real time.

Let’s Talk Scala !

  1. The customer information is as follows:

    Pavans-MacBook-Pro:input pavanpkulkarni$ cat cutomer_info/customer.csv
    id,name,location
    1,kash,VT
    2,pavan,IL
    3,john,CA
    4,jane,NJ
    Pavans-MacBook-Pro:input pavanpkulkarni$
    
    
  2. Sample CSV data with order information:

    Pavans-MacBook-Pro:input pavanpkulkarni$ cat orders/date\=2018-06-01/data.csv
    id,pid,pname,date
    1,011,p11,2018-06-01
    2,012,p12,2018-06-01
    1,012,p12,2018-06-01
    2,023,p23,2018-06-01
    2,034,p34,2018-06-01
    3,034,p34,2018-06-01
    
    
  3. Now we begin by initializing the Spark session:

    //initialize the spark session
    val spark = SparkSession
        .builder()
        .master("local")
        .appName("File_Streaming")
        .getOrCreate()
    
  4. In order to stream data from CSV files, we need to define a schema for the data. Spark will not allow streaming of CSV data unless the schema is defined.

    // Schema for the order CSVs. The names defined here (e.g. customer_id) override
    // the header names in the file, which is why the join key is customer_id and not id.
    val schema = StructType(
                Array(StructField("customer_id", StringType),
                    StructField("pid", StringType),
                    StructField("pname", StringType),
                    StructField("date", StringType)))
    
    //stream the orders from the csv files.
    val ordersStreamDF = spark
                .readStream
                .option("header", "true")
                .schema(schema)
                .csv(args(0))
    
  5. Read the customer information from the static file and store it in a static dataset

    case class Customer(customer_id : String, customer_name: String, customer_location: String)
    
    import spark.implicits._
    
    //get the static customer data
    val customerDS = spark.read
                .format("csv")
                .option("header", true)
                .load("src/main/resources/input/cutomer_info/customer.csv")
                .map(x => Customer(x.getString(0), x.getString(1), x.getString(2)))
    
  6. Join the streaming dataframe ordersStreamDF with dataset customerDS on the customer_id field.

    val finalResult = ordersStreamDF.join(customerDS, "customer_id")
    

    The result is a streaming DataFrame containing the joined order and customer data.

  7. Using the above streaming DataFrame, we can write the data to any sink supported by Spark:

    //write the joined stream to json/parquet output.
    val query = finalResult
                .writeStream
                .queryName("count_customer")
                //.format("console")
                .outputMode("append")
                .format("json")
                .partitionBy("date")
                .option("path", "src/main/resources/output/")
                .option("checkpointLocation", "src/main/resources/chkpoint_dir")
                .start()
    

Here,

  • format(“console”) : can be used for debugging purposes by printing each micro-batch to the console (see the sketch after this list).
  • outputMode(“append”).format(“json”) : Write the output in append mode to JSON files.
  • partitionBy(“date”) : The output is partitioned date-wise. Partitioning the data is a good strategy for improving downstream query performance.
  • option(“path”, “src/main/resources/output/”) : Specify output path to dump the data as JSON files. The partitioned directories are created based on the date field of data.
  • option(“checkpointLocation”, “src/main/resources/chkpoint_dir”) : Specify a path for checkpoint directory for fault tolerance.
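
The snippet above only starts the query; in a standalone application the main thread also needs to block so the job keeps running. The sketch below shows that, along with the console-sink variant mentioned above for debugging. It assumes the same spark, finalResult and query values as the code above, and the debug_console query name is purely illustrative.

    // For quick debugging, the same joined stream can also be written to the console
    // sink, which prints every micro-batch to stdout (query name is illustrative).
    val debugQuery = finalResult
        .writeStream
        .queryName("debug_console")
        .outputMode("append")
        .format("console")
        .start()

    // Block the main thread so the streaming queries keep running
    // and keep picking up new files as they arrive.
    spark.streams.awaitAnyTermination()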

The full code can be found in my GitHub Repo

Let’s get Streaming Started !

  1. Run the project as Scala project in IDE.
  2. Once the streaming job starts, you will see the chkpoint_dir and output directories created.

    Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr
    total 0
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  2 12:08 input
    drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:08 output
    drwxr-xr-x  7 pavanpkulkarni  staff  224 Jul  6 16:08 chkpoint_dir
    
  3. The output directory will now contain partitioned subdirectories based on the date field in the schema.

    Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr output/
    total 0
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-05
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-04
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-03
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-02
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-01
    drwxr-xr-x  3 pavanpkulkarni  staff   96 Jul  6 16:08 _spark_metadata
    
    

    N.B.: It is important to maintain the /path/to/input/key=value layout so that we achieve the desired partitioning. Here key is a field name from the schema (date in our demo) and value is that column's value.

  4. Let’s look at the data for partition date=2018-06-01.

    Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltra output/date\=2018-06-01/
    total 16
    -rw-r--r--  1 pavanpkulkarni  staff   16 Jul  6 16:07 .part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json.crc
    -rw-r--r--  1 pavanpkulkarni  staff  567 Jul  6 16:07 part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
    drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:08 ..
    drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 .
    
    Pavans-MacBook-Pro:resources pavanpkulkarni$ cat output/date\=2018-06-01/part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
    {"customer_id":"1","pid":"011","pname":"p11","customer_name":"kash","customer_location":"VT"}
    {"customer_id":"2","pid":"012","pname":"p12","customer_name":"pavan","customer_location":"IL"}
    {"customer_id":"1","pid":"012","pname":"p12","customer_name":"kash","customer_location":"VT"}
    {"customer_id":"2","pid":"023","pname":"p23","customer_name":"pavan","customer_location":"IL"}
    {"customer_id":"2","pid":"034","pname":"p34","customer_name":"pavan","customer_location":"IL"}
    {"customer_id":"3","pid":"034","pname":"p34","customer_name":"john","customer_location":"CA"}
    

    As seen here, each record in the JSON file combines the static customer information with the corresponding order information. We can also verify the output by reading it back as a regular batch DataFrame, as sketched below.
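
A minimal sketch of that verification, assuming the same spark session as in the code above (this read is a one-off batch check, not part of the streaming job):

    import org.apache.spark.sql.functions.col

    // Read the partitioned JSON output back as a regular batch DataFrame.
    // Spark rebuilds the `date` column from the key=value directory names.
    val outputDF = spark.read.json("src/main/resources/output/")
    outputDF.filter(col("date") === "2018-06-01").show(false)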

Bonus advantages of this application

The application does not stop here. It keeps getting awesome !!

Imagine a situation where you get an additional file to process. We need to make sure that this file is processed without any data loss. Let’s go ahead and add a new data file in the input/orders/date\=2018-06-01 directory.

    Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr input/orders/date\=2018-06-01/
    total 16
    -rw-r--r--  1 pavanpkulkarni  staff  144 Jun 12 11:29 data.csv
    -rw-r--r--  1 pavanpkulkarni  staff   79 Jul  6 16:32 data_new.csv
    
    Pavans-MacBook-Pro:resources pavanpkulkarni$ cat input/orders/date\=2018-06-01/data_new.csv
    id,pid,pname,date
    2,012,p34,2018-06-01
    3,003,p3,2018-06-01
    4,004,p4,2018-06-01

As soon as the new file is detected by the Spark engine, a new micro-batch is triggered and we can see the resulting JSON almost immediately. The most awesome part is that the new JSON file is created in the same partition.

    Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltra output/date\=2018-06-01/
    total 32
    -rw-r--r--  1 pavanpkulkarni  staff   16 Jul  6 16:07 .part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json.crc
    -rw-r--r--  1 pavanpkulkarni  staff  567 Jul  6 16:07 part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
    -rw-r--r--  1 pavanpkulkarni  staff   12 Jul  6 16:32 .part-00000-70a945d1-2397-416b-8b80-e96e7c31758d.c000.json.crc
    -rw-r--r--  1 pavanpkulkarni  staff  281 Jul  6 16:32 part-00000-70a945d1-2397-416b-8b80-e96e7c31758d.c000.json
    drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:32 ..
    drwxr-xr-x  6 pavanpkulkarni  staff  192 Jul  6 16:32 .

The checkpoint directory maintains the state of the engine, so only new files are processed from there on. By doing so, we avoid re-running the whole job for every new file or late file arrival. Another advantage is that the Spark engine stays idle until new data arrives, saving computation power, which is a great way to reduce cost.
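
To make that concrete, a restart is nothing more than starting the same sink again with the same checkpoint location. The sketch below reuses the finalResult, output path and checkpoint directory from the code above; the resumedQuery name is just for illustration.

    // Starting the sink again with the same checkpointLocation resumes from the last
    // committed batch; files processed before the shutdown are not reprocessed.
    val resumedQuery = finalResult
        .writeStream
        .outputMode("append")
        .format("json")
        .partitionBy("date")
        .option("path", "src/main/resources/output/")
        .option("checkpointLocation", "src/main/resources/chkpoint_dir")
        .start()

    resumedQuery.awaitTermination()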

All the files can be viewed in my GitHub Repo

References:

  1. https://spark.apache.org/docs/latest/streaming-programming-guide.html
  2. https://spark.apache.org/docs/latest/rdd-programming-guide.html