Stream Analytics platform for a Telco – part 2
The real story of how we built a modern Stream Analytics platform at a telco (part 2)
In the first part, we described the business requirements and showed you the new architecture of the stream analytics platform, with Apache Flink as the main processing engine. In the second part, we’ll describe the challenges we have been struggling with, the lower-level technical decisions we made, and our testing and monitoring environment.
Low-latency and high-throughput performance
Handling more than 160k messages per second on average, and over 300k messages per second at peak, requires careful design from the very beginning.
Of course, it is impossible to foresee every bottleneck, and premature optimisation can easily target the wrong places. Nevertheless, we started with a few assumptions that, in the long run, allowed us to fulfill our performance requirements. The most important one is not to access external systems on a per-event basis. Whenever we need data held in an external system, we stream it into our platform. That way we achieve two things:
- all lookups are local, possibly served straight from memory, so the resulting latency is as small as possible;
- we do not put any additional stress on those external systems. As we mentioned, we do not own those systems and cannot ensure they would handle the load we could generate.
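To make the “stream it in, look it up locally” idea concrete, here is a minimal stdlib-only Python sketch. It is not Flink code. It assumes a hypothetical upstream system that publishes tariff changes per msisdn as a stream. The names LocalEnricher, ReferenceUpdate and tariff are illustrative assumptions. In Flink the local copy would live in operator state, but the principle is the same: the per-event path only touches memory.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Event:
    msisdn: str
    kind: str


@dataclass
class ReferenceUpdate:
    """A change published by a (hypothetical) external system."""
    msisdn: str
    tariff: Optional[str]  # None means the entry was removed


class LocalEnricher:
    """Keeps a local copy of reference data, fed by a change stream."""

    def __init__(self) -> None:
        self._tariffs: Dict[str, str] = {}

    def on_reference_update(self, update: ReferenceUpdate) -> None:
        # Apply the change locally; the external system is never queried.
        if update.tariff is None:
            self._tariffs.pop(update.msisdn, None)
        else:
            self._tariffs[update.msisdn] = update.tariff

    def enrich(self, event: Event) -> dict:
        # Per-event work is a dictionary lookup, so latency stays low and
        # the external system sees no per-event load.
        return {
            "msisdn": event.msisdn,
            "kind": event.kind,
            "tariff": self._tariffs.get(event.msisdn, "unknown"),
        }
```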
As we said previously, one of the requirements was to provide an easy, straightforward way to adjust the running logic. A quite common request is to change the threshold above which an offer should be sent to a subscriber. To allow this, we built a GUI that, under the hood, generates a stream of control messages to be applied to the stream of events. Fortunately, Apache Flink provides all the pieces required to do it.
Our event streams are keyed by the user’s msisdn. Luckily, the business rules we look for are always based on the events of a single user, so we can redistribute events by that key and keep most of the computational state scoped to it. The control stream is a different story: its rules must be applied to each and every key. Therefore, we broadcast that stream to all operators and store the rule definitions in operator state. Thanks to this mix of keyed and operator state, we can dynamically apply new, adjusted rules with no downtime.
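The interplay of keyed state and broadcast rule state can be modelled in a few lines of plain Python. This is a toy model under stated assumptions, not Flink’s API:

- ToyJob and RuleOperator are hypothetical names.
- The rule is simplified to “send an offer when a user’s event count reaches a threshold”.
- Python’s hash() stands in for Flink’s key partitioning.

The model shows that every control message reaches every parallel instance, while each user’s counter lives on exactly one instance.

```python
from collections import defaultdict
from typing import Dict, List


class RuleOperator:
    """One parallel instance of a hypothetical rule-evaluating operator."""

    def __init__(self) -> None:
        # Broadcast (operator) state: every instance holds the full rule set.
        self.thresholds: Dict[str, int] = {}
        # Keyed state: per-msisdn event counters, only for keys routed here.
        self.counts: Dict[str, int] = defaultdict(int)

    def on_control(self, rule_id: str, threshold: int) -> None:
        # A control message adds or replaces a rule; no restart needed.
        self.thresholds[rule_id] = threshold

    def on_event(self, msisdn: str) -> List[str]:
        self.counts[msisdn] += 1
        count = self.counts[msisdn]
        return [f"offer:{rule_id}:{msisdn}"
                for rule_id, threshold in self.thresholds.items()
                if count == threshold]


class ToyJob:
    def __init__(self, parallelism: int) -> None:
        self.instances = [RuleOperator() for _ in range(parallelism)]

    def _route(self, msisdn: str) -> RuleOperator:
        # keyBy: all events of one user land on the same instance.
        # Flink uses its own key-group hashing; hash() is only a stand-in.
        return self.instances[hash(msisdn) % len(self.instances)]

    def control(self, rule_id: str, threshold: int) -> None:
        # Broadcast: every instance receives every control message.
        for instance in self.instances:
            instance.on_control(rule_id, threshold)

    def event(self, msisdn: str) -> List[str]:
        return self._route(msisdn).on_event(msisdn)
```

In Flink itself, the corresponding pieces are keyBy on the event stream and broadcast on the control stream, with the two streams connected. The exact operator you would use depends on the Flink version.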
React to schema changes
When approached badly, big data solutions may not bring any value to a company for quite some time. Therefore, at Getindata, we always start with a use case in mind and implement it as fast as possible, so that it starts producing a return on investment early. You can read more about our approach, called “Lean Big Data”, here. The downside is that many things can change along the way.
One of the things that changes most often is the data itself. This happens either because we did not understand it as well as we would have liked at the very beginning, or simply because it was changed or extended externally. Whatever the reason, we should be able to adjust seamlessly. The approach we chose is to store all our data in the Avro format and publish the schemas in a schema registry. This way, we can keep data written with different schema versions within the same Kafka topic. Moreover, it allows us to migrate Flink’s state. We achieved all of that with arguably little effort: we wrote a few serializers that look up schemas in the Hortonworks Schema Registry.
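The sketch below illustrates why a schema registry makes mixed schema versions in one topic workable. Each message carries the id of the schema it was written with. The reader resolves that writer schema against its own schema: it fills defaults for newly added fields and drops fields it does not know. This is a simplified version of Avro’s schema resolution rules.

Everything here is an assumption made for illustration:

- the 4-byte schema id header;
- the in-memory REGISTRY;
- JSON payloads, used instead of Avro binary to stay stdlib-only.

Real Avro binary is not self-describing, which is exactly why the writer schema has to be looked up. The Hortonworks Schema Registry serializers also define their own wire format.

```python
import json
import struct
from typing import Any, Dict

NO_DEFAULT = object()  # marker: the field has no default value

# Hypothetical in-memory registry: schema id -> {field name: default}.
REGISTRY: Dict[int, Dict[str, Any]] = {
    1: {"msisdn": NO_DEFAULT, "bytes_used": NO_DEFAULT},
    2: {"msisdn": NO_DEFAULT, "bytes_used": NO_DEFAULT, "roaming": False},
}


def serialize(schema_id: int, record: Dict[str, Any]) -> bytes:
    # Prefix the payload with the writer schema id so readers can find it.
    payload = {name: record[name] for name in REGISTRY[schema_id]}
    return struct.pack(">I", schema_id) + json.dumps(payload).encode("utf-8")


def deserialize(data: bytes, reader_id: int) -> Dict[str, Any]:
    (writer_id,) = struct.unpack(">I", data[:4])
    writer = REGISTRY[writer_id]
    values = json.loads(data[4:].decode("utf-8"))
    result: Dict[str, Any] = {}
    for name, default in REGISTRY[reader_id].items():
        if name in writer:
            result[name] = values[name]
        elif default is not NO_DEFAULT:
            result[name] = default  # field added later: use its default
        else:
            raise ValueError(f"field {name!r} missing and has no default")
    # Writer fields unknown to the reader are silently dropped.
    return result
```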
Slowly changing streams
The need for event-time processing was recognised quite some time ago. We highly recommend reading the great post by Tyler Akidau (“The world beyond batch”), which brilliantly walks through event-time handling. Unfortunately, it is not enough to extract the timestamp from the event rather than from the system clock. We also need a way of tracking the progress of event time. The mechanism that tracks this progress is called a watermark, which, in short, is a heuristic based on incoming events (again, we encourage you to read the article).
Watermarks are a great mechanism, but it is important to understand them well, because some corner cases can occur. One we faced was joining a slowly changing stream of control messages with a stream of events. An operator with two inputs advances its event time only as far as the smaller of its two input watermarks. So if no data arrives on the slow stream for some time, its watermark does not advance, and the whole processing is held back. The solution is not to take the watermark of that slow stream into account, which we can do by generating a watermark equal to Long.MAX_VALUE for it. Of course, this does not come without a cost: we cannot guarantee a definite global order of those two streams. However, as those rules are applied dynamically, we do not care whether they are applied ten events earlier or later.
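A common watermark heuristic is “bounded out-of-orderness”. It assumes events arrive at most a fixed amount of time out of order, so the watermark trails the highest timestamp seen by that bound. The following is a minimal Python sketch of that heuristic. The class name and the 5-second bound used in the tests are illustrative assumptions; Flink ships ready-made generators of this kind. An event whose timestamp is at or behind the current watermark counts as late.

```python
LONG_MIN = -(2 ** 63)  # Java's Long.MIN_VALUE: "no progress yet"


class BoundedOutOfOrdernessWatermarks:
    """Watermark = highest event timestamp seen so far minus a fixed bound."""

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self.bound = max_out_of_orderness_ms
        self.max_ts = None  # highest event timestamp observed

    def on_event(self, timestamp_ms: int) -> None:
        # Out-of-order events never move the watermark backwards.
        if self.max_ts is None or timestamp_ms > self.max_ts:
            self.max_ts = timestamp_ms

    def current_watermark(self) -> int:
        if self.max_ts is None:
            return LONG_MIN
        return self.max_ts - self.bound

    def is_late(self, timestamp_ms: int) -> bool:
        # The watermark asserts that no more events at or before it will arrive.
        return timestamp_ms <= self.current_watermark()
```

Note that this watermark only moves when events arrive, which is the root of the corner case discussed below.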
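Both the stall and its fix can be seen in a small model of a two-input operator, whose event time is the minimum of its inputs’ watermarks. The fix is a Long.MAX_VALUE watermark on the control stream. This is plain Python written for illustration; the class and input names are hypothetical.

```python
from typing import Dict

LONG_MIN = -(2 ** 63)    # Java's Long.MIN_VALUE
LONG_MAX = 2 ** 63 - 1   # Java's Long.MAX_VALUE


class TwoInputOperator:
    """Event time of a two-input operator = min of its inputs' watermarks."""

    def __init__(self) -> None:
        self.inputs: Dict[str, int] = {"events": LONG_MIN, "control": LONG_MIN}

    def on_watermark(self, input_name: str, watermark: int) -> None:
        # Watermarks per input only ever move forward.
        self.inputs[input_name] = max(self.inputs[input_name], watermark)

    def current_watermark(self) -> int:
        return min(self.inputs.values())
```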
Let’s talk about testing now, end-to-end tests in particular. Essentially, the purpose of end-to-end testing is to check whether the business requirements are met and to ensure the integrity of all the components involved. At first glance, it seemed to us that testing a distributed platform would be a struggle, for several reasons: the platform consists of many components, we integrate with external subsystems and data sources, the components communicate using different protocols, and such a system is hard to debug. Nonetheless, despite these hardships, we were able to create a testing environment that is easy to use and lets us focus only on business logic while writing test scenarios.
Imagine that we have a preproduction environment with our platform running on it and a test scenario we want to validate.
Since this is a testing environment, none of the external subsystems our platform normally communicates with are available. That is why we implemented fake data sources and data sinks. They not only expose exactly the same interfaces as their production counterparts but also integrate with the testing framework: they allow us to push test events and to query the notifications received. With all the necessary building blocks in place, we could run test scenarios.
A typical execution works as follows.
As the first step, we create and submit a business rule to be validated. Under the hood, the testing framework translates the trigger (that’s what we call a business rule internally) into an Avro control event and pushes it into the control stream in Kafka.
Then, we simulate an action which should activate the trigger. To this end, we push a test event, which is perfectly valid syntactically and semantically, to the fake data source.
Now we wait until the event flows through the platform.
Hopefully, the test event has activated the trigger and a notification was generated. Eventually, the notification reaches the fake outgestion system. Now, in the “then” section, we fetch the relevant notification and validate it against our assertions.
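The given/when/then flow above can be sketched with stdlib Python. All names here (FakeSource, FakeSink, wait_for, run_toy_platform) are hypothetical. A toy in-process worker stands in for the real Flink job, and triggers are passed through a shared list rather than as an Avro control event in Kafka. What the sketch illustrates is the shape of a scenario: submit the rule, push a valid event, then poll the fake sink with a timeout, since the platform processes events asynchronously.

```python
import queue
import threading
import time
from typing import Callable, List, Optional


class FakeSource:
    """Replaces a production data source; tests push events into it."""

    def __init__(self) -> None:
        self.events: "queue.Queue[dict]" = queue.Queue()

    def push(self, event: dict) -> None:
        self.events.put(event)


class FakeSink:
    """Replaces the outgestion system; tests query what it received."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._received: List[dict] = []

    def deliver(self, notification: dict) -> None:
        with self._lock:
            self._received.append(notification)

    def find(self, predicate: Callable[[dict], bool]) -> Optional[dict]:
        with self._lock:
            return next((n for n in self._received if predicate(n)), None)


def wait_for(sink: FakeSink, predicate: Callable[[dict], bool],
             timeout_s: float = 5.0, poll_s: float = 0.05) -> dict:
    # Processing is asynchronous: poll until a match appears or time runs out.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        found = sink.find(predicate)
        if found is not None:
            return found
        time.sleep(poll_s)
    raise TimeoutError("no matching notification before timeout")


def run_toy_platform(source: FakeSource, sink: FakeSink,
                     triggers: List[dict], stop: threading.Event) -> None:
    # Toy stand-in for the Flink job: notify when a value reaches a threshold.
    while not stop.is_set():
        try:
            event = source.events.get(timeout=0.05)
        except queue.Empty:
            continue
        for trigger in list(triggers):
            if event["value"] >= trigger["threshold"]:
                sink.deliver({"msisdn": event["msisdn"], "trigger": trigger["name"]})


def scenario_offer_sent_when_threshold_reached() -> None:
    source, sink, triggers = FakeSource(), FakeSink(), []
    stop = threading.Event()
    worker = threading.Thread(target=run_toy_platform,
                              args=(source, sink, triggers, stop), daemon=True)
    worker.start()
    try:
        # given: a business rule (trigger) is submitted
        triggers.append({"name": "heavy-data-user", "threshold": 100})
        # when: a valid event that should activate it arrives
        source.push({"msisdn": "77010000001", "value": 150})
        # then: the matching notification reaches the fake sink
        note = wait_for(sink, lambda n: n["msisdn"] == "77010000001")
        assert note["trigger"] == "heavy-data-user"
    finally:
        stop.set()
        worker.join()
```

Keeping the wait explicit, with a timeout, makes such asynchronous tests fail fast instead of hanging.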
Finally, it is worth saying a few words about the watermark progress problem that occurred in our testing environment. In the Slowly changing streams section, we mentioned a related effect. If there is not enough data flowing through the system (and events generated by test scenarios are definitely insufficient), the watermark in a Flink job does not advance in a predictable way, and hence tests fail. To address the problem, we had to generate artificial traffic in the testing environment. To this end, we created test event generators, one for each data source, whose only goal is to keep the watermark progressing constantly. The test generators are just simple pipelines in Apache Nifi (yet another application of Nifi in our platform).
Unfortunately, not every aspect can be covered with tests. Therefore, to ensure the platform works properly, we need yet another tool: a monitoring system.
We collect metrics into InfluxDB, a database dedicated to time series. For metrics visualization we use Grafana, which also serves as our alerting tool. Similarly, we collect logs from all the platform components for debugging purposes. To this end, we introduced the widely known Elasticsearch + Logstash + Kibana stack.
We believe it is worth building a monitoring platform from the very beginning, since it gives you a lot of insight into your platform. But which aspects of a stream processing platform are worth monitoring, and which ones should we focus on first?
Let’s run through several important metrics.
- Incoming messages rate. When the rate is lower than usual, it indicates problems with either a processing job or a data source. With some historical data, one can easily notice regularities in the traffic and define alerting rules accordingly.
- Failed and late events. In practice, it may turn out that our initial assumptions about data sources are inaccurate or incomplete. Firstly, some events may fail to be processed, which suggests there are corner cases that are not handled properly. Secondly, some events may be late, which means the adopted time restrictions are too tight. This way, for instance, we discovered that every day at midnight a huge amount of billing data was dropped because the allowed event lateness was exceeded.
- Kafka consumer lag. Generally speaking, the lower the latency, the better. Therefore, to ensure we consume the data in a timely manner, one should monitor Kafka consumer lag, that is, how far the consumer’s position is behind the latest offset in each partition.
- Watermark progress. While Kafka consumer lag shows whether we consume the data as soon as it arrives, the currentLowWatermark metric shows whether we process the data smoothly and with an acceptable delay. Each task in Flink exposes a metric called currentLowWatermark that represents the lowest watermark received by this task, which effectively shows the current event time within the task.
- Checkpointing success rate. To make state fault tolerant, Flink periodically checkpoints it. Checkpoints allow Flink to recover the state and the positions in the streams after a program failure. Therefore, we should ensure that checkpoints are created successfully, so that in case of any failure we are confident we can recover quickly and prevent data loss.
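Several of the checks above can be expressed as small functions. This is a hedged Python sketch, not the project’s actual alerting code. The function names, the 50% ratio, and the idea of comparing against the same time slot on previous days are illustrative assumptions. In practice the inputs come from Kafka and Flink metrics, and the alert rules live in Grafana.

```python
from typing import Dict, Iterable, Tuple


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> int:
    """Total lag over partitions: latest offset minus the consumer's position."""
    return sum(end_offsets[p] - committed[p] for p in end_offsets)


def rate_too_low(current: float, same_slot_history: Iterable[float],
                 min_ratio: float = 0.5) -> bool:
    """Alert if the rate falls below a fraction of its historical average.

    same_slot_history holds past rates for the same time slot (for example,
    the same hour on previous days), which captures daily regularities.
    """
    past = list(same_slot_history)
    baseline = sum(past) / len(past)
    return current < min_ratio * baseline


def late_events(arrivals: Iterable[Tuple[int, int]]) -> int:
    """Count events that were at or behind the watermark when they arrived.

    Each arrival is (event_timestamp_ms, watermark_ms_at_arrival).
    """
    return sum(1 for ts, watermark in arrivals if ts <= watermark)


def checkpoint_success_rate(completed: int, failed: int) -> float:
    total = completed + failed
    return 1.0 if total == 0 else completed / total
```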
Logs are crucial when it comes to tracking down system failures and bugs. However, log analysis becomes problematic when the issue you want to identify spans multiple components of a distributed system. In this case, a centralized logging platform is extremely helpful. That is why we introduced the ELK stack, which allows us to easily search through all the logs in one place.
Logback Mapped Diagnostic Context
What is more, to identify the root cause of a bug, one should be able to analyze the sequence of events that led to the unexpected outcome. To this end, we need to collect low-level, detailed logs that show how events flow through the system and how the system state is updated upon each event. However, we want to collect detailed logs only for a small, predefined set of (test) users. Otherwise, we would end up with an overwhelming amount of logs. To filter logs by user id, we used the Logback Mapped Diagnostic Context, and the usage is extremely simple. If one needs to update the list of observed users, only the logging configuration file has to be changed. In addition, the configuration can be reloaded without restarting the application.
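Logback itself is Java, so the following is only a Python analogue of the same pattern. A context variable plays the role of the MDC, and a logging.Filter plays the role of the Logback filter configuration. ObservedUsersFilter, handle_event and the msisdn key are hypothetical.

In Logback, one possible setup works in three steps:

- put the msisdn into the MDC with SLF4J’s MDC.put;
- use a turbo filter such as DynamicThresholdFilter to lower the threshold to DEBUG for the listed values;
- set scan="true" on the configuration element so that changes are picked up without a restart.

This is one possible configuration, not necessarily the one used in the project.

```python
import contextvars
import logging
from typing import Iterable

# Analogue of an MDC entry: the msisdn of the event currently being handled.
current_msisdn = contextvars.ContextVar("msisdn", default="-")


class ObservedUsersFilter(logging.Filter):
    """Passes DEBUG records only for observed users; INFO and above always pass."""

    def __init__(self, observed: Iterable[str]) -> None:
        super().__init__()
        # In practice this set would come from configuration.
        self.observed = set(observed)

    def filter(self, record: logging.LogRecord) -> bool:
        record.msisdn = current_msisdn.get()  # make it available to formatters
        return record.levelno >= logging.INFO or record.msisdn in self.observed


def handle_event(msisdn: str, logger: logging.Logger) -> None:
    token = current_msisdn.set(msisdn)  # roughly MDC.put("msisdn", msisdn)
    try:
        logger.debug("state updated for event")
    finally:
        current_msisdn.reset(token)  # roughly MDC.remove("msisdn")
```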
While this post became very lengthy, I still feel we have just scratched the surface of all the interesting techniques and knowledge we applied in this project. We completed this project phase, which was about building a high-quality, high-performance and reliable stream processing platform from scratch … in less than 6 months. In fact, the first use cases were deployed in production after just 4 months, bringing tangible results.
First of all, our goal was to build a business rules engine. It was not to build everything else as well, such as a data lake or a data science and machine learning workbench, nor to solve 1000 other, also important, issues of the company at the same time. Of course, we are now going to add these functions incrementally, because the platform architecture allows us to extend it seamlessly. The client (and we!) can’t wait to implement use cases like next-generation BI and reporting, Customer 360 view, CAPEX and OPEX optimisations, predictive maintenance, revenue assurance, data monetisation and many more.
Secondly, we also attribute the project’s success to the quality of the system, which we achieved by using the right mix of technologies and practices. Using Flink, Kafka, and Nifi allowed us to build a platform that was ready for large-scale production use from day 1. Flink gives us the power to express accurate real-time analytics, Nifi allowed us to define the dataflows quickly, and Kafka decoupled all systems so that producers and consumers are independent. That is invaluable for ease of development and maintenance.
These technologies and their HA capabilities, combined with the practices of code review, devops, monitoring and testing, gave us confidence in the quality and resiliency of the solution. That makes us believe we won’t be called at night to fix the system.
Stay in touch!
If you would like to know more about the project, write to us at email@example.com. If you would like to solve a similar problem, we would be glad to help you too.
And do not forget to sign up for our newsletter, so as not to miss any of the interesting posts we are going to write.
Big thanks to Alexey Brodovshuk, a senior software engineer and technical lead at Kcell, for creating the beautiful images and diagrams used in these two blog posts.