
Data Enrichment in Flink SQL using HTTP Connector For Flink - Part One

HTTP Connector For Flink SQL 

In our projects at GetInData, we work a lot on scaling out our clients' data engineering capabilities by enabling more people, not only software engineers, to build data-driven applications. Our way of enabling non-engineers is to give them a widely popular, higher-level data processing definition language: SQL.

One of our company's specialities is projects where we need to process intensive data streams. The processing is not limited to simple transformations; it also includes enrichment with data from external sources, stream joins and complex aggregations. These problems, while still state-of-the-art, are increasingly being solved with frameworks like Apache Flink, Spark or GCP Dataflow, and that is what we use in our projects.

So how can we give non-engineers the power to define this complex processing in SQL? This is what we are working on intensively at GetInData.

In this blog post I will focus on a use case we came across: using Flink SQL to enrich data streams with data accessed over an HTTP API. This part covers the business side, whilst part two will highlight some technical details of the implementation.


Apache Flink - SQL

Apache Flink is an open source platform that supports low-latency stream processing on a large scale. A Flink deployment is a cluster of nodes where stateful data processing jobs are distributed amongst the worker nodes.

Flink provides an ANSI standard-compliant SQL API, implemented as Flink SQL, which can be used to define data processing pipelines and express data sources, sinks and data transformation functions, including pattern recognition.
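To give a feeling for what this looks like, here is a minimal sketch (the table names are made up, and 'datagen' and 'print' are Flink's built-in testing connectors) that expresses a source, a sink and a simple aggregation purely in SQL:

CREATE TABLE Clicks (
  userId STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE ClickCounts (
  userId STRING,
  cnt BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO ClickCounts
SELECT userId, COUNT(*) AS cnt FROM Clicks GROUP BY userId;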

Use case

The use case we were working on was fairly straightforward:

"As a data analyst, I want to enrich incoming data with a Machine Learning model for further processing.

The Machine Learning model data is served via HTTP/GET API through an external web service system. 

The logic should be expressed using SQL."

You can also imagine another case:

"As a data analyst, I want to enrich incoming data streams about loans with detailed user metadata. 

The user data is kept in an external system and served via HTTP/GET API through an external web service system.

The logic should be expressed using SQL."

Any scenario, whether it's a card payment stream, a stock transaction stream or a clickstream from an online loan application form, fits here, since enrichment is almost always needed to build up a bigger context for downstream processing. This enrichment step usually involves fetching data from an external system, and in many cases this data can only be accessed through a REST API.

The problem

Enrichment with external data is usually implemented using a User Defined Function (UDF). In that case, the usage could look something like this:

SELECT dedicatedOrdersEnrichment(orderId) FROM orders

In the case of an ML-based service, a generic function could look like this:

SELECT genericEnrichment(orderId, 'http://3rdpartyservice.com/service/ml') FROM orders

Immediately we can spot two things:

  1. The user needs to know that dedicatedOrdersEnrichment exists.
  2. The user needs to know what kind of parameters dedicatedOrdersEnrichment expects. In other words, the user has to know how to use this UDF.

The challenge here is the reusability of dedicatedOrdersEnrichment. Making this UDF very specific would make it more user-friendly: fewer parameters would be required and the query would be simpler, but we would need a new UDF for each new case. New UDFs require a skilled Java programmer to implement them, and since they have to be implemented, deployed and managed, this can increase the time-to-market for such functionality.
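Even once the Java implementation is packaged and placed on the cluster's classpath, the function still has to be registered before analysts can call it. As a rough, hypothetical sketch (the class name below is made up):

CREATE TEMPORARY FUNCTION dedicatedOrdersEnrichment
AS 'com.example.enrichment.DedicatedOrdersEnrichment' LANGUAGE JAVA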

On the other hand, we could implement it once in a generic and reusable way like in the genericEnrichment example, but that makes it much less user-friendly and more prone to errors. 

Finding a good balance between both would be hard and specific to every use case.

What If…

What if I told you that there is another way, a better way?

What if we could express the enrichment as if it were a standard SQL JOIN?

Let's break down the use case:

  • We have a table with initial data - orders.
  • We have a table with Machine Learning model data - ML_Data.
  • Records from both tables can be joined with a key.

The SQL query in that case would be:

SELECT o.*, ml.* FROM Orders AS o JOIN ML_Data AS ml ON o.id = ml.id

Simple as that. 

Can we use the same approach in Flink SQL? Well, now, thanks to our flink-http-connector, we can.

The flink-http-connector, which we have made available as open source, allows us to define Flink SQL tables that act as a data source for enrichment. Such a table can be referenced in a SQL JOIN query. You can check out the repository here.

Usage

With the flink-http-connector we can define a data table like so:

CREATE TABLE ML_Data (
  id STRING,
  id2 STRING,
  msg STRING,
  uuid STRING,
  isActive STRING,
  balance STRING
) WITH (
  'connector' = 'rest-lookup',
  'url' = 'http://localhost:8080/client'
)

This newly created table is, in fact, a new data source, created with pure Flink SQL and without any programming language like Java or Scala. SQL and a few configuration parameters are enough to express that this table is backed by an external web service.

We can define many such sources, each with a different schema and backed by a different external service. These sources act as standard SQL tables and can be used by analysts.
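For instance, a second lookup table for the user metadata scenario mentioned earlier could be defined in the same way; the schema, table name and endpoint below are purely illustrative:

CREATE TABLE User_Data (
  userId STRING,
  firstName STRING,
  lastName STRING,
  creditScore STRING
) WITH (
  'connector' = 'rest-lookup',
  'url' = 'http://localhost:8081/users'
)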

The url parameter defines the base URL of the REST API used to fetch data from the external system. Currently, only the HTTP GET method is supported.

The Flink SQL query that would fulfill our use case has to use the so-called “Lookup Join”. Without getting too much into the details, the Lookup Join passes the JOIN arguments to the connector. The Connector can use those arguments to build the HTTP request.
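For example, assuming the connector appends the lookup keys as query parameters (the exact request format is an implementation detail and will be covered in Part Two), a lookup for id = '1' and id2 = '2' could translate into a call such as http://localhost:8080/client?id=1&id2=2.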

The SQL for the enrichment join, using the two arguments id and id2 with the table backed by the flink-http-connector, would look like this:

SELECT o.id, o.id2, ml.msg, ml.uuid, ml.isActive FROM Orders AS o
JOIN ML_Data FOR SYSTEM_TIME AS OF o.proc_time AS ml ON o.id = ml.id AND o.id2 = ml.id2

The query looks almost like a standard SQL query using the familiar concept of JOIN to express the business use case. 

The SYSTEM_TIME AS OF o.proc_time part is currently required for Flink to execute this as a Lookup Join and will be discussed a little bit more in Part Two of this Blog Post.
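Note that o.proc_time has to be a processing-time attribute declared on the source table. As a minimal sketch (the Orders schema and the 'datagen' connector are used here only for illustration), it can be defined as a computed column:

CREATE TABLE Orders (
  id STRING,
  id2 STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
)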

Conclusion

SQL is part of everyday work in the Big Data world. Not having to context-switch between dialects and not needing to memorize tool-specific commands or configuration options is something that end users appreciate.

In this blog post we described how enrichment with data from an external system, accessed via its REST API, can be expressed as a Flink SQL query using our open source flink-http-connector. The connector is based on Flink's concept of Lookup Joins.

At GetInData we fill the gap for the use case where data cannot be accessed through a direct database connection but is instead exposed through the REST API of an external web service, which seems to be the more common situation.

Because we believe that this might be a useful tool for others, we decided to make the connector public under an open source license (flink-http-connector).

Please keep in mind that the connector is still in an early phase, and we hope it will continue to develop in the future.

The implementation details and possible enhancements will be described in the second part of this blog post, so please stay tuned.

Happy Coding!!!

