Tutorial

Flink with MLeap

MLOps with Stream Processing

In the big data world, more and more companies are discovering the potential of fast data processing with streaming technologies. The usefulness of Machine Learning goes without saying.

Few experts master both of these domains, so integrating your streaming architecture with the work of your data science team may not be easy.

In this article, I would like to present one possible way to achieve this kind of integration. For this, I chose Apache Flink, a streaming engine widely used across the world, and MLeap, a serialization library for machine learning models across different programming languages. You can find more information about MLeap, including how to create a model in Python and run it in Scala, in my previous article here.

The central idea of this article is to explain how an ML model trained in Python can be used inside a Flink pipeline, and to introduce the Flink library we at GetInData are working on. You can check the details here.

Motivation for Flink with MLeap

Why serve ML models inside a streaming engine? Why not just reuse an HTTP service that provides predictions, or read precomputed predictions from a database?

Here are three arguments: 

  • The need for ultra-low latencies - serving predictions inside streaming jobs is much faster, as we avoid the network latencies caused by connections to an HTTP server or a database. Not to mention the fact that the network sometimes fails. 
  • Service decoupling - what if another team changes the REST JSON payload, or a Data Analyst renames a column in the database? Storing models inside Flink jobs is a simple solution. 
  • Costs - nobody likes to talk about them, yet they are one of the main factors in whether a project succeeds. Keeping additional HTTP servers and databases, plus the pipelines that compute predictions and save them to the database, generates costs. Your management team doesn't like costs, I'm 100% sure about that!

Of course, storing ML models inside jobs creates other challenges, such as: 

  • How do you load the model?
  • How do you make predictions? 
  • My team is using Flink SQL. Is it possible to make predictions using this API? 

To answer these questions, we have created the Flink-MLeap library, which you can find here.

Use Case 

Imagine you are a Data Scientist (or maybe you actually are) and would like to plug your trained ML model into the existing Flink infrastructure and make predictions on streamed data. You're a very busy person and don't want to waste time learning a totally new technology. It's alright, you're not being lazy! It's only natural; no-one can learn all the different technologies in the world. 

Let's focus on what you know, and how to use this knowledge. You're a "data guy/girl", so you've probably heard about SQL, or maybe even written thousands of queries in this language. Good! When you were reading about Flink, you came across an interesting blog post about building Flink jobs with SQL, like this one here.

Connecting to the Flink cluster using the SQL Client and writing queries to make predictions on streams should be easy and natural, right?

Usage

With our library, you can effortlessly serve ML models in a streaming environment. 

First, we create a stream of features using SQL. To do that, we use the datagen connector, a helper connector in Flink that generates a stream of random values - very useful during the development phase. 

-- Create a table with features
CREATE TABLE Features (
  feature1 DOUBLE NOT NULL,
  feature2 INT NOT NULL,
  feature3 DOUBLE NOT NULL,
  feature_timestamp TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10',
  'fields.feature1.min' = '0.0',
  'fields.feature1.max' = '1.0'
)


Next, we make predictions based on those features:

-- Execute predictions
SELECT
  Predict(feature1, feature2, feature3) AS prediction,
  Predictv2(feature1) AS prediction2
FROM Features


As you can see, here we are using the SQL user-defined functions Predict and Predictv2 on our feature set. They can take different numbers and types of arguments. The names of the functions and the models they use can simply be defined in the configuration, as shown below. 

Below you can find more technical aspects of the library, how we built it and examples of how to use and configure it.

Flink SQL API 

We focused more on the Flink SQL API and prepared more utils for it, so someone who's not too familiar with Flink, or feels more comfortable using SQL rather than Java/Scala, can effortlessly use an ML model in Flink jobs. 

To this end, we prepared MLeapUDFRegistry. The main purpose of this registry is to register UDFs (Flink user-defined functions) that can later be used in SQL queries. To add your UDFs, you can define them inside application.conf like this:

mleap {
  udfRegistry = [
    {
      udfName = "Predict"
      bundlePath = "/mleap-example-1"
      bundleSource = "file"
    },
    {
      udfName = "Predictv2"
      bundlePath = "/mleap-example-2"
      bundleSource = "file"
    }
  ]
}

Then run MLeapUDFRegistry.registerFromConfig(config, tableEnv) before running your queries, as we did in the example application FlinkSqlWithMLeap:

...
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register UDFs based on the config
val config = ConfigFactory.load()
MLeapUDFRegistry.registerFromConfig(config, tableEnv)
...
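
Once the UDFs are registered, they can be used straight away through the same table environment. Here is a minimal sketch of such a follow-up query, assuming the Features table created earlier:

// Run a prediction query against the previously created Features table
val result = tableEnv.sqlQuery(
  """SELECT
    |  Predict(feature1, feature2, feature3) AS prediction,
    |  Predictv2(feature1) AS prediction2
    |FROM Features""".stripMargin)

// Collect the results and print them to stdout
result.execute().print()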

Another thing which can be quite problematic is writing a specific UDF for each ML model. This is of course the easiest, but also the most time-consuming approach. That's why we defined a very generic MLeapUDF, so it can easily be reused for any MLeap bundle. 
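
To give an idea of how such a generic function can work, below is a minimal sketch rather than the library's exact code: GenericMLeapUDF is an illustrative name, and it assumes a model whose inputs are plain scalar columns (tensor inputs would need extra wrapping, as in the MleapMapFunction shown later).

import scala.annotation.varargs
import org.apache.flink.table.functions.ScalarFunction
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row, Transformer}

// Illustrative sketch of a generic MLeap-backed scalar function.
// One instance wraps one MLeap bundle; thanks to varargs, the same class
// can serve models with different numbers of input features.
class GenericMLeapUDF(bundlePath: String, bundleLoader: BundleLoader)
  extends ScalarFunction {

  // Loaded lazily on the task manager, after deserialization
  @transient private lazy val transformer: Transformer =
    bundleLoader.loadBundle(bundlePath).get

  @varargs
  def eval(features: java.lang.Double*): java.lang.Double = {
    // Wrap the input columns in a single-row leap frame
    val frame = DefaultLeapFrame(
      transformer.inputSchema,
      Seq(Row(features.map(Double.unbox): _*)))
    // The model appends its prediction after the input columns
    transformer.transform(frame).get
      .dataset.head(features.size).asInstanceOf[Double]
  }
}

A function like this could then be registered under the name taken from the configuration, for example with tableEnv.createTemporarySystemFunction("Predict", new GenericMLeapUDF(...)).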

Thanks to MLeapUDFRegistry and MLeapUDF, using ML models with SQL goes very smoothly. Just look at the FlinkSqlWithMLeap application. Anybody who knows SQL and has an ML model can easily use it with Flink.

Bon appétit!

Code review

Let's have a quick look at the code. We wrote this project in Scala. It contains two modules: 

  • lib - with library classes
  • example - with usage examples

In the library module we covered two Flink APIs, Streaming and SQL, so they can be reused in any job.

Loading ML models

In order to load a model, we need to create one first. I will reuse the models presented in the previous article: random forest regressors that take one float as input and return one float as output.

To load the models into Flink jobs, we created BundleLoaders. One is FileBundleLoader, which loads bundles from local files. The other is GCSBundleLoader, which fetches models from a Google Cloud Storage bucket so they can be used in Flink jobs. 
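
To give a feel for the shape of this abstraction, here is a minimal sketch using the standard MLeap bundle-loading API; the trait and object in the actual library may differ in details:

import scala.util.Try
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.Transformer
import resource.managed

// Sketch of the loader abstraction
trait BundleLoader extends Serializable {
  def loadBundle(bundleName: String): Try[Transformer]
}

// Loads an MLeap bundle (a zip archive) from a local file URL
object FileBundleLoader extends BundleLoader {
  override def loadBundle(bundleName: String): Try[Transformer] =
    (for (bundle <- managed(BundleFile(s"jar:$bundleName"))) yield {
      bundle.loadMleapBundle().get.root
    }).tried
}

A GCSBundleLoader could follow the same contract, for example by first downloading the bundle from the bucket to a local temporary file.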

Streaming API 

In our library, we focused more on the SQL examples because the audience for that feature is larger; I believe more Data Scientists know SQL than Java. Having said that, the Streaming API was a good starting point for checking whether it was even possible to run jobs with MLeap models. 

In MleapMapFunction, we presented a way to use MLeap bundles: the model is loaded in the open method. 

case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends
  RichMapFunction[Double, Double] {

  private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction])
  @transient var transformer: Transformer = _

  // Load the MLeap bundle once, when the task starts
  override def open(parameters: Configuration): Unit = {
    transformer = bundleLoader.loadBundle(bundleName) match {
      case Failure(exception) =>
        LOG.error(s"Error while loading bundle: $bundleName", exception)
        throw BundleLoadProblem(exception)
      case Success(value) => value
    }
  }

  // Wrap each incoming value in a single-row leap frame, run the model
  // and extract the prediction from the second column
  override def map(value: Double): Double = {
    val dataset = Seq(Row(DenseTensor(Array(value), List(1))))
    val frame = DefaultLeapFrame(transformer.inputSchema, dataset)
    transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double]
  }
}


Then, in the map method, we make predictions. As you can see, it's a very straightforward solution. 

To test it, we implemented a simple Flink job, FlinkDatastreamWithMleap:

object FlinkDatastreamWithMleap {
  def main(args: Array[String]): Unit = {

    implicit val typeInfo = TypeInformation.of(classOf[StructType])
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val rand: Random = new Random()

    val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble())
    val bundlePath = getClass.getResource("/mleap-example-1").toString

    text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print()

    env.execute()
  }
}

Never stop improving 

There is no system or library that cannot be enriched or improved, and the same goes for this one. The main things that we would like to improve are: 

  • test our generic UDF with more complicated ML models,
  • prepare examples of how to use it with Kubernetes,
  • add support for other ML model serialization formats similar to MLeap, such as PMML, which would extend support to more ML libraries. 

Interested in ML and MLOps solutions? Want to improve your ML processes and scale project delivery? Watch our MLOps demo and sign up for a free consultation.

22 February 2022

