Streaming Analytics Better than Classic Batch – When and Why? – Part 1
While many problems can be solved in batch, stream processing can give you additional benefits. In this blog post series, we’ll discuss a real-world example of user session analytics to give you a use-case-driven overview of the business and technical problems that modern stream processing technologies like Apache Flink help you solve, and the benefits you can get by using them today to process your data as a continuous stream of events. This two-part series is based on the talk “Streaming analytics better than batch – when and why?” that we gave at Big Data Tech Warsaw 2017 in Poland in February 2017. Because the talk was very well received by the audience and was even rated the best talk in the whole “Real-time processing” track, we decided to convert it into blog posts.
This post was jointly written by Adam Kawa (CEO at GetInData) and Dawid Wysakowicz (Flink contributor at GetInData).
We will explain the benefits of stream processing by example. Our example is analyzing user sessions. This is something that many companies do, usually in batch and sometimes in real-time.
Our sessionization use case is inspired by Spotify, but it applies equally to companies in sectors like e-commerce, media or IoT. When you analyze user sessions at Spotify, you can compute many basic statistics, like the duration of the session and the number of songs, skips and interruptions, as well as more advanced things like the current mood of the user. Please read this research paper by Spotify if you are interested in more ideas.
Dashboards, Alerts, Content
You can do at least three types of things with the output of your session analytics jobs. First, you can visualize KPIs on dashboards. For example, you can show how long users listen to a new episode of the Discover Weekly playlist, or how many consecutive songs they listen to or skip. In case you don’t know it, Discover Weekly is a personalized playlist generated for each user every week. It arrives on Monday morning and contains fresh songs that the user might like.
Second, you can trigger alerts. Because each user gets new songs in this playlist every week, the calculated metrics will change depending on how good the current music recommendations are, so it is useful to receive an alert when some metric changes dramatically. For example, if Australian users don’t listen to Discover Weekly as long as usual on Monday morning, we can quickly trigger an alert so that someone can react. Perhaps we will be able to identify a problem before European or American users wake up.
Third, you can improve content. We can use the insight from the current sessions to recommend even better songs and ads based on what a user listens to or feels right now.
To some extent, all of these use cases can be achieved by classic batch processing with hourly or daily jobs. However, we get more value out of our data if we process it in real time with low latency. What’s more, if our jobs process data correctly, we can trust our numbers and alerts even more.
Classic Batch Architecture
Many companies use technologies like Kafka, Hadoop, Spark, and Oozie to analyze user sessions.
Events that represent user activity are continuously sent to Kafka in real time. Then we use a batch tool like Camus to copy events from Kafka to HDFS periodically, let’s say every hour. As a next step, we use a technology like Spark to run batch jobs that group individual user events into user sessions. A single user session at Spotify can last many hours, e.g. when a user is listening to music at work. This makes building complete and correct user sessions challenging: events from the same user session can be spread across many hourly buckets, and you don’t know in advance which ones.
Sessions after midnight
To mitigate this problem, our Spark job can run daily (e.g. at midnight) and process the last 24 hourly buckets to generate a dataset with user sessions for a particular day. This approach builds many complete and correct user sessions. However, sessions that start before midnight and end after it will still be split. Because Spotify is a global product whose users listen to music all the time and everywhere, there will be many such sessions.
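To make the midnight problem concrete, here is a minimal Python sketch. The timestamps and the helper `split_by_day` are made up for illustration; a real job would read hourly buckets from HDFS rather than an in-memory list.

```python
from collections import defaultdict
from datetime import datetime


def split_by_day(timestamps):
    """Group event timestamps by calendar day, as a daily batch job would see them."""
    per_day = defaultdict(list)
    for ts in timestamps:
        per_day[ts.date().isoformat()].append(ts)
    return dict(per_day)


# One hypothetical listening session that starts before and ends after midnight.
session_events = [
    datetime(2017, 2, 9, 23, 50),
    datetime(2017, 2, 9, 23, 56),
    datetime(2017, 2, 10, 0, 3),
    datetime(2017, 2, 10, 0, 9),
]

daily_inputs = split_by_day(session_events)
# Each daily run sees only its own half, so one real session is reported as two.
fragments = len(daily_inputs)
```

Here `fragments` is 2: the run for February 9 and the run for February 10 each see only part of the same session.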
As an alternative, our Spark job can run hourly, each time combining the new hourly bucket with intermediate data about the sessions still active from the previous hours, to generate the sessions that ended in that particular hour. Keeping such intermediate data (state) about in-flight sessions is also a non-trivial task, though.
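The hourly approach could look roughly like the sketch below. It is plain Python rather than Spark, and the 30-minute inactivity gap, the function `run_hourly_job` and the tuple layout of the state are all our assumptions for illustration. It also assumes that events never arrive late.

```python
SESSION_GAP = 30 * 60  # seconds of inactivity that end a session (assumed value)


def run_hourly_job(open_sessions, hour_events, hour_end):
    """Merge one hourly bucket into the state carried over from the previous run.

    open_sessions: {user: (start_ts, last_ts)} left open by the previous run
    hour_events:   list of (user, ts) pairs from the current hourly bucket
    hour_end:      timestamp at which this bucket closes
    Returns (closed_sessions, still_open_sessions).
    """
    state = dict(open_sessions)
    closed = []
    for user, ts in sorted(hour_events, key=lambda e: e[1]):
        if user in state and ts - state[user][1] <= SESSION_GAP:
            state[user] = (state[user][0], ts)  # extend the in-flight session
        else:
            if user in state:
                closed.append((user,) + state[user])  # gap too large: close it
            state[user] = (ts, ts)  # start a new session
    # Sessions that were idle for longer than the gap at the end of the hour are done.
    still_open = {}
    for user, (start, last) in state.items():
        if hour_end - last > SESSION_GAP:
            closed.append((user, start, last))
        else:
            still_open[user] = (start, last)
    return closed, still_open


# Two consecutive hourly runs; timestamps are seconds since an arbitrary origin.
closed1, state1 = run_hourly_job({}, [("alice", 3000), ("alice", 3500)], 3600)
closed2, state2 = run_hourly_job(state1, [("alice", 3700), ("alice", 4000)], 7200)
```

The first run closes nothing and hands the open session over to the second run, which extends it and emits it as one session from 3000 to 4000. The hard part in production is storing and versioning `state1` reliably between runs.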
Regardless of the approach we follow, the logic of our Spark job must group events by user, sort them by timestamp, remove duplicates and, most importantly, assign the same session ID to events that occur close enough to each other.
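These steps can be sketched in a few lines of plain Python. This is not the Spark job itself; the event layout `(user, ts, event_id)`, the 30-minute gap and the session ID format are assumptions we chose for the example.

```python
from itertools import groupby

SESSION_GAP = 30 * 60  # seconds; events further apart start a new session (assumed)


def sessionize(events):
    """Turn (user, ts, event_id) tuples into (user, session_id, ts, event_id) tuples."""
    # Remove exact duplicates, then order by user and timestamp.
    unique = sorted(set(events), key=lambda e: (e[0], e[1], e[2]))
    result = []
    for user, user_events in groupby(unique, key=lambda e: e[0]):
        session_no = 0
        previous_ts = None
        for _, ts, event_id in user_events:
            if previous_ts is not None and ts - previous_ts > SESSION_GAP:
                session_no += 1  # gap exceeded: start the next session
            result.append((user, f"{user}-{session_no}", ts, event_id))
            previous_ts = ts
    return result


events = [
    ("bob", 100, "e1"),
    ("bob", 100, "e1"),   # duplicate delivery
    ("bob", 5000, "e3"),
    ("bob", 400, "e2"),   # arrives out of order within the batch
    ("amy", 50, "e4"),
]
sessions = sessionize(events)
```

In this sample, bob’s first two distinct events share the session `bob-0`, while the event at 5000 seconds starts `bob-1` because it is more than 30 minutes after the previous one.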
If you are interested in more details, you can read a case-study chapter in the O’Reilly book (“Hadoop Application Architectures”) that describes common sessionization steps on a clickstream example. Even though there is a book that describes the classic batch solution to user sessionization with Hadoop and many companies have deployed it in production, it has a number of drawbacks.
Drawbacks of Classic Batch
First of all, it has many moving parts. You need to decide how to partition events in HDFS (hourly or daily) and learn tools like Camus for ingestion and Oozie for scheduling. You need to write a lot of glue code to integrate these technologies into a single pipeline. Then you need to monitor this pipeline and make sure that each component is up and running. From time to time, something will fail due to an issue in your code or a problem with the tools that you use.
Even if your pipeline doesn’t fail and finishes successfully, it will take hours to complete because we use classic batch processing technologies. If you wait hours or days for your data, the value of this data is lower and you can use it only for making reactive and historical decisions. However, if you want to make actionable or predictive decisions, like triggering instant alerts or updating music recommendations based on what a user does and feels right now, you need to generate session datasets with much lower latency.
The easiest way to decrease latency and shorten the feedback loop seems to be using Spark Streaming.
With a continuously running Spark Streaming job, the infrastructure is much simpler, because you no longer need Camus or Oozie, and you no longer need to partition data in HDFS. You can generate results faster by configuring your Spark Streaming job to process all new events in small batches created every hour, every 10 minutes or maybe even every minute.
Sessionization is not natively supported in Spark Streaming, though, and is far from easy in the general case. It is doable with a few tricks and some custom code. These are needed because Spark Streaming internally divides your continuous stream of events into separate micro-batches. Because a single user session can span multiple micro-batches, you need to implement your own code to build the user session from events that belong to many micro-batches.
This can be achieved with the mapWithState method, which maintains internal session state for each user across batches. There are a few fairly recent blog posts that explain how to implement it and describe its pros and cons; you can find them here, here and here. Please note that these blog posts focus only on how to build a user session and don’t describe other problems that can happen.
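The idea of keeping per-user state across micro-batches can be illustrated without Spark at all. The sketch below is plain Python and does not call the Spark API; `update_session` is a hypothetical function that plays the role of the state update function, and the 30-minute gap and the `(start, last, count)` state layout are our assumptions.

```python
SESSION_GAP = 30 * 60  # seconds of inactivity that end a session (assumed value)


def update_session(state, ts):
    """Return (new_state, finished_session_or_None) for one event timestamp.

    state is None for a new user, otherwise (start_ts, last_ts, event_count).
    """
    if state is None:
        return (ts, ts, 1), None
    start, last, count = state
    if ts - last > SESSION_GAP:
        return (ts, ts, 1), (start, last, count)  # close old session, open new one
    return (start, ts, count + 1), None


def process_micro_batch(states, batch):
    """Apply one micro-batch of (user, ts) events to the per-user state dict in place."""
    finished = []
    for user, ts in batch:
        states[user], done = update_session(states.get(user), ts)
        if done is not None:
            finished.append((user,) + done)
    return finished


states = {}  # survives between micro-batches, like the state kept per key
micro_batches = [
    [("carol", 0), ("carol", 60)],
    [("carol", 120)],
    [("carol", 4000)],
]
finished = [s for batch in micro_batches for s in process_micro_batch(states, batch)]
```

The first session spans the first two micro-batches and is emitted only when the third batch reveals a long enough gap. Note that this sketch trusts the order in which events arrive.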
Reality of Event Stream
Problems arise when events arrive late, for example because of network connectivity issues. With Spotify, you can listen to music in offline mode, in which songs stored locally on your device are played. When you are offline, e.g. because you are taking a flight to Warsaw, the events are cached locally on your phone by the Spotify client and are not sent to Kafka yet. Only later, when you connect to the Internet and start listening to music in online mode, are all the previously buffered events sent to Kafka. They then end up in the same micro-batch as the new events generated in online mode. If your processing logic doesn’t differentiate the original event time from the current time, you will get incorrect results, because old and new events will all be treated as events that happen right now.
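A small Python sketch shows how much the choice of timestamp matters. The field names `event_time` and `arrival_time` and the sample values are assumptions for illustration only.

```python
from collections import Counter

HOUR = 3600  # seconds


def count_per_hour(events, time_field):
    """Count events per hourly window, keyed by the chosen timestamp field."""
    return dict(Counter(e[time_field] // HOUR for e in events))


events = [
    # Played offline during hour 0, delivered only in hour 3.
    {"event_time": 600, "arrival_time": 3 * HOUR + 10},
    {"event_time": 1200, "arrival_time": 3 * HOUR + 10},
    # Played online during hour 3, delivered immediately.
    {"event_time": 3 * HOUR + 20, "arrival_time": 3 * HOUR + 21},
]

by_arrival = count_per_hour(events, "arrival_time")
by_event_time = count_per_hour(events, "event_time")
```

Grouping by arrival time puts all three events into hour 3, while grouping by event time correctly assigns two events to hour 0 and one to hour 3.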
Out of Order Data
A variation of this problem can happen when a user changes devices. Assume that you fly to Warsaw again. You listen to music in offline mode on your laptop because you want to save your phone battery for later. These events are again buffered locally and can’t be sent to Kafka. When the flight ends, you shut down your laptop, leave the airport and order a taxi. On the way home, you listen to music on your mobile phone in online mode. Of course, these mobile events are sent to Kafka immediately. When you arrive home and relax on your sofa, you turn on your laptop and start listening to music on it. This time the laptop connects to the Internet successfully, so all the previously buffered desktop events are sent to Kafka now. This means that the events for a particular user reach Kafka out of order: first the events from the mobile phone, then the events from the desktop, although in reality they happened the other way round. Again, if we don’t handle this scenario in our processing logic, we will get incorrect results.
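The device-switching scenario can be replayed with a few made-up events. In this Python sketch the tuple layout, the track names and the helper `device_sequence` are hypothetical; the point is only to compare arrival order with event-time order.

```python
arrivals = [
    # (event_time, device, track) in the order Kafka receives them
    (500, "mobile", "track-c"),   # taxi ride, sent immediately
    (560, "mobile", "track-d"),
    (100, "desktop", "track-a"),  # played on the plane, sent hours later
    (160, "desktop", "track-b"),
    (900, "desktop", "track-e"),  # played at home, sent immediately
]


def device_sequence(events):
    """Collapse consecutive events from the same device into one entry."""
    sequence = []
    for _, device, _ in events:
        if not sequence or sequence[-1] != device:
            sequence.append(device)
    return sequence


as_received = device_sequence(arrivals)
as_happened = device_sequence(sorted(arrivals, key=lambda e: e[0]))
```

Processed in arrival order, the user appears to go from mobile to desktop. Ordered by event time, the real sequence is desktop, then mobile, then desktop again.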
As you can see, there are a few serious problems to solve. Some of them are caused by classic batch technologies, which force us to think about data not as a continuous stream of events but as files in HDFS or micro-batches that are processed periodically. What’s more, without tooling to handle late or out-of-order events, the results of our jobs are usually not correct.
The Second Part
In the second part of “Streaming Analytics Better Than Batch – when & why?” blog post series, we explain how you can address these problems by using stream processing technologies such as Apache Flink. Please stay tuned!