Recent Evolution of Zero Data Loss Guarantee in Spark Streaming With Kafka
When properly deployed, Spark Streaming 1.2 provides a zero data loss guarantee. To enjoy this mission-critical feature, you need to fulfil the following prerequisites:
- The input data comes from a reliable source through reliable receivers
- Application metadata is checkpointed by the application driver
- The write ahead log is enabled
Let’s briefly describe these prerequisites. In this blog post, we focus on Kafka integration and cover only as much detail as is needed to understand them.
Reliable sources and reliable receivers
For some input sources (including Kafka), Spark Streaming can acknowledge the received data. The input data is first received by receivers and then stored in Spark with replication (by default, data is replicated to two Spark executors for fault tolerance). Once the data is replicated in Spark, the receiver can acknowledge it (for example, by updating offsets in ZooKeeper when consuming from Kafka). This ensures that no input data is lost when the receiver suddenly crashes while receiving input data: data that was received but not yet replicated simply won’t be acknowledged, so the source will resend it when the receiver eventually recovers.
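The "replicate first, acknowledge second" ordering can be illustrated with a small toy model. This is only a sketch in plain Python, not Spark or Kafka code: the `Source` class, the `receive_once` function and the `crash_before_replication` flag are hypothetical names introduced for illustration.

```python
class Source:
    """Hypothetical reliable source: resends everything past the last acknowledged record."""

    def __init__(self, records):
        self.records = list(records)
        self.acked = 0  # number of records acknowledged so far

    def fetch(self, n):
        return self.records[self.acked:self.acked + n]

    def ack(self, count):
        self.acked += count


def receive_once(source, replicas, n, crash_before_replication=False):
    """Fetch up to n records, replicate them, and only then acknowledge."""
    batch = source.fetch(n)
    if crash_before_replication:
        return  # receiver died: nothing stored, nothing acknowledged
    for replica in replicas:
        replica.extend(batch)
    source.ack(len(batch))


source = Source(["a", "b", "c", "d"])
replicas = [[], []]  # two in-memory copies, mirroring the default replication factor

receive_once(source, replicas, 2, crash_before_replication=True)
receive_once(source, replicas, 2)  # the restarted receiver is handed "a" and "b" again
receive_once(source, replicas, 2)
print(replicas[0])
```

Because the crashed attempt never acknowledged anything, the source hands out the same records again and both replicas end up with the complete input.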
Reliable sources and reliable receivers allow us to recover from a receiver failure (or from a failure of the executor or the server where the receiver process runs).
Metadata checkpointing
Recovering from a driver failure is much trickier. Several techniques were introduced to make it possible. One of them is checkpointing application metadata. With this feature, the driver saves the application-critical metadata to fault-tolerant storage such as HDFS or S3 so that it can be restored when recovering from a failure. Among other things, the metadata includes:
- a list of queued but not yet completed batches (just the metadata, not data)
Thanks to metadata checkpointing, the driver is able to reconstruct the application and determine how far it had progressed before the driver failure.
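As a rough illustration of the idea, the sketch below persists only the metadata of queued batches and reads it back after a simulated restart. It is a simplified stand-in, not Spark's checkpoint format: the `checkpoint` and `recover` functions, the `metadata.json` file name and the `batch_time` field are assumptions made for this example, and a local temporary directory plays the role of HDFS or S3.

```python
import json
import os
import tempfile


def checkpoint(path, pending_batches):
    """Persist the metadata of queued batches (not their data)."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"pending": pending_batches}, f)
    os.replace(tmp, path)  # rename into place so readers never see a half-written file


def recover(path):
    """Return the batches that were queued when the last checkpoint was taken."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)["pending"]


checkpoint_dir = tempfile.mkdtemp()
path = os.path.join(checkpoint_dir, "metadata.json")

checkpoint(path, [{"batch_time": 1000}, {"batch_time": 2000}])
checkpoint(path, [{"batch_time": 2000}])  # the batch at time 1000 has completed

# Simulated driver restart: only the checkpoint file survives.
restored = recover(path)
print(restored)
```

The restarted driver learns that one batch was still outstanding, but note that only the description of the batch is restored, not the records it contained.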
Possible data loss scenario
Surprisingly or not, reliable sources, reliable receivers and metadata checkpointing are still not enough to prevent potential data loss. Imagine the following bad scenario:
- Two executors successfully receive input data from the receiver and buffer it in their memory
- The receiver acknowledges the data to the input source
- Executors start processing the buffered data according to the application code
- The driver suddenly fails
- By design, when the driver fails, all of its executors are killed too
- Since the executors are killed, all data buffered in their memory is lost. As a consequence, data that was acknowledged and buffered but not yet processed is lost
- The buffered data can’t be recovered, because it was stored only in the executors’ memory, so the data is lost forever
It sounds really bad for mission-critical applications, doesn’t it?
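The scenario above can be replayed as a few lines of bookkeeping. This is a toy model with hypothetical variable names, not Spark code; it only tracks where each record lives at each step.

```python
records = ["r1", "r2", "r3"]
acked_offset = 0

# Executors buffer the data in memory and the receiver acknowledges it.
executor_memory = [list(records), list(records)]
acked_offset = len(records)

# The driver fails before any record is processed; its executors die with it.
processed = []
executor_memory = []

# After the restart, the source only resends records past the acknowledged offset.
resent = records[acked_offset:]
recoverable = [r for buf in executor_memory for r in buf]
lost = [r for r in records
        if r not in processed and r not in resent and r not in recoverable]
print(lost)
```

Every record ends up in `lost`: the source considers it delivered, and no copy of it survived the failure.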
Write ahead log
To address the unfortunate scenario described in the previous section, the Write Ahead Log (WAL) was introduced in Spark Streaming 1.2.
With the WAL enabled, all received data is additionally written by the receiver to the checkpoint directory in a fault-tolerant file system such as HDFS or S3. As a result, the data can be re-read from there when the driver recovers, even though the data buffered in the executors’ memory is lost. In this simple way, Spark Streaming provides a mechanism to avoid data loss on driver recovery!
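In Spark itself the receiver WAL is switched on through the `spark.streaming.receiver.writeAheadLog.enable` setting. The mechanism behind it can be sketched in plain Python as follows. This is a minimal illustration, not Spark's implementation: the `WalReceiver` class, its methods and the `receiver.wal` file name are hypothetical, and a local temporary directory stands in for the fault-tolerant file system.

```python
import os
import tempfile


class WalReceiver:
    """Hypothetical receiver that logs each record durably before buffering it."""

    def __init__(self, wal_path):
        self.wal_path = wal_path
        self.buffer = []  # in-memory copy, lost on failure

    def store(self, record):
        with open(self.wal_path, "a") as wal:
            wal.write(record + "\n")
            wal.flush()
            os.fsync(wal.fileno())  # the record reaches disk before it is buffered
        self.buffer.append(record)

    def replay(self):
        """Rebuild the in-memory buffer from the log after a restart."""
        with open(self.wal_path) as wal:
            self.buffer = [line.rstrip("\n") for line in wal]
        return self.buffer


wal_path = os.path.join(tempfile.mkdtemp(), "receiver.wal")
receiver = WalReceiver(wal_path)
for record in ["r1", "r2", "r3"]:
    receiver.store(record)

# Simulated failure: a new process starts with an empty buffer and replays the log.
recovered = WalReceiver(wal_path)
print(recovered.replay())
```

The extra synchronous write on every record is also where the throughput cost of a write ahead log comes from.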
Although the WAL ensures zero data loss, it doesn’t guarantee exactly-once semantics for all sources. Imagine the following bad scenario that might happen when integrating Spark Streaming with Kafka:
- The receiver receives input data and stores it reliably in the WAL
- The receiver fails before updating the Kafka offsets in ZooKeeper
- Spark Streaming assumes that the input data has been successfully received (because it’s written to the WAL), while Kafka thinks that the data hasn’t been consumed yet (because the corresponding offsets haven’t been updated in ZooKeeper)
- After a moment, the receiver recovers from failure
- The unprocessed, but saved data is read from the WAL
- Once all data is read from the WAL, the receiver resumes consumption from Kafka. Because the receiver uses the Kafka High-Level Consumer API, it starts consuming data from the position that the offsets in ZooKeeper currently point to. Since the offsets hadn’t been updated when the receiver failed, some data is processed twice
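The steps above can be traced with a toy model that keeps the Kafka log, the committed offset and the WAL as plain Python values. All names here are hypothetical and no Kafka or ZooKeeper client is involved.

```python
kafka_log = ["m0", "m1", "m2", "m3"]
zookeeper_offset = 0   # offset last committed by the receiver
wal = []               # records already stored durably by Spark Streaming
processed = []

# The receiver stores m0 and m1 in the WAL, then crashes before committing offset 2.
wal.extend(kafka_log[0:2])

# Recovery step 1: replay the unprocessed records from the WAL.
processed.extend(wal)

# Recovery step 2: resume from the committed offset, which still points at 0.
processed.extend(kafka_log[zookeeper_offset:])
zookeeper_offset = len(kafka_log)

duplicates = sorted({m for m in processed if processed.count(m) > 1})
print(processed)
print(duplicates)
```

No record is missing from `processed`, but `m0` and `m1` appear twice, which is at-least-once rather than exactly-once delivery.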
Apart from the scenario described above, WAL has two other non-negligible disadvantages:
- The WAL reduces the throughput of receivers, because the received data must be stored reliably in a distributed file system (the workaround is to start more receivers)
- For some input sources, it duplicates the same data. For instance, when reading from Kafka, you keep one copy of the data in the Kafka brokers (because other Kafka consumers might want to consume it) and a second copy in Spark Streaming (in the form of the WAL stored in a Hadoop API compatible file system)
Kafka direct API
To address the performance penalty introduced by the WAL and to ensure exactly-once semantics, an experimental feature called the Kafka direct API was implemented in Spark Streaming 1.3.
The idea behind this feature is brilliant. The Spark driver simply calculates which ranges of Kafka offsets define the next computation batch, and then it instructs Spark executors to consume the corresponding data directly from Kafka’s topics and partitions. In other words, this approach treats Kafka as a file system and consumes data from a topic as if it were reading from a file.
In this simple, but powerful design:
- Kafka receivers are no longer needed: executors consume data directly from Kafka using the Simple Consumer API
- the WAL is not used: we can always re-consume data from Kafka when recovering from failures
- exactly-once semantics is guaranteed: there is no WAL from which duplicate data could be reread
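The division of work between driver and executors in this design can be sketched as follows. This is a simplified model in plain Python, not the Spark API: `OffsetRange`, `next_batch`, `read_range` and the `max_per_partition` parameter are hypothetical names, and a dictionary of lists stands in for a Kafka topic with two partitions.

```python
from collections import namedtuple

# Hypothetical stand-in for the (topic, partition, from, until) range given to one task.
OffsetRange = namedtuple("OffsetRange", "topic partition from_offset until_offset")


def next_batch(topic, consumed, latest, max_per_partition):
    """Driver side: decide which offsets each partition contributes to the next batch."""
    ranges = []
    for partition, start in sorted(consumed.items()):
        end = min(latest[partition], start + max_per_partition)
        if end > start:
            ranges.append(OffsetRange(topic, partition, start, end))
    return ranges


def read_range(log, r):
    """Executor side: read exactly the assigned slice, like reading a range of a file."""
    return log[r.partition][r.from_offset:r.until_offset]


log = {0: ["a0", "a1", "a2"], 1: ["b0"]}
consumed = {0: 1, 1: 1}  # next offset to read in each partition
latest = {p: len(msgs) for p, msgs in log.items()}

ranges = next_batch("events", consumed, latest, max_per_partition=10)
batch = [m for r in ranges for m in read_range(log, r)]
print(ranges)
print(batch)
```

Because a batch is fully described by its offset ranges, recomputing it after a failure means reading the same slices again, so the same records come back without any separate log.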
Please note that Kafka direct API is an experimental feature in Spark 1.3.
This blog post briefly describes how Spark Streaming has steadily added features that together provide the zero data loss guarantee and exactly-once semantics (especially when reading from Kafka). It’s nice to see that each version of Spark Streaming improves the existing design and makes it more robust and better suited to increasingly demanding production use cases.