Tutorial

ETL 2.0: Why you should switch to stream processing

If you are looking at NiFi to help with your data ingestion pipeline, there might be an interesting alternative.

Let’s assume we simply want to dump facts from Kafka to HDFS without any transformations; low latency isn’t crucial, but it is desirable. We have hundreds of topics, from small to really huge ones, and all messages are in Avro format. Additionally, some data is produced hourly or daily, and some is produced online. The facts will back external tables in Hive, which means we must avoid producing large numbers of small files. Otherwise, both HDFS and Hive will have a hard time with performance.

How to go about it? A quick Google search suggests four options:

  • Apache Gobblin or Camus – MapReduce-based frameworks, so they are batch-oriented and somewhat outdated; best left to old-school programmers.
  • Kafka HDFS 2 Sink connector – Kafka Connect is a great, scalable framework. We used this connector in one of our projects at GetInData, but it had some bugs related to the Avro format. Because of them it consumed a lot of resources, schema upgrades were difficult, and it produced corrupted files when the Hive Metastore had problems. Using it in bigger projects won’t be easy.
  • Kafka HDFS 3 Sink connector – it is available under the Confluent Enterprise License, and you can test it for free for 30 days. It’s worth trying if you already pay for the license.
  • Apache Flume – an agent-based solution. It is hard to get Flume to process data exactly once. Moreover, it is a deprecated technology (even Cloudera has deprecated it).

So, in my opinion, none of the above is good enough for this case. Let's build a well-fitted solution then!

In one of our projects at GetInData we use NiFi as the tool of choice for data ingestion, because it makes it quite easy to build a flow for dumping data. Messages can be buffered in a loop and merged before being put on HDFS. It works surprisingly well, but it does not guarantee at-least-once processing.

Be aware that NiFi simply processes flow files (facts, in this case), which are not shared or replicated between cluster nodes. Therefore there is a risk of data loss or of data chunks arriving in the wrong order: when a node is down, the data it holds is unavailable and may be lost. There is no guarantee that flow files will be recovered after a dead node is restored. In this sense, NiFi does not provide high availability of data.

This is a real problem when you want to automate reporting and there is a risk that the source tables are inconsistent. You have to:

  • monitor data consistency
  • fix everything when problems occur, which means manual interventions and reconciliations

Monitoring consistency requires gathering counts from Kafka topics and comparing them with Hive tables or HDFS files. It is a batch task. It can run daily or hourly, but you only get the information after a delay. In the meantime, reports may be inconsistent, so analysts can’t rely on them.
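The following minimal sketch in plain Python illustrates that batch check. It assumes the per-topic counts for the same time range (e.g. one hourly partition) have already been collected from Kafka and from Hive. How they are gathered is out of scope, and `find_inconsistencies` is a hypothetical helper name.

```python
def find_inconsistencies(kafka_counts, hive_counts):
    """Return {topic: difference} for topics whose counts do not match.

    kafka_counts / hive_counts: hypothetical dicts mapping topic name -> number
    of messages/rows for the same time range.
    A positive difference means rows are missing in Hive, a negative one
    means duplicates.
    """
    report = {}
    for topic in sorted(set(kafka_counts) | set(hive_counts)):
        diff = kafka_counts.get(topic, 0) - hive_counts.get(topic, 0)
        if diff != 0:
            report[topic] = diff
    return report


print(find_inconsistencies({"orders": 1000, "clicks": 500},
                           {"orders": 998, "clicks": 500}))  # {'orders': 2}
```

Even when a check like this flags a gap, someone still has to investigate and reconcile it by hand. That manual work is the burden described above.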

There are more problems with NiFi. Maintaining environments is challenging because there is no CI/CD support. Stability depends on the processes running alongside each other, so it is worth isolating core processes on dedicated clusters, which is yet more effort.

Why not use a leading stream processing framework like Flink? It is fast and scalable, provides exactly-once processing out of the box, and works well with CI/CD pipelines. Moreover, Flink jobs are easy to maintain with, e.g., Ververica Platform, Kubernetes or good old YARN. Concerned about auto-retry? No problem: Flink has built-in automatic restarts based on checkpointing.

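In the Flink world, a checkpoint is a consistent snapshot of the job's state. Flink periodically injects checkpoint barriers into the stream, and these flow through the job together with the records. When a task receives a barrier, it writes its state to durable storage. Once all tasks have done so, we have a consistent snapshot of the whole flow. The conclusion is straightforward: checkpoint frequently if you want to recover fast. After a failure, the job restarts from the latest completed checkpoint and reprocesses everything that came after it.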

Let’s build a flow for our case study, starting with a very simple solution: a Kafka consumer as the source and a file sink (e.g. StreamingFileSink) with a bucket assigner for writing to HDFS. Everything works out of the box. Set the checkpoint interval to 60 seconds and you’re done! Not really, because Flink rolls new files on checkpoints; that is how it guarantees exactly-once processing. In a pessimistic scenario you get N new files per checkpoint for every topic, where N is the sink parallelism. Thousands of files per minute will kill HDFS and Hive performance. Remember to avoid small files when working with HDFS!
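A back-of-the-envelope calculation shows how quickly the files add up. The sketch assumes the worst case described above, where every one of the N sink subtasks rolls a new file for every topic on every checkpoint. The topic count and parallelism in the example are illustrative assumptions, not measurements.

```python
def files_per_day(topics, sink_parallelism, checkpoint_interval_s):
    """Worst-case number of files per day when each (topic, sink subtask)
    pair rolls a new part file on every checkpoint."""
    checkpoints_per_day = 24 * 60 * 60 // checkpoint_interval_s
    return topics * sink_parallelism * checkpoints_per_day


# Assumed numbers: 300 topics, sink parallelism 8, 60-second checkpoints.
print(files_per_day(300, 8, 60))  # 3456000
```

That is 2,400 new files per minute, or roughly 3.5 million per day. Keying the sink by topic (N = 1) divides the count by the parallelism, which is the idea behind point 5 in the list below.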

Flink SQL doesn’t solve the problem either. You can try to bypass it by writing to a Hive managed table and enabling compaction on it, but that is just a workaround. Compaction is an expensive, asynchronous process, and reads from fragmented tables may be very slow until it runs. Moreover, it will be hard to configure for thousands of topics.

The real problem is decoupling checkpointing from the creation of new files. We can do that by writing to a table format that supports commit operations, such as Iceberg, Delta Lake or Hudi.

The commit operation is built into the table format, so Flink does not have to manage it. It just writes records in a streaming fashion and commits on checkpoint.

Unfortunately, in our case we had to consider other solutions. Handling any of these formats in Hive would require replacing the Tez processing engine with, e.g., Spark. That would be too revolutionary a change for the current data platform.

Flink is stateful, so it is possible to buffer messages in persistent state (e.g. the RocksDB state backend) and release them periodically with a timer. That delay is acceptable here because latency is not critical. You can write to state using a KeyedProcessFunction and register a timer to trigger the release. It works as needed, but it is slow. Let’s go through the problems and fix them one by one.
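The sketch below models that naive version in plain Python so the mechanics are easy to follow. Each record is appended to per-key list state, and a timer aligned to the next release boundary flushes the key. It is not Flink API code: state and timers are simulated with dictionaries, and `KeyedBuffer`, `advance_time` and `release_interval_ms` are hypothetical names.

```python
from collections import defaultdict


class KeyedBuffer:
    """Model of a KeyedProcessFunction that buffers records per key
    and releases them when a timer fires (simulated, not Flink code)."""

    def __init__(self, release_interval_ms):
        self.release_interval_ms = release_interval_ms
        self.list_state = defaultdict(list)  # key -> buffered records
        self.timers = {}                     # key -> registered timer timestamp

    def process_element(self, key, record, now_ms):
        self.list_state[key].append(record)
        if key not in self.timers:
            # Align the timer to the next release boundary ("cron-like" release)
            interval = self.release_interval_ms
            self.timers[key] = (now_ms // interval + 1) * interval

    def advance_time(self, now_ms):
        """Fire every timer with timestamp <= now_ms; return released records per key."""
        released = {}
        for key, ts in list(self.timers.items()):
            if ts <= now_ms:
                released[key] = self.list_state.pop(key, [])
                del self.timers[key]
        return released


buf = KeyedBuffer(release_interval_ms=3_600_000)  # release hourly
buf.process_element("orders_3", b"a", now_ms=1_000)
buf.process_element("orders_3", b"b", now_ms=2_000)
print(buf.advance_time(3_599_999))  # {}
print(buf.advance_time(3_600_000))  # {'orders_3': [b'a', b'b']}
```

In an actual KeyedProcessFunction, the timer would be registered through `ctx.timerService()` and the flush would happen in `onTimer`. The model only mirrors that control flow.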

  1. Writing to state storage is slow.
    Don’t write events one by one; use chunks instead. Since the KeyedProcessFunction has to load the value state for each key, it is good to group events beforehand with a short time window. Avoid count windows, because they use a state counter (slow!) and don’t guarantee that all events are released within a bounded time.
    The fastest way of writing to state is appending to ListState, because it avoids a read operation.
  2. State is too big; reading it is slow and may cause an OutOfMemoryError (OOM).
    It’s possible to keep only meta information in the value state associated with the function key and to load list state dynamically (e.g. in the processElement method). I keep the number of chunks and the size of the last chunk as the function’s state. The list state descriptor is created based on the last chunk number, and the state is loaded on demand. onTimer releases records from all chunks, one chunk at a time. Configure memory so that a whole chunk fits in the heap.
  3. Avro serialization/deserialization is slow, and it is executed between tasks.
    Don’t deserialize Avro in the consumer. For proper distribution, extract only meta information, e.g. the topic name, without touching the Avro content. Store the content as bytes in state and deserialize it only right before writing. Note that this is only an Avro dump, so no modifications are needed.
  4. Load isn’t well balanced between tasks.
    Some topics will be huge, some very small. I’ve grouped them based on a prepared configuration and distributed them by the key [group_name]_[random]. It’s good to make the random range depend on the parallelism and to tune it empirically. The same key is also used in the KeyedProcessFunction for buffering/releasing messages.
  5. The sink writes N files per topic on every checkpoint.
    Distribute records using keyBy on the topic name, so each topic is written by a single sink subtask.
  6. Sink write performance is poor because of the previous step!
    Use the same trick as in point 4. My key is [topic_name]_[random], where the random range is based on the size of the released events. Keeping the random range equal to 1 enforces a single-threaded write; a bigger range increases the probability of concurrent writes.
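Points 1, 2, 4 and 6 can be sketched as a plain-Python model rather than Flink code. `ChunkedBuffer` stands in for a KeyedProcessFunction whose value state holds only (number of chunks, last chunk size). Its per-chunk lists stand in for ListStates created from descriptors derived from the chunk number. `max_chunk_size`, `bytes_per_writer`, `group_key` and `sink_key` are hypothetical names and tuning knobs, not part of any library.

```python
import random


class ChunkedBuffer:
    """Model of points 1 and 2.

    meta   ~ ValueState per key: (number_of_chunks, last_chunk_size)
    chunks ~ one ListState per (key, chunk number), created on demand
    """

    def __init__(self, max_chunk_size):
        self.max_chunk_size = max_chunk_size  # records per chunk (assumed limit)
        self.meta = {}
        self.chunks = {}

    def add_bulk(self, key, bulk):
        """Append a small bulk of raw Avro bytes; only the metadata is read."""
        count, last_size = self.meta.get(key, (0, 0))
        if count == 0 or last_size + len(bulk) > self.max_chunk_size:
            count, last_size = count + 1, 0  # start a new chunk
        self.chunks.setdefault((key, count), []).append(bulk)
        self.meta[key] = (count, last_size + len(bulk))

    def release(self, key):
        """Yield records chunk by chunk (as onTimer would), clearing state as it goes.
        Only one chunk is held in memory at a time."""
        count, _ = self.meta.pop(key, (0, 0))
        for chunk_no in range(1, count + 1):
            for bulk in self.chunks.pop((key, chunk_no), []):
                yield from bulk


def group_key(group_name, random_range, rng=random):
    """Point 4: spread one topic group over `random_range` keys, e.g. 'small_3'."""
    return f"{group_name}_{rng.randrange(random_range)}"


def sink_key(topic_name, released_bytes, bytes_per_writer, rng=random):
    """Point 6: the random range grows with the size of the released data.
    A range of 1 keeps a single writer (and a single file) per topic."""
    random_range = max(1, -(-released_bytes // bytes_per_writer))  # ceiling division
    return f"{topic_name}_{rng.randrange(random_range)}"
```

Because of the ceiling division, a released batch smaller than `bytes_per_writer` always gets a range of 1, meaning one writer and one file for that topic. Larger batches are spread over more writers at the cost of more files.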

ETL 2.0 Stream Processing

The flow is very simple, has good performance and is well balanced.

  • Source: Kafka consumer – consumes Kafka messages, decodes the topic name, and adds the topic’s group information with a random number for distribution. The output contains the Avro payload as bytes, without deserialization.
  • Collecting messages (small bulks) – time window function.
  • Collecting and releasing by cron – buffers messages in state and releases them by timer.
  • Distribute messages (boost) – generates the keyBy key based on the events’ size (increasing write parallelism when needed).
  • Sink Writer – deserializes Avro and stores the records in the proper format/location.
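The first two stages can be modelled in plain Python. The source wraps each Kafka record in an envelope carrying the topic, a `<group_name>_<random>` key and the raw Avro bytes. A short tumbling window then groups envelopes into small bulks per key. `Envelope`, `to_envelope`, `topic_groups` and the window size are assumptions for illustration. In Flink itself, the topic name is available to a `KafkaDeserializationSchema` through the `ConsumerRecord`.

```python
import random
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Envelope:
    """What the source emits: routing metadata plus the untouched Avro bytes."""
    topic: str
    key: str           # "<group_name>_<random>", used for keyBy
    payload: bytes     # raw Avro record, deserialized only by the sink writer
    timestamp_ms: int


def to_envelope(topic, value, timestamp_ms, topic_groups, random_range, rng=random):
    # topic_groups is a hypothetical config mapping topic name -> group name
    group = topic_groups.get(topic, "default")
    return Envelope(topic, f"{group}_{rng.randrange(random_range)}", value, timestamp_ms)


def tumbling_bulks(envelopes, window_ms):
    """Group payloads into small bulks per (key, window start), like a short
    tumbling time window, so the buffering step stores bulks, not single events."""
    bulks = defaultdict(list)
    for env in envelopes:
        window_start = env.timestamp_ms - env.timestamp_ms % window_ms
        bulks[(env.key, window_start)].append(env.payload)
    return dict(bulks)
```

Keeping the payload as opaque bytes means Avro is never decoded between tasks, which is the point of step 3 above.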

This solution is a mix of the streaming and batch approaches. It has the advantages of streaming, such as easy maintenance and monitoring, no scheduling, speed and scalability. Strictly speaking, however, it is not real streaming because of the latency.

Note that an exception thrown after the data has been stored in state will stop the whole flow. A data error such as an Avro deserialization failure in the last step is one example, so proper exception handling is critical. Backfills may require an extended checkpoint timeout or a higher number of tolerable checkpoint failures.
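One way to keep a single bad record from stopping the flow is to route deserialization failures to a dead-letter output instead of throwing. The sketch below shows the idea in plain Python, using `json.loads` as a stand-in for an Avro decoder. `write_released`, `deserialize`, `write` and `data_errors` are hypothetical names. In Flink, a side output (an `OutputTag` emitted through the ProcessFunction context) could play the dead-letter role.

```python
import json


def write_released(records, deserialize, write, dead_letters, data_errors=(ValueError,)):
    """Deserialize and write released records; send undecodable ones to dead_letters.

    Only the exception types in `data_errors` are caught. Failures inside
    `write` (e.g. storage outages) still propagate and fail the job.
    """
    written = 0
    for raw in records:
        try:
            record = deserialize(raw)
        except data_errors as err:
            dead_letters.append((raw, repr(err)))  # keep the bytes for inspection
            continue
        write(record)
        written += 1
    return written


out, dlq = [], []
print(write_released([b'{"id": 1}', b"not avro", b'{"id": 2}'], json.loads, out.append, dlq))  # 2
```

Catching only data errors is deliberate. Infrastructure failures should still fail the task, so Flink restarts it from the last checkpoint instead of silently dropping records.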

The application was implemented using Flink 1.11. The latest release, 1.12, introduced many new features. The batch execution mode in the DataStream API is particularly interesting. With it, we could trigger jobs periodically and skip buffering and releasing data in Flink state.

12 May 2021
