ETL 2.0: Why you should switch to stream processing

If you are considering NiFi for your data ingestion pipeline, there may be an interesting alternative.

Let’s assume we simply want to dump facts from Kafka to HDFS without any transformations. Low latency is desirable but not crucial. We have hundreds of topics, from small to really huge ones, and all messages are in Avro format. Some data is produced hourly or daily, and some is produced online. The facts will back external tables in Hive, so we need to avoid large numbers of small files. Otherwise both HDFS and Hive may struggle with performance.

How to go about it? A quick Google search suggests four options:

  • Apache Gobblin or Camus – MapReduce-based frameworks, so batch only and somewhat outdated technology. Leave them for old-school programmers.
  • Kafka HDFS 2 sink connector – Kafka Connect is a great, scalable framework. This connector was used in one of the projects at GetInData, but it had some bugs related to the Avro format. Because of them it consumed a lot of resources and schema upgrades were difficult. It also produced corrupted files when the Hive Metastore had problems, among other issues. Using it in bigger projects won’t be easy.
  • Kafka HDFS 3 sink connector – it is under the Confluent Enterprise License, but you can test it for free for 30 days. It is worth trying if you have paid for the license.
  • Apache Flume – an agent-based solution. It is hard to get Flume to process data exactly once. Moreover, it is a deprecated technology (even by Cloudera).

So, in my opinion, none of the above seems good enough for this case. Let’s build a well-fitted solution then!

In one of the projects at GetInData we use NiFi as the tool of choice for data ingestion, as it makes it quite easy to build a flow for dumping data. Messages can be buffered in a loop and merged before being put on HDFS. It works surprisingly well, but it does not guarantee at-least-once processing.

You have to be aware that NiFi simply processes flow files (facts in this case), which are not shared or replicated between cluster nodes. When a node is down, its data is unavailable and may be lost, so there is a risk of data loss or of data chunks arriving in the wrong order. There is no guarantee that flow files will be recovered after a dead node is restored. NiFi does not support high availability of data in this sense.

This is a real problem when you want to automate reporting and there is a risk that the source tables are not consistent. You have to:

  • monitor data consistency
  • fix everything in case of problems, which means manual interventions and reconciliations

Monitoring consistency requires gathering message counts from Kafka topics and comparing them with Hive tables or HDFS files. It is a batch task: it can run daily or hourly, but you get the information with a delay. In the meantime reports may be inconsistent, so analysts can’t rely on them.
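The comparison itself is simple once the counts are available. Below is a minimal sketch in plain Python, assuming the per-topic counts have already been collected into two dictionaries (how they are collected from Kafka and Hive is out of scope here; the function name and the sample numbers are hypothetical).

```python
from typing import Dict, List, Tuple


def find_count_mismatches(
    kafka_counts: Dict[str, int],
    hive_counts: Dict[str, int],
) -> List[Tuple[str, int, int]]:
    """Return (topic, kafka_count, hive_count) for every topic whose counts differ."""
    mismatches = []
    # Look at topics present on either side; a missing topic counts as zero.
    for topic in sorted(set(kafka_counts) | set(hive_counts)):
        produced = kafka_counts.get(topic, 0)
        stored = hive_counts.get(topic, 0)
        if produced != stored:
            mismatches.append((topic, produced, stored))
    return mismatches


# Hypothetical counts for one hour of data.
kafka_counts = {"orders": 1000, "clicks": 250000, "payments": 40}
hive_counts = {"orders": 1000, "clicks": 249100, "payments": 40}
mismatches = find_count_mismatches(kafka_counts, hive_counts)
print(mismatches)
```

Every reported mismatch still has to be reconciled by hand, which is exactly the manual effort described above.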

There are more problems with NiFi. Maintaining environments is challenging, as there is no CI/CD support. Stability depends on the processes that are running, so it is worth isolating core processes on dedicated clusters, which is extra effort.

Why not use a leading streaming framework like Flink? It is fast and scalable, provides exactly-once processing out of the box, and works well with CI/CD pipelines. Moreover, Flink jobs are easy to maintain with, e.g., Ververica Platform, Kubernetes or good old YARN. Is auto-retry your concern? No problem, Flink has built-in auto-retry based on checkpointing.

A checkpoint in the Flink world is a snapshot of the job state. Flink periodically injects a barrier that travels through the flow. Each task handles the barrier by writing its state to resilient storage. In the end we have a consistent state of the whole flow. The conclusion is straightforward: make frequent checkpoints if you want to recover fast.

Let’s create a flow for our case study and start with a very simple solution: a Kafka consumer as the source and a file sink with a bucket assigner for writing to HDFS. Everything works out of the box. Set the checkpoint interval to 60 seconds and it’s done! Not really, because Flink creates new files on checkpoints. That is how it guarantees exactly-once processing. In a pessimistic scenario you get N files per checkpoint for every topic, where N is the sink parallelism. Thousands of files per minute will kill HDFS and Hive performance. Remember to avoid small files when working with HDFS!
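To get a feel for the numbers, here is a rough back-of-the-envelope calculation in Python. The topic count (300) and sink parallelism (4) are assumed values chosen only for illustration; the 60-second checkpoint interval comes from the text above.

```python
def files_per_checkpoint(topics: int, sink_parallelism: int) -> int:
    """Pessimistic case: every sink subtask finalizes one file per topic."""
    return topics * sink_parallelism


def files_per_day(topics: int, sink_parallelism: int, checkpoint_interval_s: int) -> int:
    """Number of files created in 24 hours in the pessimistic case."""
    checkpoints_per_day = 24 * 60 * 60 // checkpoint_interval_s
    return files_per_checkpoint(topics, sink_parallelism) * checkpoints_per_day


# Assumed setup: 300 topics, sink parallelism 4, checkpoint every 60 seconds.
per_minute = files_per_checkpoint(topics=300, sink_parallelism=4)
per_day = files_per_day(topics=300, sink_parallelism=4, checkpoint_interval_s=60)
print(per_minute, per_day)
```

Under these assumptions the job would create 1,200 files per minute and about 1.7 million files per day, most of them tiny.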

Flink SQL doesn’t solve the problem. You can try to bypass it by writing to a Hive managed table with compaction enabled. That is just a workaround: compaction is an expensive and asynchronous process, and reads from fragmented tables may be very slow. Moreover, it would be hard to configure for thousands of topics.

The real problem is to decouple checkpointing from the creation of new files. We can do that by writing in a format that supports commit operations, such as Iceberg, Delta Lake or Hudi.

The commit operation is built into the format, so Flink does not have to care about it: it just writes records in a streaming fashion and commits on checkpoint.

Unfortunately, in our case we have to consider other solutions. Handling any of these formats in Hive requires replacing the Tez processing engine with, e.g., Spark. That would be too revolutionary a change for the current data platform.

Flink is stateful, so it is possible to buffer messages in persistent state (e.g. RocksDB) and release them periodically with a timer. Latency is not critical in this case. You can write to state storage using KeyedProcessFunction and set a timer to trigger the release. It works as needed, but it is slow. Let’s go through the problems and fix them.
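The buffer-and-release idea can be sketched without Flink. The class below is a plain-Python stand-in, not the Flink API: the dictionary plays the role of keyed state, and `advance_time` plays the role of the timer callback. All names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


class TimedBuffer:
    """Plain-Python stand-in for a keyed function that buffers and releases on a timer."""

    def __init__(self, release_interval_s: int) -> None:
        self.release_interval_s = release_interval_s
        self.state: Dict[str, List[bytes]] = defaultdict(list)  # stands in for keyed state
        self.timers: Dict[str, int] = {}  # key -> time of the next release

    def process_element(self, key: str, payload: bytes, now_s: int) -> None:
        self.state[key].append(payload)
        # Register one timer per key, aligned to the next interval boundary.
        if key not in self.timers:
            interval = self.release_interval_s
            self.timers[key] = (now_s // interval + 1) * interval

    def advance_time(self, now_s: int) -> Dict[str, List[bytes]]:
        """Fire all timers that are due and return the released messages per key."""
        released = {}
        for key, fire_at in list(self.timers.items()):
            if fire_at <= now_s:
                released[key] = self.state.pop(key)
                del self.timers[key]
        return released


buffer = TimedBuffer(release_interval_s=3600)
buffer.process_element("orders", b"m1", now_s=10)
buffer.process_element("orders", b"m2", now_s=1800)
buffer.process_element("clicks", b"m3", now_s=3500)
early = buffer.advance_time(now_s=3599)    # no timer is due yet
on_time = buffer.advance_time(now_s=3600)  # both keys are released together
print(early, on_time)
```

Because everything for a key is released in one go, the sink receives a few large batches per hour instead of a trickle of records on every checkpoint.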

  1. Writing to state storage is slow.
    Don’t write events one by one; use chunks instead. KeyedProcessFunction has to load the value state by key, so it is good to group events beforehand with a short time window. Avoid the count window, because it uses a state counter (slow!) and doesn’t guarantee that all events are released within a determined time.
    The fastest way of writing to state is appending to ListState, because you avoid the read operation.
  2. The state is too big; reading it is slow and may cause an out-of-memory error (OOM).
    It is possible to keep just the meta information in the value state connected with the function key and load the list state dynamically (e.g. in the processElement method). I keep the number of chunks and the last chunk size as Flink function state. The list state descriptor is created from the last chunk number and loaded on demand. onTimer releases records from all chunks, one chunk at a time. Configure memory so that a whole chunk fits on the heap.
  3. Avro serialization/deserialization is slow and it is executed between tasks.
    Don’t deserialize Avro in the consumer. For proper distribution, deserialize only meta information, e.g. the topic name, without the Avro content. Store the content as bytes in state and deserialize only before the write. Note that this is only an Avro dump, so no modifications are needed.
  4. Load isn’t well balanced between tasks.
    Some topics are huge, some very small. I grouped them based on a prepared configuration and distributed them by the key [group_name]_[random]. It is good to make the random range depend on parallelism and tune it empirically. That key is also used in the KeyedProcessFunction for buffering/releasing messages.
  5. The sink writes to N files on each checkpoint.
    Distribute using keyBy with the topic name.
  6. Sink write performance is slow because of the previous step!
    Use the same trick as in point 4. My key is [topic_name]_[random], where the random range is based on the size of the released events. A random range of 1 enforces a single-threaded write. A bigger range increases the probability of concurrent writes.
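The chunking idea from point 2 can be illustrated with a small plain-Python model. This is only a sketch of the bookkeeping, not Flink code: the `chunks` dictionary stands in for separately named list states, and the class name, the `chunk-<n>` naming scheme and `max_chunk_size` are assumptions made for the example.

```python
from typing import Dict, Iterator, List


class ChunkedBuffer:
    """Keeps small metadata eagerly and treats each chunk as a separately loaded list."""

    def __init__(self, max_chunk_size: int) -> None:
        self.max_chunk_size = max_chunk_size
        self.chunk_count = 0      # metadata: number of chunks written so far
        self.last_chunk_size = 0  # metadata: number of records in the newest chunk
        self.chunks: Dict[str, List[bytes]] = {}  # stands in for named list states

    @staticmethod
    def _name(index: int) -> str:
        return f"chunk-{index}"

    def add(self, records: List[bytes]) -> None:
        """Append a pre-grouped batch of records, opening new chunks as they fill up."""
        for record in records:
            if self.chunk_count == 0 or self.last_chunk_size >= self.max_chunk_size:
                self.chunk_count += 1
                self.last_chunk_size = 0
                self.chunks[self._name(self.chunk_count - 1)] = []
            # Append only: decisions are made from metadata, not by reading the chunk.
            self.chunks[self._name(self.chunk_count - 1)].append(record)
            self.last_chunk_size += 1

    def release(self) -> Iterator[List[bytes]]:
        """Yield chunks one by one so only a single chunk is handled at a time."""
        for index in range(self.chunk_count):
            yield self.chunks.pop(self._name(index))
        self.chunk_count = 0
        self.last_chunk_size = 0


buffer = ChunkedBuffer(max_chunk_size=2)
buffer.add([b"a", b"b", b"c"])
buffer.add([b"d", b"e"])
chunk_sizes = [len(chunk) for chunk in buffer.release()]
print(chunk_sizes)
```

The point of the design is that only the two counters are touched on every write, while the bulky record lists are loaded and dropped one chunk at a time during release.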

ETL 2.0 Stream Processing

The flow is very simple, has good performance and is well balanced.

  • Source: Kafka consumer – consumes Kafka messages, decodes the topic name, and adds the topic’s group information with a random number for distribution. The output contains Avro as bytes, without deserialization.
  • Collecting messages (small bulks) – time window function.
  • Collecting, releasing by cron – buffers messages in state and releases them by timer.
  • Distribute messages (boost) – handles group-by key generation based on the events’ size (increases write parallelism when needed).
  • Sink writer – deserializes Avro and stores it in the proper format/location.

This solution is a mix of the streaming and batch approaches. It has streaming advantages such as easy maintenance and monitoring, no scheduling, speed and scalability, but in fact it is not real streaming because of the latency.
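The key generation used by the source and distribute steps of the flow can be sketched in a few lines of Python. The key shape `[name]_[random]` follows the description earlier in the article; the sizing rule in `salt_range_for_size` (one writer per fixed number of bytes, with a cap) is a hypothetical example, since the real ranges were tuned empirically.

```python
import random
from typing import Optional


def salted_key(name: str, salt_range: int, rng: Optional[random.Random] = None) -> str:
    """Build a key of the form [name]_[random], with random drawn from [0, salt_range)."""
    if salt_range < 1:
        raise ValueError("salt_range must be at least 1")
    source = rng or random
    return f"{name}_{source.randrange(salt_range)}"


def salt_range_for_size(released_bytes: int, bytes_per_writer: int, max_writers: int) -> int:
    """Hypothetical sizing rule: one writer per bytes_per_writer, capped at max_writers."""
    needed = -(-released_bytes // bytes_per_writer)  # ceiling division
    return max(1, min(max_writers, needed))


MB = 1024 ** 2
small_range = salt_range_for_size(10 * MB, bytes_per_writer=128 * MB, max_writers=8)
large_range = salt_range_for_size(1024 * MB, bytes_per_writer=128 * MB, max_writers=8)

# A range of 1 always yields the same key, so a single writer handles the topic.
small_key = salted_key("small_topic", small_range)
# A larger range spreads one topic across several keys and therefore several writers.
large_keys = {salted_key("huge_topic", large_range, random.Random(i)) for i in range(50)}
print(small_range, large_range, small_key, sorted(large_keys))
```

Small topics keep a single key and therefore a single output file per release, while huge topics are spread over several writers at the cost of a few more files.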

Note that exceptions occurring after the state has been stored, such as a data error (e.g. Avro deserialization in the last step), will stop the whole flow, so proper exception handling is critical. Backfill may require an extended checkpoint timeout or a higher number of tolerable checkpoint failures.

The application was implemented using Flink 1.11. The latest release, 1.12, introduced a lot of new features. Batch execution mode in the DataStream API seems very interesting. With it we could trigger jobs periodically and skip buffering and releasing data in Flink state.

12 May 2021
