Streaming analytics better than classic batch – when and why? – Part 2

In the first part of this blog post we described a number of challenges that need to be addressed when implementing data pipelines with technologies like Pig, Scalding, Spark, Spark Streaming or Storm. For instance, if we want to sessionize our events correctly with these technologies, we have to implement complex custom code to deal with batch boundaries and out-of-order or late events.

Solving a Streaming Problem

Let’s ask ourselves a question – wouldn’t it be easier if we approached those problems with a different mindset? Do we really want to split our data into hourly batches or micro-batches rather than handle them as a stream, as they truly arrive? Do we actually want to solve a streaming problem with periodic batch or micro-batch jobs? Is it just because we have always found stream processing too hard to implement, or didn’t have the right tools to do so? At GetInData we believe that modern stream processing engines allow us to process data in a simple, fun and, most importantly, correct way.

One such tool we have in mind is Apache Flink. Just like with Spark Streaming, we can access Kafka events directly, but this time we process them in their native representation – as a stream instead of a batch abstraction. Of course, this does not mean that the further processing will be more complicated. Quite the opposite, as we want to show you with some code examples. This means no more data lakes, just the flowing river.

flink-pipeline

Example Implementation

We implemented user sessionization in Flink to show you how easily all those problems can be resolved – all in just a few lines of code. The use case that we are solving with this example is to count how long a user listens to music in a single session (case A) or how many consecutive songs she or he plays from a particular playlist (case B), as shown in the picture below.

case_example

The first step of our pipeline is reading events from Kafka. Flink provides a Kafka consumer that, with the help of its internal checkpointing mechanism, gives us the power of exactly-once processing. All we need to do is provide simple connection parameters like the topic name or the Kafka broker address.

sEnv.addSource(new FlinkKafkaConsumer09[Event](conf.topic(),
    getSerializationSchema,
    kafkaProperties(conf.kafkaBroker()))
)
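
For completeness, here is one way the two helpers used above could look. The method names come from the snippet, but the bodies below are only our assumptions – the actual project may set more Kafka properties or use a different serialization format for Event (sEnv is the StreamExecutionEnvironment from the snippet):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema

// Connection properties for the Kafka 0.9 consumer.
def kafkaProperties(broker: String): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", broker)      // Kafka broker address
  props.setProperty("group.id", "flink-sessionizer")  // consumer group id (assumed name)
  props
}

// One possible deserialization schema: reuse Flink's type-information based
// schema for the Event case class instead of a hand-written JSON/Avro decoder.
def getSerializationSchema: TypeInformationSerializationSchema[Event] =
  new TypeInformationSerializationSchema[Event](createTypeInformation[Event], sEnv.getConfig)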

Next we have to create user sessions – first by grouping incoming events by userId as a key, and then by assigning them to session windows. It is as simple as specifying the gap between events that delimits a window, and all the event-time magic happens automatically underneath.

.keyBy(_.userId)             
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))

We can complete this most basic example with a computation function that will be applied to each user session window. As a result we have a nice processing pipeline that handles out-of-order events and fits into five lines of code. Isn’t it neat?

val sessionStream : DataStream[SessionStats] = sEnv
    .addSource(new FlinkKafkaConsumer09[Event](...))
    .keyBy(_.userId)             
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))            
    .apply(new CountSessionStats())
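
The CountSessionStats function itself is not shown above. Below is only an illustrative sketch of what such a window function could look like – the Event and SessionStats shapes here are hypothetical, and the real implementation lives in our GitHub project:

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical shapes of the input events and of the emitted result.
case class Event(userId: String, timestamp: Long, songId: String, playlistId: String)
case class SessionStats(userId: String, sessionLengthMs: Long, songsPlayed: Int)

class CountSessionStats extends WindowFunction[Event, SessionStats, String, TimeWindow] {

  override def apply(userId: String,
                     window: TimeWindow,
                     events: Iterable[Event],
                     out: Collector[SessionStats]): Unit = {
    val timestamps = events.map(_.timestamp)
    // Case A: how long the user listened to music within this session.
    val sessionLength = timestamps.max - timestamps.min
    // Case B (simplified): how many songs were played within the session.
    val songsPlayed = events.size
    out.collect(SessionStats(userId, sessionLength, songsPlayed))
  }
}

With something like this in place, sessionStream emits one SessionStats record per closed user session.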

Advanced time handling

We do not stop at the basic example, because there is still some room for improvement. For example, we want to be able to handle late events, and Flink comes with a solution for that real-world issue as well. Let’s assume that we know some events may be late by at most a certain amount of time, e.g. 60 minutes. We just need to set that parameter in our pipeline. Flink will keep each window’s state for that extra period, so that when some late events arrive, we can react to them by updating our aggregates.

.allowedLateness(Time.minutes(60))

What else can we ask for? A good idea is to shorten the feedback loop. We can do that by writing an early firing trigger that will emit intermediate results for each window every few minutes.

.trigger(EarlyTriggeringTrigger.every(Time.minutes(10)))
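
Putting all the pieces together, the full pipeline built from the snippets above would look roughly like this (EarlyTriggeringTrigger is the custom trigger from our GitHub project):

val sessionStream : DataStream[SessionStats] = sEnv
    .addSource(new FlinkKafkaConsumer09[Event](...))
    .keyBy(_.userId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
    .allowedLateness(Time.minutes(60))
    .trigger(EarlyTriggeringTrigger.every(Time.minutes(10)))
    .apply(new CountSessionStats())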

We believe this example shows pretty well that all time-specific issues, even the hard ones, become simple and robust to handle with Flink. For a complete working example, have a look at our GitHub project.

Beyond the code

Though the code examples are descriptive, not every aspect of stream processing can be expressed in them. What is not visible from the provided case study is that Flink:

  • is still a very efficient engine with both low latency and high throughput
  • provides exactly-once stateful processing
  • makes operational tasks easier

All those features are highly rated by users, as confirmed by the results of recent user surveys here and here.

Follow-up Questions

Implementing an example use case is just the first step of introducing a new, thrilling feature into our solution. Once we have a working proof of concept, many further questions arise. Let’s consider some of them.

How can I reprocess data?

Many of us consider the ability to easily redo a processing job as a main advantage of batch processing – just relaunch the batch job on the same input. That looks true, but as a side comment, it is never quite that easy, because you may have downstream jobs that have already consumed the output of that job, so they need to redo their processing too, or somehow accept a replayed output.

savepoints

The good news is that you can do such reprocessing in streaming with Flink as well! This can be achieved with a feature called “savepoints”. A savepoint is a snapshot of the state of your job (like Kafka offsets, in-memory key-value pairs, in-flight sessions). You can create savepoints of a Flink job periodically – let’s say each day at 12 PM.

Whenever you want to restart your streaming job, you just start it from a previously stored savepoint, which will trigger computation from the saved Kafka offsets and reconstruct the in-flight sessions and state. In other words, it will rewind time to the moment the savepoint was taken. Of course, the side problem of downstream jobs mentioned before still exists, but how to solve it is out of the scope of any framework, at least for now.

Ah, and one more great thing: you can modify your application code when reprocessing. Savepoints are not tightly coupled with the exact code version that produced them!

Check out a short but sweet presentation from Stephan Ewen on how savepoints work with session windows here.

What if data is no longer in Kafka?

It’s straightforward to consume events from Kafka in Flink, but not all of your data is always located there. Very often, Kafka is not the permanent storage, but only the source of truth in our infrastructure. A common approach is to apply a retention policy and store historical data in a filesystem like HDFS. Still, one may want to reprocess data from, let’s say, a year ago.

A useful thing about modern streaming frameworks like Flink is that we can consume data from HDFS with the same streaming code. There are a few things to keep in mind here, though. First of all, you need to provide the input data in the order that the streaming job expects, which is ordered by event time. In order to do that, you have to provide a proper watermark generation mechanism.

One idea would be to generate a watermark only after all data from HDFS has been read. Unfortunately, if we were to process data from a whole year this way, we could easily exhaust all the memory available for the required state.

Another possibility would be to dump our events from Kafka into files structured in such a way that we could generate watermarks more often. There is no out-of-the-box tooling for that purpose, but it can be achieved with a custom SourceFunction and proper conventions regarding data partitioning in HDFS directories and file naming. This custom source would have to be able to leverage information such as the range of Kafka offsets covered by a particular file, which could be stored in the filename.
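
The post does not spell out such a source, so what follows is only a simplified sketch of the “watermark per file” idea. It assumes that file names sort in event-time (or Kafka offset) order and that records are plain CSV lines; it also reads local files instead of HDFS for brevity and reuses the hypothetical Event case class from the earlier sketch:

import java.io.File
import scala.io.Source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Replays archived events file by file and advances event time once a whole
// file has been emitted. A real implementation would read from HDFS and derive
// the ordering from the Kafka offset range encoded in each file name.
class FileReplaySource(dir: String) extends SourceFunction[Event] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    val files = new File(dir).listFiles().map(_.getPath).sorted
    for (file <- files if running) {
      var maxTimestamp = Long.MinValue
      for (line <- Source.fromFile(file).getLines() if running) {
        // Assumed CSV layout: userId,timestamp,songId,playlistId
        val Array(userId, ts, songId, playlistId) = line.split(",")
        val event = Event(userId, ts.toLong, songId, playlistId)
        maxTimestamp = math.max(maxTimestamp, event.timestamp)
        ctx.collectWithTimestamp(event, event.timestamp)
      }
      // No later file contains earlier events, so event time may safely advance.
      if (maxTimestamp != Long.MinValue) ctx.emitWatermark(new Watermark(maxTimestamp))
    }
  }

  override def cancel(): Unit = running = false
}

Such a source could then simply replace the FlinkKafkaConsumer09 at the beginning of the pipeline shown earlier.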

The last thing one has to consider is that there is also no ready-to-use solution for automatic switching between Kafka and HDFS based on the currently required offset.

How to join with other data sets/streams?

Joins are another big and common feature that is used extensively in batch processing. It is common that not all data comes from the same place. One example is joining user events with user profile details to add richer user information to the pipeline. These details might change slowly over time. In batch this is handled seamlessly. Can a streaming framework handle such joins for you as well? Yes! Well, at least in the near future.

Flink distinguishes two join types:

  • Joining windowed streams, which is done easily (see the sketch after this list)

joined-stream

  • Joining a stream with a data set, which at the time of writing is work in progress

joined-dataset
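
To give a flavour of the first kind, a windowed stream-to-stream join in Flink’s DataStream API could look roughly like the sketch below. The ProfileUpdate stream, the field names and the window size are assumptions made purely for illustration; events: DataStream[Event] and profileUpdates: DataStream[ProfileUpdate] are assumed to exist already:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical second stream carrying user profile updates.
case class ProfileUpdate(userId: String, country: String)
case class EnrichedEvent(userId: String, songId: String, country: String)

val enriched: DataStream[EnrichedEvent] = events
    .join(profileUpdates)
    .where(_.userId)      // key of the event stream
    .equalTo(_.userId)    // key of the profile stream
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    .apply { (event, profile) =>
      EnrichedEvent(event.userId, event.songId, profile.country)
    }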

I like this streaming API. Can I use it for batch?

Can I do batch processing with the streaming framework? Yes, you can! There are projects like Apache Beam or Flink’s Table API that automatically leverage the advantages of both worlds based on the data input.

When is batch processing good?

Is it really the case that we want to put continuous stream processing everywhere? No, of course not. Batch processing is still good for a number of scenarios.

For instance, batch is still the only choice in areas like ad-hoc analysis or data exploration. Tools like Hive, Spark SQL, Presto, Kylin, Spark + R and many others are a very good fit for that.

Batch is also a valid choice when you have large swaths of data already stored in HDFS to process. In this case it will be more efficient to process the data in a batch manner, because of all the optimizations you get when data is bounded and stored in an efficient layout (e.g. Parquet, ORC).

Another, quite obvious, case is when you can leverage existing and mature libraries that are implemented only for batch.

However, if none of the above is true, think about stream processing. Does my data arrive continuously? Am I simply implementing data pipelines on incoming data? Then it’s not a batch problem but a streaming one! And as such it should be easily solved with modern streaming frameworks, thanks to the features described here (e.g. late and out-of-order data handling, session and custom windows, savepoints) and their benefits (e.g. very low latency and a shorter feedback loop).

The most important advice to take away

Stream processing is not only about triggering alerts or getting results with low latency.

A stream is often a natural representation of data for many real-world problems. It can be successfully used for implementing ETL pipelines, calculating KPI metrics, powering business reports etc. – in use cases where many companies have traditionally used batch processing technologies.

With modern processing frameworks like Flink you can process your data in an easy, accurate and continuous manner. We believe the best summary is this:

dont-solve

Post by Dawid Wysakowicz

Data Engineer at GetInData, working to help people and companies succeed with Apache Flink. He has already started participating in the Flink community, with his first patch contributions behind him. He first became interested in Big Data technologies in 2015 while writing his Master’s thesis on a distributed genomic data warehouse. Recently he helped to extract value from large datasets at mBank.

3 Responses to Streaming analytics better than classic batch – when and why? – Part 2

  1. Zhe

    I have a question about EarlyTriggeringTrigger: why does the reduce function take the smaller “fire time”? I think it should take the bigger one (the later fire time) of the two.
    Because event time is ever-increasing, if you always register the smaller one as the event-time timer, then how does the window ever fire?

    • Dawid Wysakowicz

      The early triggering trigger registers two timers:
      – one for the end of the window
      – a second one for firing before the end of the window, at the given interval

      The part you are referring to is for the second timer. First of all, let’s make it clear that in event time, events can occur out of order. Let’s assume the first event occurred with a timestamp equal to 2 (which is also the start of the window), so we want to fire early at timestamp 2 + interval. But before we fired it, an event with timestamp 1 occurred, which moved the start of the window 1 second earlier, so we should already fire at 1 + interval. That is why we always schedule the smaller one. If we scheduled the bigger one, we would never fire it, as we would constantly move the firing time forward.

      Also, as I described before, the timer for the end of the window is independent of the one that fires early.

      Hope it makes things a bit clearer!
