You just finished the Apache Spark-based application.
You ran spark-submit so many times, you just know the app works exactly as expected: it loads the input files, then wrangles the data according to the specification, and finally saves the results in some permanent storage like HDFS or AWS S3. The application is deployed to the scheduler and it works!
The next day you got a bug report — one of the cases does not work as expected. “Ah, I know, let me fix it quickly”, you say, then you apply a fix on the code. Are you sure the change didn’t break the existing logic?
The next month the operations team decides to install brand new Spark 3.0. They ask you if the application is compatible with the upgraded version. “I hope so”, you say, then you run the application a few times, it looks OK. Are you sure you covered all the cases?
After a couple of weeks, the data operations team informs you that the schema of input data will slightly change. They ask you if the application can handle it. “Yes, I think so, but I’d better check!”, you say, then again you run some manual tests on just created sample data. It seems to handle the change well. Are you sure it will work on production?
Writing an Apache Spark application does not differ from creating any other application. A responsible developer should provide not only the working code but also a set of unit tests that prove, in an automated way, that the implementation is right. Unit tests should cover the smallest possible units of code, like UDFs or DataFrame/Dataset API operations on input data. It’s easy to create unit tests for both because you can mock the input DataFrame, run the function, and finally check the output using the local Spark context.
What is quite challenging with unit tests is creating testcases that would answer questions like: does the spark.read call correctly recognize the partitions?
A solution to the challenges listed above is to focus not on the unit tests, but on the “external” tests of the application itself: running the application in a simulated environment and testing whether the results match the expectations of the given testcase.
In the next paragraphs of this article, I will refer to these tests as integration tests, similar to how we used to name tests of the web services that simulate the input call by the client and verify how the state changed within the service and what result was returned to the user.
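Let’s start with a simple example. Imagine you have to write a simple ETL that loads the events (Parquet format) for a given date partition and the ads (JSON format), joins each event with its ad, saves the enriched events to one output location, and saves the events that have no matching ad to another location as invalid data.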
The Apache Spark code that would implement this logic looks as follows:
from pyspark.sql import SparkSession
from argparse import ArgumentParser

# parse arguments
parser = ArgumentParser()
parser.add_argument('--input-events', help='Events, parquet format')
parser.add_argument('--input-ads', help='Ads, JSON format')
parser.add_argument('--output-joined', help='Output location of enriched data')
parser.add_argument('--output-invalid', help='Invalid data')
parser.add_argument('--dt', help='Date partition indicator')
args = parser.parse_args()

# load the data
spark = SparkSession.builder.getOrCreate()
all_events = spark.read.parquet(args.input_events)
events = all_events.where(all_events.dt == args.dt)
ads = spark.read.json(args.input_ads)

# save the results: events with a matching ad are enriched and saved
events.join(ads, events.ad_id == ads.id) \
    .write.parquet(f'{args.output_joined}/dt={args.dt}')

# events without a matching ad are saved as invalid
events.join(ads, events.ad_id == ads.id, 'leftanti') \
    .write.parquet(f'{args.output_invalid}/dt={args.dt}')
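To make the two outputs concrete, the sketch below mimics the inner join and the left anti join in plain Python on hand-made dictionaries. It only illustrates the semantics and is not Spark code: the function name split_events and the sample records are made up for this example, and it assumes that ad ids are unique.

```python
def split_events(events, ads):
    """Mimic the job's two joins on lists of dicts (illustration only)."""
    # Assumption: every ad id appears at most once.
    ads_by_id = {ad["id"]: ad for ad in ads}
    joined, invalid = [], []
    for event in events:
        ad = ads_by_id.get(event["ad_id"])
        if ad is None:
            # left anti join: events with no matching ad
            invalid.append(event)
        else:
            # inner join: event columns plus ad columns
            joined.append({**event, **ad})
    return joined, invalid


events = [
    {"user_id": "USER1", "ad_id": "AD1"},
    {"user_id": "USER2", "ad_id": "AD404"},
]
ads = [{"id": "AD1", "name": "Sample ad"}]

joined, invalid = split_events(events, ads)
print(joined)
print(invalid)
```

Every event ends up in exactly one of the two lists, which is the property the integration test will later check on the real outputs.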
As the job is intended to run in the production environment, it’s usually scheduled with Oozie or Airflow, so the dt parameter is dynamic. Also, the paths (2 inputs and 2 outputs) are locations on HDFS, S3 or other storage systems. When it comes to integration tests, we don’t want the testing process to depend on any particular schedule or external locations. In the usual web services integration tests we would need to mock some kind of distributed storage to ensure all dependent interfaces are available. Luckily, Hadoop’s Filesystem API has a simple implementation that uses the local filesystem for read/write operations, so we don’t need any extra effort here. Also, we will stick to one sample date in the testcase.
The usual testing scenario is composed of 3 sections: given (prepare the input data), when (run the application), and then (verify the results).
In the next chapters, we’re going to implement all these 3 sections for a simple test scenario.
Every time the application starts, it expects two input datasets: the events in Parquet format, partitioned by date (dt), and the ads in JSON format.
There are 2 ways of supplying them to the testcase: by creating samples (and storing them in the repository) or by generating them at runtime. While the first method saves some time, it’s not the best practice to store binary Parquet files in the repository and it’s not that flexible when it comes to schema evolution (a developer needs to create a new set of testing files). Instead, we will create them in the testcase run itself.
import unittest
import shutil
import os
import json
from datetime import datetime
from pyspark.sql import SparkSession


class TestIntegration(unittest.TestCase):
    INPUT_EVENTS = "/tmp/input_events"
    INPUT_ADS = "/tmp/input_ads"
    OUTPUT_JOINED = "/tmp/output_joined"
    OUTPUT_INVALID = "/tmp/output_invalid"

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )
        ### TODO

    def add_event(self, ts, user_id, ad_id):
        self.spark.createDataFrame(
            [(ts, user_id, ad_id)],
            ['ts', 'user_id', 'ad_id']) \
            .write.parquet(f'{self.INPUT_EVENTS}/dt={ts.date()}', mode='append')

    def add_ad(self, id, name):
        with open(f'{self.INPUT_ADS}/sample.json', 'a+') as f:
            json.dump({'id': id, 'name': name}, f)
            f.write('\n')

    def setUp(self):
        for path in [self.INPUT_EVENTS, self.INPUT_ADS,
                     self.OUTPUT_JOINED, self.OUTPUT_INVALID]:
            shutil.rmtree(path, True)
            os.makedirs(path)

    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
As you can see, our testcase starts a global SparkSession and stops it when the tests are done. The setUp function ensures there are no leftovers from previous runs. Also, two helper functions hide the logic of creating input data, either with Spark’s local context (writing a Parquet file) or with a simple JSON write using Python’s standard library.
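The testcase above uses fixed paths under /tmp. As a variation, and only as a sketch, the directories could be created in a fresh temporary location on every run, so that two test runs on the same machine cannot collide. The helper names make_workspace and append_json_line are hypothetical and not part of the original testcase; the JSON helper writes one record per line, which is the layout spark.read.json reads by default.

```python
import json
import os
import shutil
import tempfile


def make_workspace(names=("input_events", "input_ads",
                          "output_joined", "output_invalid")):
    """Create a fresh temporary root with one empty sub-directory per name."""
    root = tempfile.mkdtemp(prefix="spark_it_")
    paths = {name: os.path.join(root, name) for name in names}
    for path in paths.values():
        os.makedirs(path)
    return root, paths


def append_json_line(directory, record, filename="sample.json"):
    """Append one record as a single JSON line."""
    with open(os.path.join(directory, filename), "a") as f:
        f.write(json.dumps(record) + "\n")


root, paths = make_workspace()
append_json_line(paths["input_ads"], {"id": "AD1", "name": "Sample ad"})
append_json_line(paths["input_ads"], {"id": "AD2", "name": "Another ad"})

with open(os.path.join(paths["input_ads"], "sample.json")) as f:
    lines = f.read().splitlines()

shutil.rmtree(root)  # clean up, as a tearDown method would
```

In a unittest class, make_workspace would be called from setUp and the shutil.rmtree call from tearDown.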
When we have created the mock data, it’s time to start the ETL. In unit tests we would just call a function of another class, but here we will simulate a spark-submit run:
import subprocess


class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )
        # when
        exit_code = self.run_app("2020-03-31")
        ### TODO

    def run_app(self, date):
        return subprocess.run(
            [
                "spark-submit",
                "--conf", "spark.sql.shuffle.partitions=1",
                "job.py",
                "--dt", date,
                "--input-events", self.INPUT_EVENTS,
                "--input-ads", self.INPUT_ADS,
                "--output-joined", self.OUTPUT_JOINED,
                "--output-invalid", self.OUTPUT_INVALID
            ],
            stderr=subprocess.DEVNULL,
        ).returncode
As you can see, the complexity of running spark-submit is covered in the helper function. The paths are globally static and the only dynamic parameter is the input date (which can be static within one testcase as well, as it depends on the given section). The spark-submit call uses the default parameters except for the number of shuffle partitions, which is limited to 1: we do not plan to join big sets, and it speeds up the execution of the tests a lot.
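If more jobs or more parameters need to be covered, the argument list can be assembled by a small pure function and checked without starting Spark at all. The following is a minimal sketch; build_spark_submit is a hypothetical helper name, and it only uses the --conf option and the application arguments already shown above.

```python
def build_spark_submit(script, app_args, conf=None):
    """Return the argv list for spark-submit; nothing is executed here."""
    cmd = ["spark-submit"]
    # Spark settings are passed as repeated "--conf key=value" pairs
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    # Everything after the script name is handed to the application itself
    for name, value in app_args.items():
        cmd += [f"--{name}", str(value)]
    return cmd


cmd = build_spark_submit(
    "job.py",
    {"dt": "2020-03-31", "input-events": "/tmp/input_events"},
    conf={"spark.sql.shuffle.partitions": 1},
)
print(cmd)
```

The returned list can be passed directly to subprocess.run, as run_app does.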
Finally, when the resulting data is written in the local filesystem, we can reuse our Spark context to verify that the business logic of the ETL works as expected.
class TestIntegration(unittest.TestCase):

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )

        # when
        exit_code = self.run_app("2020-03-31")

        # then -> ETL succeeded
        self.assertEqual(exit_code, 0)

        # then -> verify one joined row
        joined = self.spark.read.parquet(self.OUTPUT_JOINED)
        self.assertEqual(joined.where(joined.dt == "2020-03-31").count(), 1)
        record = joined.where(joined.dt == "2020-03-31").first()
        self.assertEqual(record.ts, datetime(2020, 3, 31, 13, 15))
        self.assertEqual(record.user_id, 'USER1')
        self.assertEqual(record.name, 'Sample ad')

        # then -> verify no invalid rows
        invalid = self.spark.read.parquet(self.OUTPUT_INVALID)
        self.assertEqual(invalid.where(invalid.dt == "2020-03-31").count(), 0)
It is useful to begin with a simple assertion that the exit code of the application is 0, meaning Spark executed the entire application successfully. Later, we validate the output datasets, one after another: verifying the structure of a generated record in the joined dataset and checking that no invalid records were saved.
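When the exit code assertion fails, it helps to see why. The run_app helper shown earlier discards stderr; a possible variation, sketched below, keeps the last lines of it so they can be attached to the assertion message. The function name run_and_capture is hypothetical, and a child Python process stands in for spark-submit so that the sketch runs without Spark.

```python
import subprocess
import sys


def run_and_capture(cmd, tail_lines=20):
    """Run cmd and return (exit_code, last lines of its stderr)."""
    result = subprocess.run(cmd, stderr=subprocess.PIPE, text=True)
    tail = "\n".join(result.stderr.splitlines()[-tail_lines:])
    return result.returncode, tail


# Stand-in for spark-submit: a child process that fails on purpose.
failing = [
    sys.executable, "-c",
    "import sys; sys.stderr.write('boom\\n'); sys.exit(3)",
]
exit_code, stderr_tail = run_and_capture(failing)
print(exit_code, stderr_tail)
```

In the testcase, the assertion would then read self.assertEqual(exit_code, 0, msg=stderr_tail), so a failing run reports the end of the application log.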
The above how-to steps describe one test scenario, but the structure allows us to write as many testcases as needed. To ensure that they work in the same way in every environment, like developers’ laptops or Continuous Integration systems, it is recommended to use a Docker image. For my testcases I usually use bde2020/spark-base images: they are simple, small, and do not need any additional configuration to start.
As you can see, writing production-grade integration tests for Spark applications doesn’t involve any magic. It is a simple, 3-step process: prepare the input data (given), run the application (when), and verify the results (then).
It would be possible to use Test Driven Development, but based on my experience, it’s not the easiest way to develop Spark ETLs. Often, a data engineer’s work is mostly data exploration and it would be quite hard to assume up front what the input data looks like and how to load it. But having at least one testcase allows you to fix bugs in a TDD way: create the data that caused the faulty execution (in the given section), specify how the application should behave (in the then section), and make the test pass by fixing the application code. In the end, the fix is verified without even running the application on real data. The risk of regression also drops, as the testcase will always be executed before the next releases.
Integration tests are also a great place for examples of how to run the scripts and what the input parameters mean. Moreover, they allow you to change the input data (the “given” section) and check how the application would behave when a new input schema is applied or some new format of the data appears.
One of the major drawbacks of integration tests is the execution time. Depending on the complexity, it may take even more than 10 seconds to execute one testcase. Therefore, it can pay off to put more than one scenario into a testcase; in our example, one input dataset could contain one valid and one invalid event, so that a single run tests both outputs.
Proper tests do not rely on the implementation details. It means that if you decide to rewrite your application from Python to Scala, or even from Spark to Flink, you can still reuse the same set of integration tests to prove the changes do not break the requirements.