While a lot of problems can be solved in batch, the stream processing approach can give you even more benefits. Today, we’ll discuss a real-world example of user session analytics to give you an insight to a use-case with a comprehensive overview of business and technical problems that modern stream processing technologies like Apache Flink can solve. This post is jointly written by Adam Kawa (CEO at GetInData) and Dawid Wysakowicz (a Flink contributor — previously at GetInData and currently at Ververica) and it is based on the talk “Streaming analytics better than batch — when and why ?” given at Big Data Tech Warsaw Summit 2017 in February 2017. Because the talk was very well received by the audience, we decided to convert it into blog post. Originally we have published it on our website on March 2017, and now we republish it on our new blog on Medium.com.
User Sessionization
Our sessionization use-case is inspired by Spotify. When analyzing user sessions at Spotify, you can compute many basic statistics like: duration of the session, a number of songs, skips, and interruptions as well as more advanced things like the current mood of the user and so on.
User Sessionization — when and why?
You can do at least three types of things with the output of your session analytics jobs.
First, you can visualize KPIs on dashboards. For example, you can show how long users listen to a new episode of Discover Weekly playlist (personalized playlist with a song propositions depending on the songs heard and current recommendations) i.e.how many consecutive songs they listen to or skip.
Secondly, using the stream analytics functionalities, we can spot current users behaviours using multiple metrics and react accordingly. For example, if Australian users don’t listen to Discover Weekly as long as usual on Monday morning, we can quickly trigger an alert to Spotify. Perhaps, we will be able to identify a problem before European or American users wake up and the negative incident can be properly managed.
Third, we can also use the insight from the current sessions to recommend even better songs and ads based on what a user listens to or feels right now.
Of course, the above use-cases can be achieved by classic batch processing, with hourly or daily jobs. But obviously, we can get more value out of our data if we process it real-time with low latency.
Classic Batch Architecture
Many companies use technologies like Kafka, Hadoop, Spark, and Oozie to analyze user sessions.
How do they do it?
At first, a system needs to gather the data from the users — events that represent users activity are continuously sent to Kafka in real-time. Next, we use a batch tool like Camus to copy events from Kafka to HDFS (Hadoop system) periodically, let’s say each hour. As a next step, we use a technology like Spark to run batch jobs to group individual user events into user sessions. A single user session at Spotify can last many hours, e.g. when a user is listening to music at work. This makes the process of building complete and correct user sessions challenging. Events from the same user session can be located in many hourly buckets and you don’t know them upfront.
Sessions after midnight
To mitigate this problem, our Spark job can run daily (e.g. at midnight) and process the last 24 hourly buckets to generate a dataset with user sessions for a particular day. This approach allows you to build many complete and correct user sessions. However, there will be a problem with the sessions that start before and end after midnight. Because Spotify is a global product, there will be many of such sessions since users listen to music all the time and everywhere.
With the alternative approach, our Spark job can run hourly but each time it will combine intermediate data about active sessions from past hour to generate sessions that ended at a particular hour. Keeping such an intermediate data (state) about the in-flight session is a non-trivial task, though.
Regardless from the approach we follow, in the logic of our Spark job, we must group events by user, sort them by a timestamp, remove duplicates and what’s most important, assign the same session ID to events that occur close enough to each other.
Drawbacks of Classic Batch
First of all, it has many moving parts. You need to decide how to segregate events hourly or daily in HDFS, learn tools like Camus for ingestion and Oozie for scheduling. You need to write a lot of gluing code to integrate these technologies together into a single pipeline. Then you need to monitor this pipeline and make sure that each component is up and running.
Even if your pipeline don’t fail and finish successfully, it will take hours to complete and process the data. Such a time gap makes the value of this data lower and you can use it only for making reactive and historical analyses. If you want to make actionable or predictive decisions (i.e. triggering instant alerts or updating music recommendations), then you need to generate session datasets with much lower latency.
Micro-Batch Architecture
The easiest way to decrease latency and shorten the feedback loop is to use Spark Streaming.
With continuously running Spark Streaming job, the infrastructure is much simpler, because you no longer need Camus, Oozie or partition data in HDFS. You can generate results faster by configuring your Spark Streaming job to process all new events in small batches created each hour or each 10 minutes or even each minute.
Sessionization is actually not natively supported in Spark Streaming (SPARK-10816) and is far from being easy in general case. However, it’s feasible with a few tricks and a custom code. A specific code is needed because Spark Streaming internally divides your continuous stream of events into separate micro-batches. Because a single user session can span multiple of micro-batches, you need to implement own custom code to build the user session from the events that belong to many micro-batches.
This can be achieved with the mapWithState method that maintains internal session state for each user across batches. There are a few blog posts that explain how to implement it and describe its pros and cons — you can find them here. Please note that these blog posts focus only on how to build a user session, but they don’t describe other problems that can happen that we describe below.
Reality of Event Stream
Problems can happen when events arrive late due to such problems like network connectivity issues. With Spotify, you can listen to music in offline mode when the songs stored locally on your device will be played. When you are offline, e.g. because you take a flight to Warsaw, the events of your music session are cached locally on your phone by the Spotify client. They will not be sent to Kafka yet as it requires internet connection. Therefore all previously buffered events will be routed to Kafka once user is back online. The events happen to be included in the same micro-batch as new events that are generated in the online mode afterwards. If your processing logic doesn’t somehow differentiate the original event time (generated offline) from the current time (generated online), then you will get incorrect results because old and new events will be considered equally will be processed in the same micro-batch.
Data Out of Order
The variation of this problem can happen when a user changes devices. Assume that you fly to Warsaw again. You listen to music in the offline mode on your laptop because you want to save your phone battery for later. These events are again buffered locally and can’t be sent to Kafka. When the fight ends, you shutdown your laptop, try to get out of the airport and you order a taxi. While driving home, you listen to music on your mobile phone in the online mode and these mobile events are sent to Kafka immediately. However, when you arrive home and relax on your sofa, you turn on your laptop and start listening to music from it. This time the laptop connects to the Internet successfully, so all previously buffered desktop events are sent to Kafka now. This means that events for a particular user are sent to Kafka out of order. First events from a mobile, then events from a laptop but the reality was obviously different. Again, if we don’t handle this scenario in our processing logic, then we will get incorrect results.
As you see there are a few serious problems to solve. Some of them are caused by classic batch technologies. That forced us to recognize data not as a continuous stream of events, but as files in HDFS or micro-batches that are processed periodically. What’s more, due to lack of functionalities to handle late or out-of-order events, the results of our jobs are incorrect.
Solving a Streaming Problem
At GetInData we do believe that modern stream processing engines allow us to process data in a simple and correct way.
This can be achieved with Apache Flink. Just like with Spark Streaming we can access Kafka events directly, but this time, with Flink, we process them in their native representation — as a stream instead of batch abstraction. Of course, it does not mean that further processing will be more complicated. Quite the opposite — this means no more data lakes, just a flowing river.
Example Implementation
We’ve implemented user sessionization in Flink to show you how easily all those problems can be resolved with a few lines of code. The use-case that we are solving with this example is to count how long user listens to music in a single session (case A) or how many consecutive songs a user plays from a particular playlist (case B) as shown in the picture below.
The first step of our pipeline is reading events from Kafka. Flink provides a Kafka consumer that, with the help of internal checkpointing mechanism, gives us the power of exactly-once processing. All we need to do is to provide simple connection parameters like topic or Kafka broker address.
sEnv.addSource(new FlinkKafkaConsumer09[Event](conf.topic(),
getSerializationSchema,
kafkaProperties(conf.kafkaBroker()))
)
Next, we have to create user sessions — first by grouping incoming events by userId as a key and then assigning to session windows. It is as simple as specifying a gap between events that constitute a window and all the event-time magic will happen automatically underneath.
.keyBy(_.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
We can complete the most basic example with a computation function that will be applied for each user session window. As a result, we have a nice processing pipeline that handles out of order events that fits into five lines of code. Isn’t it neat?
val sessionStream : DataStream[SessionStats] = sEnv
.addSource(new FlinkKafkaConsumer09[Event](…))
.keyBy(_.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
.apply(new CountSessionStats())
Advanced time handling
We do not stop at the basic example because there is still some room for improvement. For example, we want to be able to handle late events. Flink also comes with a solution for that real-world issue. Let’s assume that we know some events may be late by the maximum amount of time i.e. 60 minutes. We just need to set that parameter in our pipeline. Flink will keep all windows state for that extra period so that when some outdated events come, we can react to them by updating our aggregates.
.allowedLateness(Time.minutes(60))
What else can we ask for? A good idea is to shorten the feedback loop. We can do that by writing an early firing trigger that will emit each window every few minutes with intermediate results.
.trigger(EarlyTriggeringTrigger.every(Time.minutes(10)))
We believe that the above examples shows pretty well that all time-specific issues, even hard ones, become simple and robust with Flink.
Beyond the code
Though the code examples are descriptive, not every aspect of stream processing can be expressed with them. What is not visible from the provided case study is that Flink:
- is still a very efficient engine with both low latency and high throughput
- provides exactly-once stateful processing
- makes operational tasks easier
All those features are highly rated as confirmed by results of recent user survey conducted by our fellow colleagues in Data Artisans here and here.
The most important advice to take
Stream processing is not only about triggering alerts or getting results with low latency.
The stream is often a natural representation of data for many real-world problems. It can be successfully used for implementing ETL pipelines, calculating KPI metrics, powering business reports etc. — in use-cases where many companies have traditionally used batch processing technologies.
With modern processing frameworks like Flink, you can process your data in an easy, accurate and continuous manner.