My experience with Apache Flink for Complex Event Processing

My goal is to create a comprehensive review of available options when dealing with Complex Event Processing using Apache Flink. We will be building a simple proof-of-concept solution for an example use case.

The codebase for the examples is available on GitHub.


Complex Event Processing (CEP)

The term "complex event processing" covers methods of analyzing patterns and relationships between streamed events. When done in real time, it can feed advanced insights further into the data processing system.

Complex event processing has found widespread use in numerous industries, with the financial sector, IoT and telco to name a few. Those uses include real-time marketing, fraud and anomaly detection, and a variety of other business automation cases.


Flink is a promising framework for tackling complex event processing. It supports low-latency stream processing. I recommend the Flink docs as the best way to learn more about the project: they're very well written and cover both high-level concepts and concrete API calls.

Complex events may be processed in Flink using several different approaches, three of which I'll cover in this article:

  • ProcessFunction
  • FlinkCEP
  • Flink SQL MATCH_RECOGNIZE

I'll be using Scala as a programming language of choice.

Use case

I believe that the best way to learn the strengths and weaknesses of each approach is to show them in action. Therefore, I created an example use case inspired by the real business needs of one of our Telco clients. We had to solve this case on the scale of hundreds of thousands of events per second and over ten million unique subscribers.


Let's say a telco company would like to make use of its billing data to make advertising decisions to its pre-paid customers. The billing data structure would be as follows:

  • client id (presumably his phone number)
  • event timestamp
  • client's pre-paid balance before the event
  • client's pre-paid balance after the event

The company currently sends an alert to those customers whose balance approaches 0$. The threshold below which the alarm triggers is set at 10$. The company would like to know how customers react to the alarm trigger. It assumes that if a customer tops up their account within one hour of the alarm trigger, the top-up is a reaction sparked by that alarm.

Thus, the event pattern we would like to capture would look like this:

  • event with balance before value >= 10$ and balance after value < 10$ (alarm trigger)
  • zero or more events with balance after <= balance before (non-top-up actions)
  • event with balance after > balance before (top-up action), happening within one hour from the first event

So, we can define input and output structures like this:

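    case class BillingEvent(id: String,
                            datetime: String,
                            balanceBefore: Long,
                            balanceAfter: Long) {
      val dateFormat = "yyyy-MM-dd HH:mm:ss"
    }

    object BillingEvent {
      // Parses one CSV line. The listing is truncated here; the column order
      // (id, datetime, balanceBefore, balanceAfter) is assumed to mirror the case class fields.
      def apply(line: String): BillingEvent = {
        val splitLine: Array[String] = line.split(",")
        BillingEvent(splitLine(0), splitLine(1), splitLine(2).toLong, splitLine(3).toLong)
      }
    }

    case class AlertReactionEvent(id: String,
                                  alarmTriggerDatetime: String,
                                  topupDatetime: String)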
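
Event-time processing in Flink works on timestamps in epoch milliseconds, so the datetime string has to be converted at some point. Below is a minimal sketch of such a conversion in plain Scala, using the same "yyyy-MM-dd HH:mm:ss" format as BillingEvent. The EventTimeSketch object and the toEpochMillis helper are hypothetical names, and interpreting the string as UTC is my assumption; it is not necessarily how the original codebase does it.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object EventTimeSketch {
  // Same pattern as BillingEvent.dateFormat
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Converts a datetime string to epoch milliseconds.
  // Assumption: the string carries no zone information and is interpreted as UTC.
  def toEpochMillis(datetime: String): Long =
    LocalDateTime.parse(datetime, formatter).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

With this helper, two billing events one hour apart end up exactly 3 600 000 ms apart, which is the unit the one-hour window is expressed in later on.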

ProcessFunction solution

Stateful stream processing is the lowest level of abstraction provided by the Flink API, which makes it suitable for custom tasks not covered by higher-level APIs. An example solution using ProcessFunction looks as follows:

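    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector
    import org.slf4j.LoggerFactory

    case class KeyLastModified(key: String, lastModified: Long)

    class AlertReactionEventProcessor extends KeyedProcessFunction[Tuple, BillingEvent, AlertReactionEvent] {

      // Not shown in the listing; assumed here: a one-hour window in milliseconds and an SLF4J logger.
      val withinValue: Long = 60 * 60 * 1000L
      @transient lazy val logger = LoggerFactory.getLogger(classOf[AlertReactionEventProcessor])

      lazy val lastModifiedState: ValueState[KeyLastModified] = getRuntimeContext
        .getState(new ValueStateDescriptor[KeyLastModified]("last_modified_state", classOf[KeyLastModified]))

      lazy val alarmTriggerDatetime: ValueState[String] = getRuntimeContext
        .getState(new ValueStateDescriptor[String]("alarm_trigger_datetime", classOf[String]))

      lazy val alerted: ValueState[Boolean] = getRuntimeContext
        .getState(new ValueStateDescriptor[Boolean]("alerted", classOf[Boolean]))

      override def processElement(value: BillingEvent,
                                  ctx: KeyedProcessFunction[Tuple, BillingEvent, AlertReactionEvent]#Context,
                                  out: Collector[AlertReactionEvent]): Unit = {

        val current: KeyLastModified = lastModifiedState.value match {
          case null =>
            KeyLastModified(value.id, ctx.timestamp)
          case KeyLastModified(key, lastModified) =>
            KeyLastModified(key, ctx.timestamp)
        }

        // The listing is truncated inside the branches below; the state updates are a reconstruction.
        if (value.balanceBefore >= 10 && value.balanceAfter < 10) {
          // Alarm trigger: remember it and start the one-hour window.
          lastModifiedState.update(current)
          alarmTriggerDatetime.update(value.datetime)
          alerted.update(true)
          ctx.timerService.registerEventTimeTimer(current.lastModified + withinValue)
        }
        if (alerted.value() && ctx.timestamp >= lastModifiedState.value.lastModified + withinValue) {
          // The window has passed without a top-up: drop the pending alert.
          logger.info(f"Time up for ${value.id} , ${alarmTriggerDatetime.value()}")
          alerted.update(false)
        }
        if (value.balanceBefore < value.balanceAfter && alerted.value()) {
          // Top-up while an alert is pending: emit the reaction and reset.
          out.collect(AlertReactionEvent(value.id, alarmTriggerDatetime.value(), value.datetime))
          alerted.update(false)
        }
      }
    }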

As we can see, Flink's ProcessFunction API is a powerful tool that can accurately capture patterns of any complexity. However, it requires heavy use of state variables. Moreover, real-world stream processing has to deal with out-of-order events, and at this API level we would have to keep track of the order ourselves, for example with a priority queue. The following solutions provide a more streamlined way of thinking about complex patterns, resulting in better clarity, as well as built-in sorting to support out-of-order events.
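
To give an idea of the extra bookkeeping involved, here is a minimal sketch of such a priority queue in plain Scala, without any Flink dependency. The names ReorderBufferSketch, ReorderBuffer and drainUpTo are hypothetical, and the watermark is passed in as a plain number; in a real ProcessFunction the buffer would live in Flink state and be drained from a timer callback.

```scala
import scala.collection.mutable

object ReorderBufferSketch {
  // Simplified event: only the key and the event-time timestamp matter here.
  final case class Event(id: String, timestamp: Long)

  // PriorityQueue is a max-heap, so the ordering is reversed to get the oldest event first.
  private val oldestFirst: Ordering[Event] = Ordering.by[Event, Long](_.timestamp).reverse

  final class ReorderBuffer {
    private val queue = mutable.PriorityQueue.empty[Event](oldestFirst)

    // Buffers an event regardless of its arrival order.
    def add(e: Event): Unit = queue.enqueue(e)

    // Releases, in timestamp order, every buffered event not newer than the watermark.
    def drainUpTo(watermark: Long): List[Event] = {
      val out = mutable.ListBuffer.empty[Event]
      while (queue.nonEmpty && queue.head.timestamp <= watermark) out += queue.dequeue()
      out.toList
    }
  }
}
```

Events added with timestamps 30, 10 and 20 come back as 10 and 20 once the watermark reaches 20, while the event at 30 stays buffered until the watermark catches up.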

FlinkCEP solution

FlinkCEP is a library that allows complex patterns to be defined using its higher-level Pattern API. Internally, the pattern is abstracted to a non-deterministic finite automaton. We can, therefore, draw similarities between event pattern matching and regular expression matching.

To illustrate, let's define the alarm trigger event as event A, the following zero or more non-top-up actions as event B, and finally the top-up action as event C. Then, the abstracted target event pattern will be A B* C. The FlinkCEP equivalent will look roughly like this:

    Pattern
        .begin("A").where(/* conditions */)
        .next("B").oneOrMore().optional().where(/* conditions */)
        .next("C").where(/* conditions */)
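
The regular expression analogy can be taken quite literally. In the minimal sketch below, which is plain Scala and does not use FlinkCEP, each event is encoded as one letter and the resulting string is matched against the regex AB*C. The names RegexAnalogySketch, encode and findMatches are hypothetical, and the one-hour time constraint is deliberately left out.

```scala
object RegexAnalogySketch {
  // Encodes a (balanceBefore, balanceAfter) pair as a single letter.
  // The alarm trigger is checked first because it also satisfies the non-top-up condition.
  def encode(before: Long, after: Long): Char =
    if (before >= 10 && after < 10) 'A' // alarm trigger
    else if (after > before) 'C'        // top-up
    else 'B'                            // non-top-up

  private val pattern = "AB*C".r

  // Returns every non-overlapping match of A B* C, scanning left to right.
  def findMatches(events: Seq[(Long, Long)]): List[String] = {
    val encoded = events.map { case (before, after) => encode(before, after) }.mkString
    pattern.findAllIn(encoded).toList
  }
}
```

For one client's balance history of (20, 15), (15, 8), (8, 5), (5, 3), (3, 30), (30, 25), the encoded string is BABBCB and the only match is ABBC: the alarm trigger, two further charges and the top-up.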

Matched patterns can be processed with PatternProcessFunction, the Pattern API equivalent of ProcessFunction, which takes whole matched event chains as input.

FlinkCEP implementation is shown below:

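    // Needs Pattern, SimpleCondition, AfterMatchSkipStrategy, CEP, PatternStream and PatternProcessFunction
    // from the org.apache.flink.cep packages, Time from org.apache.flink.streaming.api.windowing.time, and java.util.
    val pattern = Pattern.begin[BillingEvent]("A", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(new SimpleCondition[BillingEvent]() {
        override def filter(value: BillingEvent): Boolean =
          value.balanceBefore >= 10 && value.balanceAfter < 10
      })
      .next("B").oneOrMore().optional().where(new SimpleCondition[BillingEvent]() {
        override def filter(value: BillingEvent): Boolean =
          value.balanceBefore >= value.balanceAfter
      })
      .next("C").where(new SimpleCondition[BillingEvent]() {
        override def filter(value: BillingEvent): Boolean =
          value.balanceBefore < value.balanceAfter
      })
      // One-hour limit from the use case; the listing is truncated, so this call is assumed.
      .within(Time.hours(1))

    val patternStream: PatternStream[BillingEvent] = CEP.pattern(keyedBillings.javaStream, pattern)
    val result: SingleOutputStreamOperator[AlertReactionEvent] = patternStream.process(
      new PatternProcessFunction[BillingEvent, AlertReactionEvent]() {
        override def processMatch(
                                   patternMatch: util.Map[String, util.List[BillingEvent]],
                                   ctx: PatternProcessFunction.Context,
                                   out: Collector[AlertReactionEvent]): Unit = {
          // Reconstructed body: pair the alarm trigger (A) with the top-up (C).
          val alarm = patternMatch.get("A").get(0)
          val topUp = patternMatch.get("C").get(0)
          out.collect(AlertReactionEvent(alarm.id, alarm.datetime, topUp.datetime))
        }
      })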

FlinkCEP's strength is an intuitive way of modeling cause-and-effect relations between events. With an interface better suited to the job than that of ProcessFunction, FlinkCEP-powered applications will be easier to maintain and scale.


Flink SQL MATCH_RECOGNIZE solution

In December 2016, the SQL standard was enriched with the MATCH_RECOGNIZE clause to make pattern recognition with SQL possible. Flink support for the MATCH_RECOGNIZE clause was added in version 1.7, following the FLIP-20 proposal. Under the hood, MATCH_RECOGNIZE is implemented using FlinkCEP. We will codify our event phases exactly as in the FlinkCEP section.

The declarative way of approaching our problem results in the following code:

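    // The PATTERN line is missing from the listing; it is restored here from the
    // A B* C pattern and the one-hour limit described in the text.
    val result: Table = tableEnv.sqlQuery(
      s"""
        | SELECT *
        | FROM $billingsTable
        |   MATCH_RECOGNIZE (
        |     PARTITION BY id
        |     ORDER BY user_action_time
        |     MEASURES
        |       A.datetime AS alarm_trigger_datetime,
        |       C.datetime AS topup_datetime
        |     ONE ROW PER MATCH
        |     PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
        |     DEFINE
        |       A AS A.balanceBefore >= 10 AND A.balanceAfter < 10,
        |       B AS B.balanceBefore >= B.balanceAfter,
        |       C AS C.balanceBefore < C.balanceAfter
        |   ) t
        |""".stripMargin)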


The SQL option arguably provides the best clarity and brevity of them all. Moreover, this approach allows for better mutual understanding between data engineers and data analysts, as SQL is commonly used by both groups.

It's not all roses, though

While the MATCH_RECOGNIZE approach may look universally optimal, there are still some problems due to how new the feature is.

What Flink SQL MATCH_RECOGNIZE can't do (yet)

Flink still lacks full support for the MATCH_RECOGNIZE standard. The Flink documentation lists some of the missing features. Depending on your use case, this may end up not being a problem, or it may require working your way around the unsupported clause with additional code. Hopefully, over time those holes in the standard will be plugged by the Flink development team.

What the standard itself can't do (yet?)

While the holes may eventually be plugged in the Flink implementation, the current version of the MATCH_RECOGNIZE standard may still not be enough to cover all CEP use cases. Notably, as it stands, the standard lacks a dedicated way to deal with time constraints.

Currently, the Flink implementation handles simple time constraints with the non-standard WITHIN clause, which is helpful in a lot of cases. However, there are still some gaps.

Unruly case

In the case we implemented for our client, the requirement was to also detect the absence of an event. That turned out to be impossible using the MATCH_RECOGNIZE clause.

Let's say that our company, in addition to "top-up within one hour" events, would like to track those customers who do not top up in the appointed time. Therefore, after an hour without a top-up, we would like to generate an appropriate event. Unfortunately, neither the current SQL standard nor the Flink SQL implementation offers a way to define how to handle such "absence of the top-up event in one hour" cases.
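
To make the requirement concrete, here is a minimal sketch in plain Scala of the decision we want for a single client, given that client's events already sorted by time. The names AbsenceSketch, Outcome, Reacted, NoReaction and firstOutcome are hypothetical. Note that this is a batch-style simplification: on a live stream, the "no reaction" outcome can only be emitted once event time has moved past the end of the one-hour window.

```scala
object AbsenceSketch {
  final case class Event(timestamp: Long, balanceBefore: Long, balanceAfter: Long)

  sealed trait Outcome
  final case class Reacted(alarmAt: Long, topUpAt: Long) extends Outcome
  final case class NoReaction(alarmAt: Long) extends Outcome

  val OneHourMs: Long = 60 * 60 * 1000L

  private def isAlarm(e: Event): Boolean = e.balanceBefore >= 10 && e.balanceAfter < 10
  private def isTopUp(e: Event): Boolean = e.balanceAfter > e.balanceBefore

  // Expects one client's events sorted by timestamp.
  // Returns the outcome for the first alarm trigger, or None if there was no alarm at all.
  def firstOutcome(events: List[Event]): Option[Outcome] = {
    val fromAlarm = events.dropWhile(e => !isAlarm(e))
    fromAlarm.headOption.map { alarm =>
      fromAlarm.tail
        .takeWhile(_.timestamp <= alarm.timestamp + OneHourMs) // only events inside the window
        .find(isTopUp)
        .map(topUp => Reacted(alarm.timestamp, topUp.timestamp): Outcome)
        .getOrElse(NoReaction(alarm.timestamp))
    }
  }
}
```

A top-up shortly after the alarm yields Reacted, a top-up that arrives after more than an hour yields NoReaction, and a history without any alarm trigger yields nothing.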

A way out, which seems most in line with MATCH_RECOGNIZE event handling, is to implement the pattern using FlinkCEP, which in addition to the within functionality provides the processTimedOutMatch function, allowing timed-out partial matches to be passed to a side output.

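    val patternStream: PatternStream[BillingEvent] = CEP.pattern(keyedBillings.javaStream, pattern)

    val sideOutputTag = OutputTag[AlertReactionEvent]("absence-of-topup-event")
    val result: SingleOutputStreamOperator[AlertReactionEvent] = patternStream.process(
      new PatternProcessFunction[BillingEvent, AlertReactionEvent]() with TimedOutPartialMatchHandler[BillingEvent] {
        override def processMatch(
                                   patternMatch: util.Map[String, util.List[BillingEvent]],
                                   ctx: PatternProcessFunction.Context,
                                   out: Collector[AlertReactionEvent]): Unit = {
          val (alarm, topUp) = (patternMatch.get("A").get(0), patternMatch.get("C").get(0)) // reconstructed body
          out.collect(AlertReactionEvent(alarm.id, alarm.datetime, topUp.datetime))
        }

        override def processTimedOutMatch(patternMatch: util.Map[String, util.List[BillingEvent]],
                                          ctx: PatternProcessFunction.Context): Unit = {
          // Reconstructed body: no top-up arrived in time, so topupDatetime is left empty (an assumption).
          val alarm = patternMatch.get("A").get(0)
          ctx.output(sideOutputTag, AlertReactionEvent(alarm.id, alarm.datetime, ""))
        }
      })

    // Timed-out partial matches are read back from the side output.
    val timedOut = result.getSideOutput(sideOutputTag)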



Flink provides a variety of ways of handling complex event processing. Each has its merits:

  • FlinkCEP is the more versatile approach
  • Flink SQL MATCH_RECOGNIZE is the more expressive one
  • ProcessFunction is an everything-goes backup for highly non-standard transformations.

Which one is best depends on the specific use case. We hope that someday most, if not all, complex pattern matching will be possible in a higher-level language like Flink SQL, as this will enable analysts to define business rules themselves, without low-level coding, resulting in increased productivity.

Finally, I'd like to thank Krzysztof Zarzycki and the whole GetInData team for their invaluable support. None of this would have been possible without their help :)


Would you like to know more about Complex Event Processing? Read this article: "Personalised offers? Business process automation? Anomaly detection? What about real-time event stream processing to achieve that?"

29 May 2020
