Tutorial
8 min read

ETL 2.0 Why you should switch into stream processing

If you are looking at Nifi to help you in your data ingestions pipeline, there might be an interesting alternative.

Let’s assume we want to simply dump facts from Kafka to HDFS without any transformations - assuming  short latency isn’t crucial, but desirable. We have hundreds of topics from small to really huge ones, and all messages are in Avro format. Additionally some data is produced hourly or daily,  and some are produced online. Facts will be used to power tables on Hive as external tables which means there is a need of avoiding big amounts of small files. Otherwise HDFS along with Hive might have hard times with performance.

How to go about it? Fast research in Google suggests 3 options:

  • Apache Gobblin or Camus – MapReduce framework, so it’s batch, a bit of outdated technology - leave it for old-school programmers
  • Kafka HDFS 2 sink connector – Kafka Connect it’s great, scalable framework. That connector was used in one of the projects in GetInData, but had some bugs connected with Avro format. Due to that it consumed a lot of resources and there were difficulties during schema upgrade. It produced corrupted files during Hive Metastore problems etc. Using that in bigger projects won’t be easy.
  • Kafka HDFS 3 sink connector – it’s under the Confluent Enterprise License, you can test it for 30 days for free. It’s worth trying if you paid for the license.
  • Apache Flume – agent-based solution. It is hard to get Flume to process data exactly-once. Nevertheless, it's a deprecated technology (even by Cloudera)

So, in my opinion  none of the above seems to be good enough for the case. Let's build a well-fitted solution then!

In one of the projects in GetInData we use NIFI as a tool of choice for data ingestion, as it allows quite easily to build flow for dumping data. Messages can be buffered in a loop and merged before putting on HDFS. It works surprisingly well, but it does not guarantee at least once processing. 

You have to be aware that NIFI simply processes flow files (facts in this case), which are not shared or replicated between cluster nodes. Therefore there is a risk of data loss or incorrect order of data chunks, when node is down some data is not available and may be lost. There is no guarantee of recovery flow files after restoring a dead node. NIFI does not support high availability of data in this sense.

It is a real problem when you want to build some automation for reporting and there is a risk that source tables are not consistent. You have to:

  • monitor data consistency 
  • fix everything in case of problems, what means manual interventions and reconciliations

Monitoring of consistency requires gathering counts from Kafka topics and comparison with Hive tables or HDFS files. It's a batch task - it can be checked daily or hourly but you get information with delay. Reports may be inconsistent so analytics can’t rely on it.

There are more problems with NIFI. Maintenance of environments will be challenging as there is no CI/CD support. Stability depends on running processes, it’s worth to isolate core processes on dedicated clusters which is another extra effort.

Why not to use a leading, streaming framework like Flink? It is fast, scalable, gives exactly once processing for granted, works well with CI/CD pipelines. Moreover, it is easy to maintain Flink jobs with e.g., Ververica Platform, Kubernetes or good old Yarn. Auto-retry is your concern? No problem, Flink has a built-in auto-retry based on checkpointing.

Checkpoint in Flink world is a job state. Flink creates a periodic barrier which goes through flow. Task handles the barrier by writing its state to the resilient storage. In the end we have a consistent state of the whole flow. Conclusion is straightforward – make frequent checkpoints if you want to recover fast.

Let’s create flow for the study case and start with a very simple solution – Kafka consumer as source and file sink with bucketing assigner for writing to HDFS. Everything out of the box. Set the checkpoint interval to 60 seconds and it’s done! Not really, because Flink creates new files on checkpoints. That way it can guarantee processing exactly once. In a pessimistic scenario you can have “N” files on a checkpoint for every topic where “N” is sink parallelism. Thousands files per minute will kill HDFS and Hive performance. Remember to avoid small files while working with HDFS!

Flink SQL doesn’t solve the problem. You can try to by-pass the problem and write to the Hive managed table and enable compaction on it. It’s just a go-around solution, compaction is an expensive and asynchronous process. Read operation from fragmented tables may be very slow. Moreover it will be hard to configure it for thousands of topics.

The real problem is to disjoin checkpointing with new files creation. We can do it by writing in a file format which supports commit operations, like Iceberg, Delta Lake or Hudi.

Commit operation is built in the file format and Flink does not have to care about it but just writes records in streaming fashion and commits on checkpoint.

Unfortunately, in our case we have to consider other solutions. Handling any of these formats on Hive requires altering Tez processing engine by e.g. Spark. It would be too revolutionary change for the current data platform.

Flink is stateful and it is possible to buffer messages in a persistent state (e.g. RocksDB). Messages can be released periodically by timer. Latency is not critical in that case. You can write to state storage using KeyedProcessFunction and set a timer to trigger releasing. It works as needed, but it is slow. Let’s go through the problems and fix them.

  1.  Write to state storage is slow.
    Don’t write events one-by-one, use chunks instead. KeyedProcessFunction has to load the value state by key, it is good to group events before by time window with short time size. Avoid the count window because it uses a state counter (slow!) and doesn’t guarantee releasing all events in a determined time.
    The fastest way of writing to state is adding to ListState, because you avoid read operation.
  2. State is too big, reading it is slow and may cause Out Of Memory Exception (OOM).
    It’s possible to keep just meta information in value state connected with function key and load list state dynamically (e.g. in processElement method). I put the number of chunks and last chunk size as a Flink function state. List state descriptor is created based on last chunk number and loaded on demand. OnTimer will release records from all chunks one-by-one. Configure memory to load the whole chunk into heap.
  3. Serialization/deserialization Avro is slow and it’s executed between tasks.
    Don’t deserialize Avro on consumer. For proper distribution deserialize only meta information e.g. topic name, without Avro content. Store bytes content in state, deserialize only before write. Notice that it’s only an Avro dump, so no modifications are needed.
  4. Load isn’t balanced well between tasks.
    Some topics will be huge, some very small. I’ve grouped them based on a prepared configuration and distributed by key [group_name]_[random]. It’s good to make random ranges depending on parallelism and tune up empirically. That key is also used in ProcessKeyFunction for buffering/releasing messages.
  5. Sink writes to N files on the checkpoint.
    Distribute using keyBy with topic name.
  6. Sink write performance is slow, because of the previous step!
    Do the same trick like in point 4. My key is [topic_name]_[random] where random range is based on released events’ size. Keeping random range equal 1 enforce single thread write. Bigger range increases the probability of concurrent write.

ETL 2.0 Stream Processing

The flow is very simple, has good performance and is well balanced.

  • Source: Kafka consumer – consume kafka messages, decode topic name, add topic’s group information with random number for distribution. Output contains avro as bytes, without deserialization.
  • Collecting messages (small bulks) – time window function.
  • Collecting, releasing by cron – buffer messages in state and release by timer.
  • Distribute messages (boost) – handles group-by key generation based on events’ size (increase write parallelism when needed).
  • Sink Writer – Deserialize avro and store in proper format/location
That solution is a mix of streaming and batch approach. It has streaming advantages like easy maintenance and monitoring, no scheduling, it is faster and scalable, but in fact it is not a real streaming because of latency.

Notice that some exceptions may occur after storing state like data error (e.g., Avro deserialization on the last step) will stop whole flow, so proper exception handling is critical. Backfill may require extended checkpoint timeout/tolerable checkpoint failures.

Application was implemented using Flink 1.11. The latest release 1.12 introduced a lot of new features. Very interesting seems to be Batch execution mode in DataStream API. With that we could trigger jobs periodically, skipping buffering and releasing data in Flink state.

big data
apache nifi
avro
bigdatatech
camus
flink
hdfs
kafka
stream processing
big data project
12 May 2021

Want more? Check our articles

flink metadata catalog
Tutorial

Flink with a metadata catalog

Have you worked with Flink SQL or Flink Table API? Do you find it frustrating to manage sources and sinks across different projects or repositories…

Read more
1 6ZTvzJwCviqIJcV5WQC0Sg
Big Data Event

Truecaller, GetInData and Google’s contribution to Big Data Tech Warsaw Summit

GetInData, Google and Truecaller participate in the Big Data Tech Warsaw Summit 2019. It’s already less than two weeks to the 5th edition of Big Data…

Read more
running observability kubernetesobszar roboczy 1 4
Tutorial

Running Observability Stack on Grafana

Introduction At GetInData, we understand the value of full observability across our application stacks. For our Customers, we always recommend…

Read more
writing flink jobs using springobszar roboczy 1 4

Writing Flink jobs using the Spring dependency injection framework

Introduction Almost two decades ago, the first version of Spring framework was released. During this time, Spring became the bedrock on which the…

Read more
mamava getindata cloud google bigquery prostooleh
Success Stories

Success story: Breastfeeding supported with modern IoT and app features

Outstanding customer experience is usually backed by robust data analytics. Same applies to Mamava, a business that celebrates and supports…

Read more
runningkedroeverywhereobszar roboczy 1 4
Tutorial

Running Kedro… everywhere? Machine Learning Pipelines on Kubeflow, Vertex AI, Azure and Airflow

Building reliable machine learning pipelines puts a heavy burden on Data Scientists and Machine Learning engineers. It’s fairly easy to kick-off any…

Read more

Contact us

Interested in our solutions?
Contact us!

Together, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.


What did you find most impressive about GetInData?

They did a very good job in finding people that fitted in Acast both technically as well as culturally.
Type the form or send a e-mail: hello@getindata.com
The administrator of your personal data is GetInData Poland Sp. z o.o. with its registered seat in Warsaw (02-508), 39/20 Pulawska St. Your data is processed for the purpose of provision of electronic services in accordance with the Terms & Conditions. For more information on personal data processing and your rights please see Privacy Policy.

By submitting this form, you agree to our Terms & Conditions and Privacy Policy