Use-cases/Project
16 min read

Integration tests of Spark applications

Integration-test-spark-applications

You just finished the Apache Spark-based application.

You ran spark-submit so many times, you just know the app works exactly as expected: it loads the input files, then wrangles the data according to the specification, finally, it saves the results in some permanent storage like HDFS or AWS S3. The application is deployed to the scheduler and it works!

The next day you got a bug report — one of the cases does not work as expected. “Ah, I know, let me fix it quickly”, you say, then you apply a fix on the code. Are you sure the change didn’t break the existing logic?

The next month the operations team decides to install brand new Spark 3.0. They ask you if the application is compatible with the upgraded version. “I hope so”, you say, then you run the application a few times, it looks OK. Are you sure you covered all the cases?

After a couple of weeks, the data operations team informs you that the schema of input data will slightly change. They ask you if the application can handle it. “Yes, I think so, but I’d better check!”, you say, then again you run some manual tests on just created sample data. It seems to handle the change well. Are you sure it will work on production?

Unit tests are not enough?

Writing an Apache Spark application does not differ from creating any other application. A responsible developer should provide not only the working code but also a set of unit tests that prove the implementation was right in an automated way. Unit tests should cover the smallest possible units of code, like UDFs or DataFrames/DataSets API operations on input data. It’s easy to create unit tests for both because you can mock the input Dataframe, run the function, and finally — check the output using the local Spark context.

What is quite challenging to test with unit tests is creating testcases that would answer questions like:

  • will my code work if the input data are malformed?
  • does my spark.read call correctly recognizes the partitions?
  • does the app loads the data from paths where the data is expected?
  • what happens if the input data are not there yet?
  • does the app de-duplicate data with the previous run output in the proper way?
  • are created Hive partitions readable after the application finishes?
  • does the code properly infer the schema of CSV/JSON files?
  • is the application idempotent? (do re-runs create the same output as one run?)

A solution to the challenges listed above is focusing not on the unit tests, but on the “external” tests of the application itself — running the application in the simulated environment and testing if the results match the expectations of the given testcase, like:

  • proper exit code or error message,
  • creating the data in expected format with expected content,
  • updating the metadata (tables and partitions).

In the next paragraphs of this article, I will refer to these tests as integration tests, similar to how we used to name tests of the web services that simulate the input call by the client and verify how the state changed within the service and what result was returned to the user.

Our first Spark integration test

Let’s start with a simple example. Imagine you have to write a simple ETL:

  • it’s job is to enrich incoming data using simple join in daily runs
  • the main data source format in parquet, it’s daily-partitioned and contains ad-related events like ad-impression or ad-click of a user
  • second dataset format is in JSON and it contains ads details
  • there is a requirement to store separately events that can’t be joined (when the ads metadata is not available)

The Apache Spark code that would implement this logic looks as follows:

from pyspark.sql import SparkSession
from argparse import ArgumentParser

# parse arguments
parser = ArgumentParser()
parser.add_argument('--input-events', help='Events, parquet format')
parser.add_argument('--input-ads', help='Ads, JSON format')
parser.add_argument('--output-joined', help='Output location of enriched data')
parser.add_argument('--output-invalid', help='Invalid data')
parser.add_argument('--dt', help='Date partition indicator')
args = parser.parse_args()

# load the data
spark = SparkSession.builder.getOrCreate()
all_events = spark.read.parquet(args.input_events)
events = all_events.where(all_events.dt == args.dt)
ads = spark.read.json(args.input_ads)

# save the reults
events.join(ads, events.ad_id == ads.id) \
    .write.parquet(f'{args.output_joined}/dt={args.dt}') 

events.join(ads, events.ad_id == ads.id, 'leftanti') \
    .write.parquet(f'{args.output_invalid}/dt={args.dt}') 

As the job is intended to run in the production environment, it’s usually scheduled with Oozie or Airflow, so dt parameter is dynamic. Also, the paths (2 inputs and 2 outputs) are locations on HDFS, S3 or other storage systems. When it comes to integration tests, we don’t want the testing process to depend on any particular schedule or external locations. In the usual web services integration tests we would need to mock some kind of distributed storage to ensure all dependent interfaces are available. Luckily, Hadoop’s Filesystem API has a simple implementation that uses the local filesystem for read/write operations, so we don’t need any extra effort here. Also, we will stick to one sample date in the testcase.

The usual testing scenario is composed of the 3 sections:

  • given - a section where you prepare mocks/stubs/samples to create a simulated, controlled testing environment
  • when - a section where you actually call your function/application on the given data
  • then - final section, comparing if the results of when match the expectations.

In the next chapters, we’re going to implement all these 3 sections for a simple test scenario.

“Given” — mocking input files

Every time the application starts, it expects two input datasets:

  • events - in parquet format, daily-partitioned
  • ads - in JSON format, with no partitioning

There are 2 ways of supplying them to the testcase — by creating samples (and storing them in the repository) or generating them in runtime. While the first method saves some time, it’s not the best practice to store binary parquet files in the repository and it’s not that flexible when it comes to schema evolution (a developer needs to create a new set of testing files). Instead, we will create them in the testcase run itself.

import unittest
import shutil
import os
import json
from datetime import datetime

from pyspark.sql import SparkSession

class TestIntegration(unittest.TestCase):
    INPUT_EVENTS = "/tmp/input_events"
    INPUT_ADS = "/tmp/input_ads"
    OUTPUT_JOINED = "/tmp/output_joined"
    OUTPUT_INVALID = "/tmp/output_invalid"

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )
        
        ### TODO

    def add_event(self, ts, user_id, ad_id):
        self.spark.createDataFrame(
            [(ts, user_id, ad_id)],
            ['ts', 'user_id', 'ad_id']) \
            .write.parquet(f'{self.INPUT_EVENTS}/dt={ts.date()}', mode='append')

    def add_ad(self, id, name):
        with open(f'{self.INPUT_ADS}/sample.json', 'a+') as f:
            json.dump({'id': id, 'name': name}, f)
            f.write('\n')

    def setUp(self):
        for path in [self.INPUT_EVENTS, self.INPUT_ADS,
                     self.OUTPUT_JOINED, self.OUTPUT_INVALID]:
            shutil.rmtree(path, True)
            os.makedirs(path)

    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

As you can see, our testcase starts a global SparkSession and stops it when tests are done. The setUp function ensures there are no leftovers from previous runs. Also, two helper functions hide logic of creating input data either with Spark’s local context (writing parquet file) or simple JSON write using python’s base library.

“When” — executing Spark application

When we have created the mock data, it’s time to start the ETL. In unit tests we would just call a function of other class, but here we will simulate spark-submit run:

import subprocess

class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )

        # when
        exit_code = self.run_app("2020-03-31")
        
        ### TODO
        
        
    def run_app(self, date):
        return subprocess.run(
            [
                "spark-submit",
                "--conf","spark.sql.shuffle.partitions=1",
                "job.py",
                "--dt", date,
                "--input-events", self.INPUT_EVENTS,
                "--input-ads", self.INPUT_ADS,
                "--output-joined", self.OUTPUT_JOINED,
                "--output-invalid", self.OUTPUT_INVALID
            ],
            stderr=subprocess.DEVNULL,
        ).returncode

As you can see, the complexity of running spark-submit is covered in the helper function. The paths are globally static and the only dynamic parameter is the input date (that can be static within one testcase as well, as it depends on a given section). spark-submit call uses the default parameters except for the number of shuffle partitions — limited to 1, as we do not plan to join big sets and it speeds up the execution of the tests a lot.

“Then” —validating the output data

Finally, when the resulting data is written in the local filesystem, we can reuse our Spark context to verify that the business logic of the ETL works as expected.

class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )

        # when
        exit_code = self.run_app("2020-03-31")

        # then -> ETL succeeded
        self.assertEqual(exit_code, 0)

        # then -> verify one joined file
        joined = self.spark.read.parquet(self.OUTPUT_JOINED)
        self.assertEqual(joined.where(joined.dt == "2020-03-31").count(), 1)
        record = joined.where(joined.dt == "2020-03-31").first()
        self.assertEquals(record.ts, datetime(2020, 3, 31, 13, 15))
        self.assertEquals(record.user_id, 'USER1')
        self.assertEquals(record.name, 'Sample ad')

        # then -> verify no invalid rows
        invalid = self.spark.read.parquet(self.OUTPUT_INVALID)
        self.assertEqual(invalid.where(invalid.dt == "2020-03-31").count(), 0)

It is useful to begin with a simple assertion for the exit code of the application being 0, meaning the Spark executed the entire application successfully. Later, we validate the output datasets, one after another: verifying the structure of a generated record in the joined dataset and checking that there were no invalid records saved.

Production-grade integration tests

The above how-to steps describe one test scenario, but the structure allows us to write as many testcases as needed. To ensure that they work in the same way in every environment, like developer’s laptops or Continuous Integration systems, it is recommended to use Docker image. For my testcases I usually use bde2020/spark-base images — they are simple, small, and do not need any additional configuration to start.

Summary

As you can see, writing production-grade integration tests for Spark applications doesn’t involve any magic. It is simple, 3-steps work:

  1. Create input data,
  2. Run the application,
  3. Verify the outputs.

It would be possible to use Test Driven Development, but based on my experience, it’s not the easiest way to develop Spark ETLs. Often, data engineer’s work is mostly data exploration and it would be quite hard to assume how the input data looks like and how to load it. But, having at least one testcase allows you to fix bugs in TDD way — create data that caused faulty execution (in given part), specify how the application should behave (in then), make the test passing by fixing the application code. Finally, you have a bug-free code without even running it on real data. And there is no risk of regression, as the testcase will be always executed before the next releases.

Integration tests are also a great place for examples on how to run the scripts and what the input parameters mean. Moreover, they allow to change the input data (“given” section) and check how the application would behave when the new input schema is applied or some new format of the data appears.

One of the major drawbacks of integration tests is the execution time. Depending on the complexity, it may take even more than 10 seconds to execute one testcase. Therefore, it’s optimal to put more than one scenario into the testcase, as for our example it could be one input dataset with one valid and one invalid event, testing both outputs.

The proper tests do not rely on the implementation details. It means that if you decide to rewrite your application from Python to Scala, or even from Spark to Flink, you can still reuse the same integrations tests set to prove the changes do not break the requirements.

big data
technology
hdfs
spark
8 April 2020

Want more? Check our articles

managingmultipledatasourceobszar roboczy 1 4
Tutorial

Feature store - managing multiple data sources with Feast

As the effort to productionize ML workflows is growing, feature stores are also growing in importance. Their job is to provide standardized and up-to…

Read more
blog7

5 main data-related trends to be covered at Big Data Tech Warsaw 2021 Part II

Trend 4. Larger clouds over the Big Data landscape  A decade ago,  only a few companies ran their Big Data infrastructure and pipelines in the public…

Read more
kedro dynamic pipelinesobszar roboczy 1 4
Tutorial

Kedro Dynamic Pipelines

“How can I generate Kedro pipelines dynamically?” - is one of the most commonly asked questions on Kedro Slack. I’m a member of Kedro’s Technical…

Read more
getindata amundsen feast machine learining notext
Tutorial

Machine Learning Features discovery with Feast and Amundsen

One of the main challenges of today's Machine Learning initiatives is the need for a centralized store of high-quality data that can be reused by Data…

Read more
nifiobszar roboczy 1 3 3x 100
Tutorial

Apache NiFi: A Complete Guide E-book.

We are proud to present you our first e-book, created by GetInData specialists. Apache NiFi: A Complete Guide is the result of long and fruitful work…

Read more
introducinggeiparquetobszar roboczy 1 4
Tutorial

Introducing the Geoparquet data format

The need for a unified format for geospatial data In recent years, a lot of geospatial frameworks have been created to process and analyze big…

Read more

Contact us

Interested in our solutions?
Contact us!

Together, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.


What did you find most impressive about GetInData?

They did a very good job in finding people that fitted in Acast both technically as well as culturally.
Type the form or send a e-mail: hello@getindata.com
The administrator of your personal data is GetInData Poland Sp. z o.o. with its registered seat in Warsaw (02-508), 39/20 Pulawska St. Your data is processed for the purpose of provision of electronic services in accordance with the Terms & Conditions. For more information on personal data processing and your rights please see Privacy Policy.

By submitting this form, you agree to our Terms & Conditions and Privacy Policy