In part one of this blog post series, we presented a business use case that inspired us to create an HTTP connector for Flink SQL.
The use case was:
As a data analyst, I want to enrich incoming data with a Machine Learning model for further processing.
The Machine Learning model data is served via the HTTP/GET method through an external web service system.
The logic should be expressed using SQL.
Whether it's a card payments stream, stock transactions stream or click stream from an online loan application form, enrichment is almost always needed to build a bigger context for downstream processing. This enrichment step usually involves polling data from an external system. Very often, this data can only be accessed via REST API.
Our HTTP connector allows us to use the familiar SQL JOIN query without needing to call any User Defined Function.
The SQL query for this use case is:
SELECT o.id, o.id2, ml.msg, ml.uuid, ml.isActive
FROM Orders AS o
JOIN ML_Data FOR SYSTEM_TIME AS OF o.proc_time AS ml
ON o.id = ml.id AND o.id2 = ml.id2
In this part, we would like to discuss some technical details of our connector. As a reminder, we have published it as open source on GitHub: getindata/flink-http-connector (Http Connector For Flink SQL).
For details on how to configure the connector, how to use it in your pipeline and how to run an example project, please take a look at the README.md document in that repository.
Flink Unified Source interface - maybe next time
Initially, we were hoping to use Flink’s new Unified Source Interface, which was proposed in FLIP-27 (Refactor Source Interface). This new API was introduced in Flink 1.12 and solves several problems that were present in the previous API, such as "work discovery". The “work discovery” term describes the logic needed to incorporate new data into the stream. For example, it could be Kafka topic or partition discovery during runtime. It can also be active monitoring of a source folder for a File Source, or active polling of an email/Slack server to see if there are any new messages to process.
We were really hoping to use it, especially since this is now a recommended method for implementing custom connectors. Unfortunately, it turned out that we couldn’t.
Unlike the JDBC connector, which connects directly to a database, web services rarely expose a REST endpoint that returns an entire data set, unless that set is of a manageable size. For larger data sets, the common practice is to return data in numbered pages, following the HATEOAS response pattern. HATEOAS stands for Hypermedia As The Engine Of Application State and, in the case of pagination, it is implemented by adding links to the previous and next pages to the API response.
In practice, the most common pattern simply filters data using HTTP GET parameters, for example some kind of ID for which the data should be retrieved. In our case, the columns used in the SQL JOIN condition had to serve as this kind of ID. Passing join parameters to the connector can be done using Flink’s Lookup Joins.
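To make the "join keys become GET parameters" idea concrete, here is a minimal JDK-only sketch. It is not the connector's code: the class name LookupUriSketch, the method buildLookupUri and the assumption that each query parameter is named after its join column are all made up for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of flink-http-connector:
// turns join keys into the query string of a GET request.
final class LookupUriSketch {

    static URI buildLookupUri(String baseUrl, Map<String, String> joinKeys) {
        // Assumption: each join column name is used verbatim as a query parameter name.
        String query = joinKeys.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
        return URI.create(query.isEmpty() ? baseUrl : baseUrl + "?" + query);
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the keys in insertion order.
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("id", "42");
        keys.put("id2", "7");
        // prints http://localhost:8080/client?id=42&id2=7
        System.out.println(buildLookupUri("http://localhost:8080/client", keys));
    }
}
```

With the join condition from the query above, every incoming Orders row would result in one such request carrying that row's id and id2 values.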
Unfortunately, it turned out that a Lookup Join, or rather a Lookup Table Source, cannot be implemented using the FLIP-27 API. A source connector implemented using the Unified Source Interface can only act as a Scan Source, meaning it will scan all rows from an external storage system during runtime. What we needed in our case was a Lookup Table Source. I started an e-mail thread about this on Flink’s user mailing list, where this limitation was confirmed.
In short, because we need to have a Lookup Table Source for Flink SQL, we cannot implement our connector using Unified Source Interface API since it only supports Scan Sources, and Scan Sources do not go well with REST APIs.
Lookup Function
Detailed UML diagram for HTTP connector:
In the core we have two main classes:
HttpTableLookupFunction
AsyncHttpTableLookupFunction
Both classes provide the link between Flink Runtime and the user code that executes HTTP calls.
As you can already see from the class names, our connector can work asynchronously.
Both classes, in order to communicate with Flink Core, have to implement a public void eval(...) method. The surprising thing here was that there was no interface or abstract class that would enforce the implementation of this method. So, as a developer, you might completely miss this.
So, how did I find it?
Well, luckily this is mentioned in the Javadoc for the TableFunction and AsyncTableFunction abstract classes.
It looks like reading the documentation pays off in the end. :) You may wonder how Flink “knows” that it has to call the eval method, since it is not declared by any interface or as an abstract method. Well, it seems that it just assumes it. The Scala-based TableFunctionCallGen simply does this:
val functionCallCode =
  s"""
    |${parameters.map(_.code).mkString("\n")}
    |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
    |""".stripMargin
In my opinion, it's a little bit surprising that implementing the eval method is not enforced by any interface. Enforcing it would be more intuitive, and it is what most programmers would expect to see.
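The practical consequence of a "by convention" method can be shown with a small JDK-only sketch. To be clear, this is an analogy and not Flink's mechanism: Flink emits generated code that calls eval directly, while the sketch below uses reflection, and all class names in it (ConventionFunction, GreetingFunction, BrokenFunction, EvalConventionSketch) are made up for illustration.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical base class: like the abstract classes discussed above, it declares no eval method.
abstract class ConventionFunction { }

// A subclass that follows the naming convention.
class GreetingFunction extends ConventionFunction {
    public String lastResult;

    public void eval(String name) {
        lastResult = "Hello, " + name;
    }
}

// A subclass that forgot eval: it compiles fine, the problem only shows up at runtime.
class BrokenFunction extends ConventionFunction { }

final class EvalConventionSketch {

    // True if the class exposes any public method named "eval".
    static boolean hasPublicEval(Class<?> type) {
        return Arrays.stream(type.getMethods()).anyMatch(m -> m.getName().equals("eval"));
    }

    // Calls the first public eval method whose parameter count matches the arguments.
    static void callEval(Object function, Object... args) {
        for (Method m : function.getClass().getMethods()) {
            if (m.getName().equals("eval") && m.getParameterCount() == args.length) {
                try {
                    m.invoke(function, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Calling eval failed", e);
                }
                return;
            }
        }
        throw new IllegalStateException(
                function.getClass().getSimpleName() + " has no matching eval method");
    }
}
```

The compiler accepts BrokenFunction without complaint, which is exactly the trap: only a runtime check, or a careful read of the Javadoc, reveals the missing method.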
Registering a New Source
To register a new source as a table source, we need to add a factory class and register it through Java’s Service Provider Interface (SPI). In our case, this is the HttpLookupTableSourceFactory.
The factory class has to implement the DynamicTableFactory interface and, in order to be discovered by Flink, its fully qualified class name must be listed in this file:
src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
The HttpLookupTableSourceFactory creates an instance of DynamicTableSource, which in our case is HttpLookupTableSource.
The getLookupRuntimeProvider method will be called by Flink’s core in order to get the implementation of the Lookup Function.
Generally, depending on your needs and use case, you can choose one of two providers: LookupRuntimeProvider or ScanRuntimeProvider. Sources implementing FLIP-27 are supported only by ScanRuntimeProvider. In our use case we had to use LookupRuntimeProvider.
The LookupRuntimeProvider is further extended by TableFunctionProvider and AsyncTableFunctionProvider. We use both, depending on the connector’s asyncPolling configuration parameter.
HTTP communication
Our HTTP connector uses the HTTP client introduced in Java 11 to send HTTP requests. It provides everything needed to communicate with an HTTP server and it is already part of the JDK, so no extra dependencies are needed.
Details about HttpClient can be found in its Javadoc.
However, for building the URI path we decided to use the URIBuilder class from Apache's third-party org.apache.httpcomponents library.
We didn't want to code the logic for building all HTTP URI variations, handling multiple request parameters etc. Using this library saved us some hassle.
Currently we only support the GET method and the HTTP 200 OK response code. We are hoping to enhance this part in the future.
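A GET-only, 200-only lookup could look roughly like the sketch below when written against the JDK's java.net.http API. This is an assumption-laden illustration and not the connector's actual code: HttpGetSketch, bodyIfOk and get are hypothetical names, and returning an empty Optional for any non-200 status is just one possible policy.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

// Hypothetical sketch of a blocking GET that only accepts HTTP 200 OK.
final class HttpGetSketch {

    // Keeps the status policy in one place: only 200 yields a body.
    static Optional<String> bodyIfOk(int statusCode, String body) {
        return statusCode == 200 ? Optional.ofNullable(body) : Optional.empty();
    }

    // Sends a GET request and returns the response body, or empty for any non-200 status.
    static Optional<String> get(HttpClient client, URI uri) {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            return bodyIfOk(response.statusCode(), response.body());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for HTTP response", e);
        }
    }
}
```

A caller would create one client with HttpClient.newHttpClient(), reuse it for every lookup, and pass in a URI built from the join keys.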
HTTP Response model
We assume that the HTTP response will be sent back to us as a JSON object. Flink already supports the JSON format for defining SQL sources, as described in the Flink documentation for the JSON format. However, since our connector is still in the early phase, we only support String column types for now. Therefore, we decided to provide an alternative for translating the JSON response to the table schema. Support for the Flink JSON format will be added in the future.
The alternative mapping mechanism that we built is based on the com.jayway.jsonpath library and the Json Path notation. From my experience of working with business teams, I have noticed that the Json Path notation can be fairly familiar to Business Analysts (BAs), especially when designing model conversion rules. In one of my previous projects, all mapping rules from raw format to the common model were prepared by BAs using the Json Path notation.
The conversion from Json to RowData is done by the HttpResultConverter class.
The HttpResultConverter uses the connector’s definition to look up alias definitions or a root node definition. If none are found, it maps the column name directly to the JSON path format.
For complex structures, the user can define alias paths. The alias property has to follow the pattern field.COLUMN_NAME.path. A similar convention can be found in Flink’s DataGen SQL connector.
The value for the alias path key is a json path string. For every column, HttpResultConverter checks if there is an alias. If there is, it uses the corresponding json path definition to get the value from the HTTP response.
For example, given the table definition below:
CREATE TABLE Customers (
  id STRING,
  id2 STRING,
  msg STRING,
  uuid STRING,
  isActive STRING,
  balance STRING
) WITH (
  'connector' = 'rest-lookup',
  'url' = 'http://localhost:8080/client',
  'field.isActive.path' = '$.details.isActive',
  'field.balance.path' = '$.details.nestedDetails.balance'
)
Every column except isActive and balance will be converted directly to a json path, for example id -> $.id. The remaining two columns will use alias paths from the table definition, meaning that the value for the isActive column will be taken from the $.details.isActive path and the value for the balance column will be taken from the $.details.nestedDetails.balance path.
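The alias resolution rule described above can be sketched in a few lines of plain Java. This is a simplified illustration and not the HttpResultConverter source: JsonPathAliasSketch and resolveJsonPath are hypothetical names, the root node definition mentioned earlier is deliberately left out, and evaluating the resulting path against a JSON document (the job of the json path library) is not shown.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the column -> json path rule, ignoring the root node option.
final class JsonPathAliasSketch {
    private static final String PREFIX = "field.";
    private static final String SUFFIX = ".path";

    // Uses the alias from the table options if present, otherwise falls back to $.<column>.
    static String resolveJsonPath(String columnName, Map<String, String> tableOptions) {
        String alias = tableOptions.get(PREFIX + columnName + SUFFIX);
        return alias != null ? alias : "$." + columnName;
    }

    public static void main(String[] args) {
        // The same alias options as in the Customers table definition above.
        Map<String, String> options = Map.of(
                "field.isActive.path", "$.details.isActive",
                "field.balance.path", "$.details.nestedDetails.balance");

        // prints:
        // id -> $.id
        // isActive -> $.details.isActive
        // balance -> $.details.nestedDetails.balance
        for (String column : List.of("id", "isActive", "balance")) {
            System.out.println(column + " -> " + resolveJsonPath(column, options));
        }
    }
}
```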
Asynchronous support
When implementing a process function that communicates with an external system using blocking calls, it is recommended to use Flink's Async I/O. This helps to manage the communication delay with the external system, so that waiting for responses does not dominate the streaming application’s total work. The enrichment process is a great example of where such asynchronous support is needed.
Luckily, Flink’s Async I/O is also supported in Flink SQL. What we need to do is simply return AsyncTableFunctionProvider from HttpLookupTableSource::getLookupRuntimeProvider.
The AsyncTableFunctionProvider has to provide an object that extends the abstract class AsyncTableFunction.
In our case this is the AsyncHttpTableLookupFunction class.
The AsyncTableFunction abstract class is very similar to TableFunction. The main difference is the signature of that famous eval method. Yes, the same eval method that we simply need to "know" to implement. Again, the only hint that we need to implement this method can be found in the Javadoc of AsyncTableFunction. Again, it would be much more intuitive if there was just an abstract method that we needed to implement.
In the case of AsyncTableFunction, the eval method accepts a CompletableFuture object and the join keys. The signature looks like this:
public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys)
Our implementation of the eval method is based on HBaseAsyncTableFunction. In fact, this class is given as an example in the AsyncTableFunction Javadoc.
The entire implementation looks like this:
public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {
    // Run the blocking HTTP lookup on the polling thread pool.
    CompletableFuture<RowData> future = new CompletableFuture<>();
    future.completeAsync(() -> decorate.lookupByKeys(keys), pollingThreadPool);

    // Hand the outcome over to Flink on a separate, publishing thread pool.
    future.whenCompleteAsync(
        (result, throwable) -> {
            if (throwable != null) {
                log.error("Exception while processing Http Async request", throwable);
                resultFuture.completeExceptionally(
                    new RuntimeException("Exception while processing Http Async request", throwable));
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        },
        publishingThreadPool);
}
You can see that we used two separate thread pools. One is used to make the HTTP request through HttpClient, whilst the second one is used to publish results downstream through the CompletableFuture resultFuture. Having separate thread pools helps us avoid thread starvation when publishing results.
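For readers who want to experiment with the two-pool pattern without a Flink project, here is a JDK-only reduction of it. It is a sketch under stated assumptions: String stands in for RowData, a plain Function stands in for the HTTP lookup, and the class name TwoPoolLookupSketch and the pool sizes are made up for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical stand-in for an async lookup function: String replaces RowData.
final class TwoPoolLookupSketch implements AutoCloseable {
    // Assumption: two threads per pool, purely for the sake of the example.
    private final ExecutorService pollingThreadPool = Executors.newFixedThreadPool(2);
    private final ExecutorService publishingThreadPool = Executors.newFixedThreadPool(2);
    private final Function<Object[], String> lookup;

    TwoPoolLookupSketch(Function<Object[], String> lookup) {
        this.lookup = lookup;
    }

    void eval(CompletableFuture<Collection<String>> resultFuture, Object... keys) {
        CompletableFuture
                // The (potentially slow) lookup runs on the polling pool.
                .supplyAsync(() -> lookup.apply(keys), pollingThreadPool)
                // Completing the caller's future happens on the publishing pool.
                .whenCompleteAsync((result, throwable) -> {
                    if (throwable != null) {
                        resultFuture.completeExceptionally(throwable);
                    } else {
                        resultFuture.complete(Collections.singleton(result));
                    }
                }, publishingThreadPool);
    }

    @Override
    public void close() {
        pollingThreadPool.shutdown();
        publishingThreadPool.shutdown();
    }
}
```

A caller creates a CompletableFuture, passes it to eval together with the keys, and later reads the rows from that future. Slow lookups can occupy every polling thread without blocking the threads that publish already finished results.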
Plans For The Future
The first thing we would like to add to flink-http-connector is support for all Flink data types. This would certainly improve the usability of our connector.
The next thing would be to add support for Flink Json format which would go nicely with a well-established convention that the rest of the connectors follow regarding a complex schema definition and handling data format types like Json. We will keep our Json Path based converter since it may be helpful for users who are used to working with Json objects using Json Paths.
The last thing worth mentioning here would be actually implementing the HTTP connector using FLIP-27 for Data Stream API. The design would assume that the client will be able to plug in its custom work discovery and split creation logic strictly following the Dependency Injection pattern. This will be a challenging thing to do for SQL API, since we have to register a static factory that cannot have any dependency injection applied, at least not in its current form. However, there might be a way to get around this with a little bit of help from Spring Framework. This may sound really strange for some of the Flink guys, but there is a way to use Spring Framework as a Dependency Injection Framework with Flink. This, however, will be a subject for future blog posts.
Conclusion
In this blog post we wanted to present some technical details about our http-connector. We wanted to describe the decision process that we followed while designing this connector and why we chose to implement it that way.
We also wanted to show you what you need to do if you would like to implement your own SQL connector. We wanted to highlight a few steps that are crucial during the implementation of such a connector, such as implementing the eval method or registering the factory using Java’s Service Provider Interface.
We hope that you enjoyed reading this blog post and found something that might be useful in your future projects.
Have a Nice Day and Have Fun!
Missed the first part of this blog post series? Check Data Enrichment in Flink SQL using HTTP Connector For Flink - Part One.