Less data, less problems: Airbyte’s column selection is finally here
The Airbyte 0.50 release has brought some exciting changes to the platform: checkpointing (so that you don’t have to start from scratch in case of…
Read moreMy goal is to create a comprehensive review of available options when dealing with Complex Event Processing using Apache Flink. We will be building a simple proof-of-concept solution for an example use case.
The codebase for examples is provided at GitHub.
The term "complex event processing" defines methods of analyzing pattern relationships between streamed events. When done in real-time, it can provide advanced insights further into the data processing system.
There are numerous industries in which complex event processing has found widespread use, financial sector, IoT and Telco to name a few. Those uses include real-time marketing, fraud and anomalies detection and a variety of other business automation cases.
Flink is a promising framework to combat the subject of complex event processing. It supports low-latency stream processing. I recommend Flink docs as the best way to learn more about the project - they're very well written and cover both high-level concepts and concrete API calls.
Complex events may be processed in Flink using several different approaches, three of which I'll cover moving forwards:
I'll be using Scala as a programming language of choice.
I believe that the best way to learn the strengths and weaknesses of each approach is to show them in action. Therefore, I created an example use case inspired by the real business needs of one of our Telco clients. We had to solve this case on the scale of hundreds of thousands of events per second and over ten million unique subscribers.
Let's say a telco company would like to make use of its billing data to make advertising decisions to its pre-paid customers. The billing data structure would be as follows:
The company currently sends alert information to those customers, whose balance approaches 0$. The threshold under which alarm will trigger is set at 10$. The company would like to know customers' reactions to the alarm trigger. It estimates that if the customer tops up his account within one hour from the alarm trigger, this is the reaction sparked by that alarm.
Thus, the event pattern we would like to capture would look like this:
So, we can define input and output structures like this:
case class BillingEvent(id: String,
datetime: String,
balanceBefore: Long,
balanceAfter: Long) {
val dateFormat = "yyyy-MM-dd HH:mm:ss"
}
object BillingEvent {
def apply(line: String): BillingEvent = {
val splitLine: Array[String] = line.split(",")
BillingEvent(
splitLine(0),
splitLine(1),
splitLine(2).toLong,
splitLine(3).toLong
)
}
}
case class AlertReactionEvent(id: String,
alarmTriggerDatetime: String,
topupDatetime: String)
Stateful stream processing is the lowest level of abstraction provided in Flink API. Because of it, it is suitable for the custom tasks not covered by higher-level APIs. An example solution using ProcessFunction
will look as follows:
case class KeyLastModified(key: String, lastModified: Long)
class AlertReactionEventProcessor extends KeyedProcessFunction[Tuple, BillingEvent, AlertReactionEvent] {
lazy val lastModifiedState: ValueState[KeyLastModified] = getRuntimeContext
.getState(new ValueStateDescriptor[KeyLastModified]("last_modified_state", classOf[KeyLastModified]))
lazy val alarmTriggerDatetime: ValueState[String] = getRuntimeContext
.getState(new ValueStateDescriptor[String]("alarm_trigger_datetime", classOf[String]))
lazy val alerted: ValueState[Boolean] = getRuntimeContext
.getState(new ValueStateDescriptor[Boolean]("alerted", classOf[Boolean]))
override def processElement(value: BillingEvent,
ctx: KeyedProcessFunction[Tuple, BillingEvent, AlertReactionEvent]#Context,
out: Collector[AlertReactionEvent]): Unit = {
val current: KeyLastModified = lastModifiedState.value match {
case null =>
KeyLastModified(value.id, ctx.timestamp)
case KeyLastModified(key, lastModified) =>
KeyLastModified(key, ctx.timestamp)
}
if (value.balanceBefore >= 10 && value.balanceAfter < 10) {
lastModifiedState.update(current)
alerted.update(true)
alarmTriggerDatetime.update(value.datetime)
ctx.timerService.registerEventTimeTimer(current.lastModified + withinValue)
}
if(ctx.timestamp >= lastModifiedState.value.lastModified + withinValue){
logger.info(f"Time up for ${value.id} , ${alarmTriggerDatetime.value()}")
alerted.update(false)
alarmTriggerDatetime.update("")
}
if (value.balanceBefore < value.balanceAfter && alerted.value()) {
lastModifiedState.update(current)
out.collect(AlertReactionEvent(value.id, alarmTriggerDatetime.value(), value.datetime))
alerted.update(false)
alarmTriggerDatetime.update("")
}
}
}
As we can see, Flinks ProcessFunction
API is a powerful tool that can accurately capture patterns of any complexity. However, heavy usage of state variables is needed. Moreover, in the real world applications of stream processing, we have to deal with out-of-order events and this API level would then require us to keep track of this order, using for example a priority queue. The following solutions will provide a more streamlined way of thinking about complex patterns, resulting in better clarity, as well as built-in sorting as support for out-of-order events.
FlinkCEP is the library that allows complex patterns definition using its higher level Pattern API. Internally, the pattern is abstracted to a non-deterministic finite automaton. We can, therefore, draw similarities between event pattern matching and regular expression matching.
To illustrate, let's define alarm trigger event as event A, following zero or more non top up actions as event B and finally top up action as C. Then, the abstracted target event pattern will be A B* C
. FlinkCEP equivalent will look roughly like this:
Pattern
.begin("A").where(/* conditions */)
.next("B").oneOrMore().optional().where(/* conditions */)
.next("C").where(/* conditions */)
Matched patterns can be processed with PatternProcessedFunction
, which is ProcessFunction
equivalent in Pattern API, taking whole matched event chains as input.
FlinkCEP implementation is shown below:
val pattern = Pattern.begin[BillingEvent]("A", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition[BillingEvent]() {
override def filter(value: BillingEvent): Boolean = {
value.balanceBefore >= 10 && value.balanceAfter < 10
}
})
.next("B").oneOrMore().optional().where(new SimpleCondition[BillingEvent]() {
override def filter(value: BillingEvent): Boolean = {
value.balanceBefore >= value.balanceAfter
}
})
.next("C").where(new SimpleCondition[BillingEvent]() {
override def filter(value: BillingEvent): Boolean = {
value.balanceBefore < value.balanceAfter
}
})
.within(Time.hours(1))
val patternStream: PatternStream[BillingEvent] = CEP.pattern(keyedBillings.javaStream, pattern)
val result: SingleOutputStreamOperator[AlertReactionEvent] = patternStream.process(
new PatternProcessFunction[BillingEvent, AlertReactionEvent]() {
override def processMatch(
patternMatch: util.Map[String, util.List[BillingEvent]],
ctx: PatternProcessFunction.Context,
out: Collector[AlertReactionEvent]): Unit = {
out.collect(
AlertReactionEvent(
patternMatch.get("A").get(0).id,
patternMatch.get("A").get(0).datetime,
patternMatch.get("C").get(0).datetime
)
)
}
})
Flink CEP strength is an intuitive method of modeling event cause-and-effect relations. With interface more suitable for the job than ProcessFunction
one, FlinkCEP-powered applications will be easier to maintain and scale.
In December 2016, SQL standard (link) was enriched with MATCH_RECOGNIZE clause to make pattern recognition with SQL possible. Flink support for MATCH_RECOGNIZE clause was added in version 1.7, following issue FLIP-20. Under the hood, MATCH_RECOGNIZE is implemented using Flink CEP. We will codify our event phases exactly as in the FlinkCEP section.
Declarative way of approaching our problem results in the following code:
```
val result: Table = tableEnv.sqlQuery(
f"""
| SELECT *
| FROM $billingsTable
| MATCH_RECOGNIZE (
| PARTITION BY id
| ORDER BY user_action_time
| MEASURES
| A.datetime AS alarm_trigger_datetime,
| C.datetime AS topup_datetime
| ONE ROW PER MATCH
| AFTER MATCH SKIP PAST LAST ROW
| PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
| DEFINE
| A AS A.balanceBefore >= 10 AND A.balanceAfter < 10,
| B AS B.balanceBefore >= B.balanceAfter,
| C AS C.balanceBefore < C.balanceAfter
| ) t
|""".stripMargin)
```
SQL option provides arguably the best clarity and brevity of them all. Moreover, this approach allows for better mutual understanding between data engineers and data analysts, as SQL is commonly used by both groups.
While MATCH_RECOGNIZE approach may look universally optimal, there are still some problems due to the freshness of the project.
Flink still lacks full support for MATCH_RECOGNIZE standard. Here is the list of some of the missing features. Depending on your use case, this may end up not being the problem or it may require working your way out of the unsupported clause with additional code. Hopefully, over time those holes in the standard will be plugged by the Flink development team.
While holes may eventually be plugged in Flink implementation, it may still not be enough to cover all CEP use cases in the current version of the MATCH_RECOGNIZE standard. Notably, as it stands, the standard lacks a dedicated way to deal with time constraints.
Currently, Flink implementation handles simple time constraints with the use of aforementioned non-standard WITHIN clause which can be helpful for a lot of cases. However, there are still some gaps.
In the case we implemented for our client, the requirement was to also detect the absence of the event. And that appeared to be impossible using MATCH_RECOGNIZE clause.
Let's say that our company, in addition to "top-up within one hour" events would like to track those customers who do not top-up in the appointed time. Therefore, after an hour without a top-up, we would like to generate an appropriate event. Unfortunately, there is no capability in the current SQL standard nor in Flink SQL implementation to define how to handle such "absence of the top-up event in one hour" cases.
A way out, which seems most in line with MATCH_RECOGNIZE event handling, would be to implement the pattern using FlinkCEP, which in addition to within
functionality provides processTimedOutMatch
function, which would allow passing timed out events to side output.
val patternStream: PatternStream[BillingEvent] = CEP.pattern(keyedBillings.javaStream, pattern)
val sideOutputTag = OutputTag[AlertReactionEvent]("absence-of-topup-event")
val result: SingleOutputStreamOperator[AlertReactionEvent] = patternStream.process(
new PatternProcessFunction[BillingEvent, AlertReactionEvent]() with TimedOutPartialMatchHandler[BillingEvent] {
override def processMatch(
patternMatch: util.Map[String, util.List[BillingEvent]],
ctx: PatternProcessFunction.Context,
out: Collector[AlertReactionEvent]): Unit = {
out.collect(
AlertReactionEvent(
patternMatch.get("A").get(0).id,
patternMatch.get("A").get(0).datetime,
patternMatch.get("C").get(0).datetime
)
)
}
override def processTimedOutMatch(patternMatch: util.Map[String, util.List[BillingEvent]],
ctx: PatternProcessFunction.Context): Unit = {
ctx.output(
sideOutputTag,
AlertReactionEvent(
patternMatch.get("A").get(0).id,
patternMatch.get("A").get(0).datetime,
""
)
)
}
})
result.getSideOutput(sideOutputTag).print()
Flink provides a variety of ways of handling complex event processing. Each way has its merit:
FlinkCEP
is the more versatile approachFlink SQL MATCH_RECOGNIZE
is the more expressive oneProcessFunction
is an everything-goes backup for highly non-standard transformations.Which one is the best changes according to the specific use case. We hope that someday most, if not all, of complex pattern matching will be possible in a higher-level language like Flink SQL as this will enable analysts to define business rules themselves, without low-level coding, resulting in increased productivity.
Finally, I'd like to thank Krzysztof Zarzycki and a whole GetInData team for their invaluable support - none of it would have been possible without their help :)
---
Would like to know more about Complex Event Processing? Read this article: "Personalised offers? Business process automation? Anomaly detection? What about real-time event stream processing to achieve that?"
If you have any questions about automating decisions in real-time, please do not hesitate to contact us!
The Airbyte 0.50 release has brought some exciting changes to the platform: checkpointing (so that you don’t have to start from scratch in case of…
Read moreIn the fast-paced world of data processing, efficiency and reliability are paramount. Apache Flink SQL offers powerful tools for handling batch and…
Read moreOutstanding customer experience is usually backed by robust data analytics. Same applies to Mamava, a business that celebrates and supports…
Read moreIn this episode of the RadioData Podcast, Adama Kawa talks with Alessandro Romano about FREE NOW use cases: data, techniques, signals and the KPIs…
Read moreOne of the biggest challenges of today’s Machine Learning world is the lack of standardization when it comes to models training. We all know that data…
Read moreA data-driven approach helps companies to make decisions based on facts rather than perceptions. One of the main elements that supports this approach…
Read moreTogether, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.
What did you find most impressive about GetInData?