You just finished an Apache Spark-based application. You ran spark-submit so many times that you just know the app works exactly as expected: it loads the input files, wrangles the data according to the specification, and finally saves the results in some permanent storage like HDFS or AWS S3. The application is deployed to the scheduler and it works!
The next day you get a bug report — one of the cases does not work as expected. “Ah, I know, let me fix it quickly”, you say, and you apply a fix to the code. Are you sure the change didn’t break the existing logic?
The next month the operations team decides to install a brand new Spark 3.0. They ask you whether the application is compatible with the upgraded version. “I hope so”, you say, then you run the application a few times and it looks OK. Are you sure you covered all the cases?
After a couple of weeks, the data operations team informs you that the schema of the input data will change slightly. They ask whether the application can handle it. “Yes, I think so, but I’d better check!”, you say, and again you run some manual tests on freshly created sample data. It seems to handle the change well. Are you sure it will work in production?
Aren’t unit tests enough?
Writing an Apache Spark application does not differ from creating any other application. A responsible developer should deliver not only working code but also a set of unit tests that prove, in an automated way, that the implementation is correct. Unit tests should cover the smallest possible units of code, like UDFs or DataFrame/Dataset API operations on input data. Both are easy to unit-test because you can mock the input DataFrame, run the function, and finally check the output using the local Spark context.
What is quite challenging to cover with unit tests are testcases that answer questions like:
- will my code work if the input data are malformed?
- does my spark.read call correctly recognize the partitions?
- does the app load the data from the paths where it is expected?
- what happens if the input data are not there yet?
- does the app de-duplicate data with the previous run output in the proper way?
- are created Hive partitions readable after the application finishes?
- does the code properly infer the schema of CSV/JSON files?
- is the application idempotent? (do re-runs create the same output as one run?)
A solution to the challenges listed above is focusing not on the unit tests, but on “external” tests of the application itself — running the application in a simulated environment and testing whether the results match the expectations of the given testcase, like:
- proper exit code or error message,
- creating the data in expected format with expected content,
- updating the metadata (tables and partitions).
In the following paragraphs, I will refer to these tests as integration tests, by analogy with tests of web services that simulate the client's input call and verify how the state changed within the service and what result was returned to the user.
Our first Spark integration test
Let’s start with a simple example. Imagine you have to write a simple ETL:
- its job is to enrich incoming data using a simple join in daily runs
- the main data source is in Parquet format; it is daily-partitioned and contains ad-related events, like an ad impression or an ad click of a user
- the second dataset is in JSON format and contains ad details
- there is a requirement to store separately the events that can’t be joined (when the ad metadata is not available)
The Apache Spark code that would implement this logic looks as follows:
from pyspark.sql import SparkSession
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--input-events', help='Events, parquet format')
parser.add_argument('--input-ads', help='Ads, JSON format')
parser.add_argument('--output-joined', help='Output location of enriched data')
parser.add_argument('--output-invalid', help='Invalid data')
parser.add_argument('--dt', help='Date partition indicator')
args = parser.parse_args()

spark = SparkSession.builder.getOrCreate()

all_events = spark.read.parquet(args.input_events)
events = all_events.where(all_events.dt == args.dt)
ads = spark.read.json(args.input_ads)

events.join(ads, events.ad_id == ads.id) \
    .write.parquet(f'{args.output_joined}/dt={args.dt}')
events.join(ads, events.ad_id == ads.id, 'leftanti') \
    .write.parquet(f'{args.output_invalid}/dt={args.dt}')
As the job is intended to run in the production environment, it is usually scheduled with Oozie or Airflow, so the dt parameter is dynamic. Also, the paths (2 inputs and 2 outputs) are locations on HDFS, S3, or other storage systems. When it comes to integration tests, we don’t want the testing process to depend on any particular schedule or external locations. For the usual web service integration tests we would need to mock some kind of distributed storage to ensure all dependent interfaces are available. Luckily, Hadoop’s Filesystem API has a simple implementation that uses the local filesystem for read/write operations, so we don’t need any extra effort here. Also, we will stick to one sample date in the testcase.
The usual testing scenario is composed of 3 sections:
- given — a section where you prepare mocks/stubs/samples to create a simulated, controlled testing environment
- when — a section where you actually call your function/application on the given data
- then — the final section, checking whether the results of when match the expectations.
In the next chapters, we’re going to implement all 3 sections for a simple test scenario.
“Given” — mocking input files
Every time the application starts, it expects two input datasets:
- events - in parquet format, daily-partitioned
- ads - in JSON format, with no partitioning
There are 2 ways of supplying them to the testcase — creating samples (and storing them in the repository) or generating them at runtime. While the first method saves some time, storing binary Parquet files in the repository is not a best practice, and it’s not that flexible when it comes to schema evolution (a developer needs to create a new set of testing files). Instead, we will create the data in the testcase run itself.
import unittest
import shutil
import os
import json
from datetime import datetime
from pyspark.sql import SparkSession


class TestIntegration(unittest.TestCase):
    INPUT_EVENTS = "/tmp/input_events"
    INPUT_ADS = "/tmp/input_ads"
    OUTPUT_JOINED = "/tmp/output_joined"
    OUTPUT_INVALID = "/tmp/output_invalid"

    def test_enrichment(self):
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )

    def add_event(self, ts, user_id, ad_id):
        self.spark.createDataFrame(
            [(ts, user_id, ad_id)],
            ['ts', 'user_id', 'ad_id']) \
            .write.parquet(f'{self.INPUT_EVENTS}/dt={ts.date()}', mode='append')

    def add_ad(self, id, name):
        with open(f'{self.INPUT_ADS}/sample.json', 'a+') as f:
            json.dump({'id': id, 'name': name}, f)
            f.write('\n')

    def setUp(self):
        for path in [self.INPUT_EVENTS, self.INPUT_ADS,
                     self.OUTPUT_JOINED, self.OUTPUT_INVALID]:
            shutil.rmtree(path, True)
            os.makedirs(path)

    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
As you can see, our testcase starts a global SparkSession and stops it when the tests are done. The setUp function ensures there are no leftovers from previous runs. Also, two helper functions hide the logic of creating input data, either with Spark’s local context (writing a Parquet file) or with a simple JSON write using Python’s standard library.
“When” — executing Spark application
Once we have created the mock data, it’s time to start the ETL. In unit tests we would just call a function of another class, but here we will simulate a spark-submit run:
import subprocess


class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )
        exit_code = self.run_app("2020-03-31")

    def run_app(self, date):
        return subprocess.run(
            [
                "spark-submit",
                "--conf", "spark.sql.shuffle.partitions=1",
                "job.py",
                "--dt", date,
                "--input-events", self.INPUT_EVENTS,
                "--input-ads", self.INPUT_ADS,
                "--output-joined", self.OUTPUT_JOINED,
                "--output-invalid", self.OUTPUT_INVALID
            ],
            stderr=subprocess.DEVNULL,
        ).returncode
As you can see, the complexity of running spark-submit is hidden in the helper function. The paths are globally static, and the only dynamic parameter is the input date (which can be static within one testcase as well, as it depends on the given section). The spark-submit call uses the default parameters, except for the number of shuffle partitions — limited to 1, as we do not plan to join big sets, and it speeds up the execution of the tests a lot.
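One caveat: discarding stderr keeps the test output clean, but it also hides Spark's stack traces when a run fails. A possible variant of the helper (my own tweak, not part of the original listing) captures the output and prints it only on failure:

```python
import subprocess


def run_app_verbose(cmd):
    # Capture stdout/stderr instead of discarding them, so a failing
    # spark-submit run can be diagnosed directly from the test output.
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(result.stderr)
    return result.returncode
```

The testcase would call it exactly like run_app, passing the full spark-submit command list.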
“Then” — validating the output data
Finally, when the resulting data is written in the local filesystem, we can reuse our Spark context to verify that the business logic of the ETL works as expected.
class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )
        exit_code = self.run_app("2020-03-31")
        self.assertEqual(exit_code, 0)

        joined = self.spark.read.parquet(self.OUTPUT_JOINED)
        self.assertEqual(joined.where(joined.dt == "2020-03-31").count(), 1)
        record = joined.where(joined.dt == "2020-03-31").first()
        self.assertEqual(record.ts, datetime(2020, 3, 31, 13, 15))
        self.assertEqual(record.user_id, 'USER1')
        self.assertEqual(record.name, 'Sample ad')

        invalid = self.spark.read.parquet(self.OUTPUT_INVALID)
        self.assertEqual(invalid.where(invalid.dt == "2020-03-31").count(), 0)
It is useful to begin with a simple assertion that the exit code of the application is 0, meaning Spark executed the entire application successfully. Then we validate the output datasets one after another: verifying the structure of a generated record in the joined dataset and checking that no invalid records were saved.
Production-grade integration tests
The above how-to steps describe one test scenario, but the structure allows us to write as many testcases as needed. To ensure that they work the same way in every environment, like developers’ laptops or Continuous Integration systems, it is recommended to use a Docker image. For my testcases I usually use the bde2020/spark-base images — they are simple, small, and do not need any additional configuration to start.
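As a sketch of how that could look (the image tag and test directory layout below are assumptions — check the tags available for your Spark version before use):

```shell
# Run the integration tests inside a Spark container, so every
# environment (laptop, CI) uses the same Spark distribution.
# The image tag is illustrative; pick one matching your Spark version.
docker run --rm \
    -v "$(pwd)":/app \
    -w /app \
    bde2020/spark-base:3.0.1-hadoop3.2 \
    python3 -m unittest discover -s tests
```

The same command works locally and in a CI pipeline step, which is the whole point of containerizing the test run.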
Summary
As you can see, writing production-grade integration tests for Spark applications doesn’t involve any magic. It is simple, three-step work:
- Create input data,
- Run the application,
- Verify the outputs.
It would be possible to use Test-Driven Development, but based on my experience, it’s not the easiest way to develop Spark ETLs. Often, a data engineer’s work is mostly data exploration, and it would be quite hard to assume in advance what the input data looks like and how to load it. But having at least one testcase allows you to fix bugs in a TDD way — create the data that caused the faulty execution (in the given part), specify how the application should behave (in then), and make the test pass by fixing the application code. Finally, you have bug-free code without even running it on real data. And there is no risk of regression, as the testcase will always be executed before the next releases.
Integration tests are also a great place for examples of how to run the scripts and what the input parameters mean. Moreover, they allow you to change the input data (the given section) and check how the application would behave when a new input schema is applied or some new format of the data appears.
One of the major drawbacks of integration tests is the execution time. Depending on the complexity, it may take more than 10 seconds to execute one testcase. Therefore, it’s optimal to put more than one scenario into a testcase; for our example it could be one input dataset with one valid and one invalid event, testing both outputs.
Proper tests do not rely on implementation details. It means that if you decide to rewrite your application from Python to Scala, or even from Spark to Flink, you can still reuse the same integration test set to prove the changes do not break the requirements.