
Data Enrichment in Flink SQL using HTTP Connector For Flink - Part Two

In part one of this blog post series, we presented the business use case that inspired us to create an HTTP connector for Flink SQL.

The use case was:
As a data analyst, I want to enrich incoming data with a Machine Learning model for further processing. 

The Machine Learning model data is served via the HTTP/GET method through an external web service system.

The logic should be expressed using SQL.

Whether it's a card payment stream, a stock transaction stream or a clickstream from an online loan application form, enrichment is almost always needed to build a bigger context for downstream processing. This enrichment step usually involves polling data from an external system. Very often, this data can only be accessed via a REST API.

Our HTTP connector allows us to use the familiar SQL JOIN query without needing to call any User Defined Function.

The SQL query for this use case is:

SELECT o.id, o.id2, c.msg, ml.uuid, ml.isActive 
FROM Orders AS o 
JOIN ML_Data FOR SYSTEM_TIME AS OF o.proc_time AS ml 
ON 
  o.id = ml.id AND o.id2 = ml.id2

In this part, we would like to discuss some technical details of our connector. As a reminder, we have published it as open source on GitHub: getindata/flink-http-connector.

For details on how to configure the connector, how to use it in your pipeline and how to run an example project, please take a look at the README.md in the getindata/flink-http-connector repository.

Flink Unified Source interface - maybe next time

Initially, we were hoping to use Flink's new Unified Source Interface proposed in FLIP-27 (FLIP-27: Refactor Source Interface). This new API was introduced in Flink 1.12 and solves several problems present in the previous API, such as "work discovery". The "work discovery" term describes the logic needed to incorporate new data into the stream. For example, it could be Kafka topic or partition discovery during runtime. It can also be active monitoring of a source folder for a File Source, or active polling of an email/Slack server to see if there are any new messages to process.

We were really hoping to use it, especially since this is now a recommended method for implementing custom connectors. Unfortunately, it turned out that we couldn’t.

Unlike the JDBC connector, which talks directly to a database, web services rarely expose a REST endpoint that returns an entire data set, unless that set has a manageable size. Instead, the common practice is to return the data in numbered pages, following the HATEOAS response pattern. HATEOAS stands for Hypermedia As The Engine Of Application State and, in the case of pagination, it is implemented by providing links to the previous and next pages. Those links are added to the API response.

In practice, the most common pattern is simply filtering via an HTTP GET request with parameters, for example some kind of ID for which the data should be retrieved. In our case, the parameters used in the SQL JOIN had to act as this kind of ID. Passing join parameters to the connector can be done using Flink's Lookup Joins.


Unfortunately, it turned out that a Lookup Join, or rather a Lookup Table Source, cannot be implemented using the FLIP-27 API. A source connector implemented with the Unified Source Interface can only act as a Scan Source, meaning it scans all rows from an external storage system at runtime. What we needed in our case was a Lookup Table Source. I started an e-mail thread about this on Flink's user mailing list (here), where this limitation was confirmed.

In short, because we need to have a Lookup Table Source for Flink SQL, we cannot implement our connector using Unified Source Interface API since it only supports Scan Sources, and Scan Sources do not go well with REST APIs.

Lookup Function

Detailed UML diagram for HTTP connector:

[Figure: UML class diagram of the HTTP connector]

At its core, the connector has two main classes:

  • HttpTableLookupFunction
  • AsyncHttpTableLookupFunction

Both classes provide the link between Flink Runtime and the user code that executes HTTP calls.

As you can already see from the class names, our connector can also work asynchronously.

Both classes, in order to communicate with Flink Core, have to implement a public void eval(...) method. The surprising thing here was that there was no interface or abstract class that would enforce the implementation of this method. So, as a developer, you might completely miss this. 

So, how did I find it?
Well, luckily, this is mentioned in the Javadoc for the TableFunction and AsyncTableFunction abstract classes.

[Screenshot: TableFunction Javadoc mentioning the eval method]

It looks like reading the documentation pays off in the end. :) You may wonder how Flink "knows" that it has to call the eval method, since it is not part of any interface or abstract class. Well, it simply assumes the method is there. The Scala-based TableFunctionCallGen does just this:

    val functionCallCode =
      s"""
        |${parameters.map(_.code).mkString("\n")}
        |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
        |""".stripMargin

In my opinion, it's a little bit surprising that implementing the eval method is not enforced by any interface. Having one would be more intuitive and is actually something that most programmers would expect to see.
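To make this concrete, below is a minimal, hypothetical lookup function (not the connector's actual code) showing the kind of eval method that Flink discovers and calls at runtime:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.TableFunction;

// Hypothetical, minimal lookup function. No interface forces us to write
// eval(...); Flink finds the method itself, as the generated code above shows.
public class MyHttpLookupFunction extends TableFunction<RowData> {

  // The arguments are the values of the lookup (join) keys for a single row.
  public void eval(Object... keys) {
    // In a real connector, this is where the HTTP call would be made.
    RowData row = GenericRowData.of(StringData.fromString("some-value"));

    // collect(...) emits the looked-up row back to Flink.
    collect(row);
  }
}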

Registering a New Source

To register a new source as a table source, we need to add a factory class and register it through Java's Service Provider Interface (SPI). In our case, this is the HttpLookupTableSourceFactory.

The factory class has to implement the DynamicTableFactory interface and, in order to be discovered by Flink, it must be added to this file:

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

[Screenshot: registering the factory class in the META-INF/services file]
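The registration file simply lists the fully-qualified name of the factory class, one class per line. The package below is only illustrative; check the repository for the actual name:

com.getindata.connectors.http.table.HttpLookupTableSourceFactory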

The HttpLookupTableSourceFactory creates an instance of DynamicTableSource which in our case is HttpLookupTableSource.

[Screenshot: HttpLookupTableSource implementation]

The getLookupRuntimeProvider method will be called by Flink's core in order to get the implementation of the lookup function.

Generally, depending on your needs and use case, you can choose one of two providers:
LookupRuntimeProvider or ScanRuntimeProvider. Sources implementing FLIP-27 are supported only by ScanRuntimeProvider. In our use case we had to use LookupRuntimeProvider.

LookupRuntimeProvider is further extended by TableFunctionProvider and AsyncTableFunctionProvider. We use both, depending on the connector's asyncPolling configuration parameter, as sketched below.
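The sketch below is simplified and not the connector's actual code (the class and field names are ours); it only shows how a LookupTableSource can switch between the synchronous and asynchronous providers:

import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;

// Simplified sketch of a lookup table source that returns either a sync or
// an async lookup provider, depending on the asyncPolling option.
public class HttpLookupTableSourceSketch implements LookupTableSource {

  private final boolean asyncPolling;
  private final TableFunction<RowData> syncLookup;       // e.g. HttpTableLookupFunction
  private final AsyncTableFunction<RowData> asyncLookup; // e.g. AsyncHttpTableLookupFunction

  public HttpLookupTableSourceSketch(
      boolean asyncPolling,
      TableFunction<RowData> syncLookup,
      AsyncTableFunction<RowData> asyncLookup) {
    this.asyncPolling = asyncPolling;
    this.syncLookup = syncLookup;
    this.asyncLookup = asyncLookup;
  }

  @Override
  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // Flink calls this method to obtain the lookup function implementation.
    return asyncPolling
        ? AsyncTableFunctionProvider.of(asyncLookup)
        : TableFunctionProvider.of(syncLookup);
  }

  @Override
  public DynamicTableSource copy() {
    return new HttpLookupTableSourceSketch(asyncPolling, syncLookup, asyncLookup);
  }

  @Override
  public String asSummaryString() {
    return "HTTP lookup table source (sketch)";
  }
}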

HTTP communication

Our HTTP connector uses Java 11's HTTP client to send HTTP requests. It provides all the mechanisms needed to communicate with an HTTP server and it's already part of the JDK, so no extra dependencies are needed.

Details about HttpClient can be found here.

However, for building the URI path we decided to use the URIBuilder class from Apache's org.apache.httpcomponents library (check here).


We didn't want to code logic for building all HTTP URI variations, handling multiple request parameters etc. Using this library saved us some hassle. 

Currently we only support GET methods and the HTTP 200 OK response code. We are hoping to enhance this part in the future.
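To give a feel for what such a call looks like, here is a rough, standalone sketch (the endpoint and parameter names are illustrative, not the connector's actual code) that builds a GET request with URIBuilder and sends it with Java 11's HttpClient:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.http.client.utils.URIBuilder;

public class HttpLookupCallSketch {

  public static void main(String[] args) throws Exception {
    // Build the GET URI from the lookup keys without hand-crafting query strings.
    URI uri = new URIBuilder("http://localhost:8080/client")
        .addParameter("id", "1")
        .addParameter("id2", "2")
        .build();

    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();

    // Send the request; the connector expects an HTTP 200 response with a JSON body.
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    System.out.println(response.statusCode() + " -> " + response.body());
  }
}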

HTTP Response model

We assume that the HTTP response will be sent back to us as a JSON object. Flink already supports a JSON format for defining SQL sources. However, since our connector is still in an early phase, we only support String column types for now. Therefore, we decided to provide an alternative mechanism for translating the JSON response to the table schema. Support for the Flink JSON format will be added in the future.

The alternative mapping mechanism that we built is based on the com.jayway.jsonpath library and Json Path notation. From my experience working with business teams, I have noticed that Json Path notation is fairly familiar to Business Analysts, especially when designing model conversion rules. In one of my previous projects, all mapping rules from the raw format to the common model were prepared by BAs using Json Path notation.

The conversion from JSON to RowData is done by the HttpResultConverter class.
The HttpResultConverter uses the connector's definition to look up alias definitions or a root node definition. If none are found, it maps the column name directly to a Json Path expression.

For complex structures, the user can define alias paths. The alias property has to follow the pattern of: field.COLUMN_NAME.path. A similar convention can be found in Flink’s DataGen SQL connector.

The value for the alias path key is a Json Path string. For every column, HttpResultConverter checks whether there is an alias. If there is, it uses the corresponding Json Path definition to get the value from the HTTP response.

For example, given the table definition below:

CREATE TABLE Customers (
  id STRING,
  id2 STRING,
  msg STRING,
  uuid STRING,
  isActive STRING,
  balance STRING
) WITH (
  'connector' = 'rest-lookup',
  'url' = 'http://localhost:8080/client',
  'field.isActive.path' = '$.details.isActive',
  'field.balance.path' = '$.details.nestedDetails.balance'
)

Every column except isActive and balance will be converted directly to a Json Path, for example id -> $.id. The remaining two columns will use the alias paths from the table definition, meaning that the value for the isActive column will be taken from the $.details.isActive path and the value for the balance column will be taken from the $.details.nestedDetails.balance path.
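As a small illustration (again, not the connector's actual code), this is essentially what extracting column values with the com.jayway.jsonpath library looks like for a response matching the definition above:

import com.jayway.jsonpath.JsonPath;

public class JsonPathMappingSketch {

  public static void main(String[] args) {
    // A sample response shaped like the one expected by the table definition above.
    String json = "{\"id\":\"1\",\"details\":{\"isActive\":\"true\","
        + "\"nestedDetails\":{\"balance\":\"100.00\"}}}";

    String id = JsonPath.read(json, "$.id");                      // direct column mapping
    String isActive = JsonPath.read(json, "$.details.isActive");  // alias path from the DDL
    String balance = JsonPath.read(json, "$.details.nestedDetails.balance");

    System.out.println(id + " " + isActive + " " + balance);
  }
}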

Asynchronous support

When implementing a process function that communicates with an external system using blocking calls, it is recommended to use Flink's Async I/O. This helps manage the communication delay with the external system so that it does not dominate the streaming application's total work. The enrichment process is a great example of where such asynchronous support is needed.

Luckily, Flink's Async I/O is also supported in Flink SQL. What we need to do is simply return an AsyncTableFunctionProvider from HttpLookupTableSource::getLookupRuntimeProvider.
The AsyncTableFunctionProvider has to provide an object that extends the abstract class AsyncTableFunction.
In our case this is the AsyncHttpTableLookupFunction class.

The AsyncTableFunction abstract class is very similar to TableFunction. The main difference is the signature of that famous eval method. Yes, the same eval method that we simply need to "know" to override. Again, the only hint that we need to implement this method can be found in the Javadoc of AsyncTableFunction, and again, it would be much more intuitive if there were just an abstract method that we needed to implement.

In the case of AsyncTableFunction, the eval method's signature accepts the join keys and a CompletableFuture object. The signature looks like this:

public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys)

Our implementation of the eval method is based on HBaseAsyncTableFunction. This class is actually given as an example in the AsyncTableFunction Javadoc.

The entire implementation looks like this:

public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {

    CompletableFuture<RowData> future = new CompletableFuture<>();
    future.completeAsync(() -> decorate.lookupByKeys(keys), pollingThreadPool);

    future.whenCompleteAsync(
        (result, throwable) -> {
          if (throwable != null) {
            log.error("Exception while processing Http Async request", throwable);
            resultFuture.completeExceptionally(
                new RuntimeException("Exception while processing Http Async request", throwable));
          } else {
            resultFuture.complete(Collections.singleton(result));
          }
        },
        publishingThreadPool);
  }

You can see that we used two separate thread pools. One is used by HttpClient to make the HTTP request, whilst the second one is used to publish results downstream through the CompletableFuture resultFuture. Having separate thread pools helps us avoid thread starvation when publishing results.
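For illustration, the two pools could be created as plainly as in the sketch below (the pool sizes and names here are our own choice, not the connector's configuration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LookupThreadPools {

  // Pool used by future.completeAsync(...) to run the blocking HTTP lookups.
  static final ExecutorService pollingThreadPool = Executors.newFixedThreadPool(8);

  // Pool used by whenCompleteAsync(...) to hand the results back to Flink,
  // so slow publishing cannot starve the polling threads.
  static final ExecutorService publishingThreadPool = Executors.newFixedThreadPool(4);
}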

Plans For The Future

The first thing we would like to add to flink-http-connector is support for all Flink data types. This would certainly improve the usability of our connector.

The next thing would be to add support for the Flink JSON format, which would align nicely with the well-established convention that the other connectors follow for complex schema definitions and data formats like JSON. We will keep our Json Path based converter, since it may be helpful for users who are used to working with JSON objects using Json Paths.

The last thing worth mentioning here would be actually implementing the HTTP connector using FLIP-27 for the DataStream API. The design would assume that the client is able to plug in custom work-discovery and split-creation logic, strictly following the Dependency Injection pattern. This will be challenging for the SQL API, since we have to register a static factory that cannot have any dependency injection applied, at least not in its current form. However, there might be a way to get around this with a little help from the Spring Framework. This may sound really strange to some Flink folks, but there is a way to use Spring as a dependency injection framework with Flink. This, however, will be a subject for future blog posts.

Conclusion

In this blog post we wanted to present some technical details about our http-connector. We wanted to describe the decision process that we followed while designing this connector and why we chose to implement it that way. 

We also wanted to show you what you need to do if you would like to implement your own SQL connector. We wanted to highlight a few steps that are crucial during the implementation of such a connector, such as implementing the eval method or registering the factory using Java's Service Provider Interface.

We hope that you enjoyed reading this blog post and found something that might be useful in your future projects.

Have a Nice Day and Have Fun!


Did you miss the first part of this blog post series? Check Data Enrichment in Flink SQL using HTTP Connector For Flink - Part One and sign up for our newsletter to stay up to date!

