8 min read

ETL 2.0 Why you should switch into stream processing

If you are looking at Nifi to help you in your data ingestions pipeline, there might be an interesting alternative.

Let’s assume we want to simply dump facts from Kafka to HDFS without any transformations - assuming  short latency isn’t crucial, but desirable. We have hundreds of topics from small to really huge ones, and all messages are in Avro format. Additionally some data is produced hourly or daily,  and some are produced online. Facts will be used to power tables on Hive as external tables which means there is a need of avoiding big amounts of small files. Otherwise HDFS along with Hive might have hard times with performance.

How to go about it? Fast research in Google suggests 3 options:

  • Apache Gobblin or Camus – MapReduce framework, so it’s batch, a bit of outdated technology - leave it for old-school programmers
  • Kafka HDFS 2 sink connector – Kafka Connect it’s great, scalable framework. That connector was used in one of the projects in GetInData, but had some bugs connected with Avro format. Due to that it consumed a lot of resources and there were difficulties during schema upgrade. It produced corrupted files during Hive Metastore problems etc. Using that in bigger projects won’t be easy.
  • Kafka HDFS 3 sink connector – it’s under the Confluent Enterprise License, you can test it for 30 days for free. It’s worth trying if you paid for the license.
  • Apache Flume – agent-based solution. It is hard to get Flume to process data exactly-once. Nevertheless, it's a deprecated technology (even by Cloudera)

So, in my opinion  none of the above seems to be good enough for the case. Let's build a well-fitted solution then!

In one of the projects in GetInData we use NIFI as a tool of choice for data ingestion, as it allows quite easily to build flow for dumping data. Messages can be buffered in a loop and merged before putting on HDFS. It works surprisingly well, but it does not guarantee at least once processing. 

You have to be aware that NIFI simply processes flow files (facts in this case), which are not shared or replicated between cluster nodes. Therefore there is a risk of data loss or incorrect order of data chunks, when node is down some data is not available and may be lost. There is no guarantee of recovery flow files after restoring a dead node. NIFI does not support high availability of data in this sense.

It is a real problem when you want to build some automation for reporting and there is a risk that source tables are not consistent. You have to:

  • monitor data consistency 
  • fix everything in case of problems, what means manual interventions and reconciliations

Monitoring of consistency requires gathering counts from Kafka topics and comparison with Hive tables or HDFS files. It's a batch task - it can be checked daily or hourly but you get information with delay. Reports may be inconsistent so analytics can’t rely on it.

There are more problems with NIFI. Maintenance of environments will be challenging as there is no CI/CD support. Stability depends on running processes, it’s worth to isolate core processes on dedicated clusters which is another extra effort.

Why not to use a leading, streaming framework like Flink? It is fast, scalable, gives exactly once processing for granted, works well with CI/CD pipelines. Moreover, it is easy to maintain Flink jobs with e.g., Ververica Platform, Kubernetes or good old Yarn. Auto-retry is your concern? No problem, Flink has a built-in auto-retry based on checkpointing.

Checkpoint in Flink world is a job state. Flink creates a periodic barrier which goes through flow. Task handles the barrier by writing its state to the resilient storage. In the end we have a consistent state of the whole flow. Conclusion is straightforward – make frequent checkpoints if you want to recover fast.

Let’s create flow for the study case and start with a very simple solution – Kafka consumer as source and file sink with bucketing assigner for writing to HDFS. Everything out of the box. Set the checkpoint interval to 60 seconds and it’s done! Not really, because Flink creates new files on checkpoints. That way it can guarantee processing exactly once. In a pessimistic scenario you can have “N” files on a checkpoint for every topic where “N” is sink parallelism. Thousands files per minute will kill HDFS and Hive performance. Remember to avoid small files while working with HDFS!

Flink SQL doesn’t solve the problem. You can try to by-pass the problem and write to the Hive managed table and enable compaction on it. It’s just a go-around solution, compaction is an expensive and asynchronous process. Read operation from fragmented tables may be very slow. Moreover it will be hard to configure it for thousands of topics.

The real problem is to disjoin checkpointing with new files creation. We can do it by writing in a file format which supports commit operations, like Iceberg, Delta Lake or Hudi.

Commit operation is built in the file format and Flink does not have to care about it but just writes records in streaming fashion and commits on checkpoint.

Unfortunately, in our case we have to consider other solutions. Handling any of these formats on Hive requires altering Tez processing engine by e.g. Spark. It would be too revolutionary change for the current data platform.

Flink is stateful and it is possible to buffer messages in a persistent state (e.g. RocksDB). Messages can be released periodically by timer. Latency is not critical in that case. You can write to state storage using KeyedProcessFunction and set a timer to trigger releasing. It works as needed, but it is slow. Let’s go through the problems and fix them.

  1.  Write to state storage is slow.
    Don’t write events one-by-one, use chunks instead. KeyedProcessFunction has to load the value state by key, it is good to group events before by time window with short time size. Avoid the count window because it uses a state counter (slow!) and doesn’t guarantee releasing all events in a determined time.
    The fastest way of writing to state is adding to ListState, because you avoid read operation.
  2. State is too big, reading it is slow and may cause Out Of Memory Exception (OOM).
    It’s possible to keep just meta information in value state connected with function key and load list state dynamically (e.g. in processElement method). I put the number of chunks and last chunk size as a Flink function state. List state descriptor is created based on last chunk number and loaded on demand. OnTimer will release records from all chunks one-by-one. Configure memory to load the whole chunk into heap.
  3. Serialization/deserialization Avro is slow and it’s executed between tasks.
    Don’t deserialize Avro on consumer. For proper distribution deserialize only meta information e.g. topic name, without Avro content. Store bytes content in state, deserialize only before write. Notice that it’s only an Avro dump, so no modifications are needed.
  4. Load isn’t balanced well between tasks.
    Some topics will be huge, some very small. I’ve grouped them based on a prepared configuration and distributed by key [group_name]_[random]. It’s good to make random ranges depending on parallelism and tune up empirically. That key is also used in ProcessKeyFunction for buffering/releasing messages.
  5. Sink writes to N files on the checkpoint.
    Distribute using keyBy with topic name.
  6. Sink write performance is slow, because of the previous step!
    Do the same trick like in point 4. My key is [topic_name]_[random] where random range is based on released events’ size. Keeping random range equal 1 enforce single thread write. Bigger range increases the probability of concurrent write.

ETL 2.0 Stream Processing

The flow is very simple, has good performance and is well balanced.

  • Source: Kafka consumer – consume kafka messages, decode topic name, add topic’s group information with random number for distribution. Output contains avro as bytes, without deserialization.
  • Collecting messages (small bulks) – time window function.
  • Collecting, releasing by cron – buffer messages in state and release by timer.
  • Distribute messages (boost) – handles group-by key generation based on events’ size (increase write parallelism when needed).
  • Sink Writer – Deserialize avro and store in proper format/location
That solution is a mix of streaming and batch approach. It has streaming advantages like easy maintenance and monitoring, no scheduling, it is faster and scalable, but in fact it is not a real streaming because of latency.

Notice that some exceptions may occur after storing state like data error (e.g., Avro deserialization on the last step) will stop whole flow, so proper exception handling is critical. Backfill may require extended checkpoint timeout/tolerable checkpoint failures.

Application was implemented using Flink 1.11. The latest release 1.12 introduced a lot of new features. Very interesting seems to be Batch execution mode in DataStream API. With that we could trigger jobs periodically, skipping buffering and releasing data in Flink state.

big data
apache nifi
stream processing
big data project
12 May 2021

Want more? Check our articles


Enabling Hive on Spark on CDH 5.14 — a few problems (and solutions)

Recently I’ve had an opportunity to configure CDH 5.14 Hadoop cluster of one of GetInData’s customers to make it possible to use Hive on Spark…

Read more
big data technology warsaw summit 2021 adam kawa przemysław gamdzyk
Big Data Event

The Big Data Technology Summit 2021 - review of presentations

Since 2015, the beginning of every year is quite intense but also exciting for our company, because we are getting closer and closer to the Big Data…

Read more
0 pjPVaAnArwat2ZH8
Big Data Event

Big Data Tech Warsaw Summit 2019 summary

It’s been already more than a month after Big Data Tech Warsaw Summit 2019, but it’s spirit is still among us — that’s why we’ve decided to prolong it…

Read more
mariusz blogobszar roboczy 1 4x 100

OAuth2-based authentication on Istio-powered Kubernetes clusters

You have just installed your first Kubernetes cluster and installed Istio to get the full advantage of Service Mesh. Thanks to really awesome…

Read more
mamava getindata cloud google bigquery prostooleh
Success Stories

Success story: Breastfeeding supported with modern IoT and app features

Outstanding customer experience is usually backed by robust data analytics. Same applies to Mamava, a business that celebrates and supports…

Read more
obszar roboczy 12 23blogcdci

Different generations of CICD tools

What is CICD? It is an acronym for Continuous Integration Continuous Delivery / Deployment. CICD can be also described as the methodology focused on…

Read more

Contact us

Interested in our solutions?
Contact us!

Together, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.

The administrator of your personal data is GetInData Sp. z o.o. Sp.k with its registered seat in Warsaw (02-508), 39/20 Pulawska St. Your data is processed for the purpose of provision of electronic services in accordance with the  Terms & Conditions. For more information on personal data processing and your rights please see  Privacy Policy.

By submitting this form, you agree to our Terms & Conditions and Privacy Policy