Tutorial

ETL 2.0: Why you should switch to stream processing

If you are considering NiFi for your data ingestion pipeline, there might be an interesting alternative.

Let’s assume we simply want to dump facts from Kafka to HDFS without any transformations. Low latency isn’t crucial, but it is desirable. We have hundreds of topics, from small to really huge ones, and all messages are in Avro format. Additionally, some data is produced hourly or daily, and some is produced in real time. The facts will back external tables in Hive, which means we need to avoid large numbers of small files. Otherwise, HDFS and Hive may struggle with performance.

How to go about it? A quick Google search suggests four options:

  • Apache Gobblin or Camus – MapReduce-based frameworks, so they are batch-oriented and somewhat outdated technology. Leave them for old-school programmers.
  • Kafka HDFS 2 sink connector – Kafka Connect is a great, scalable framework. This connector was used in one of the projects at GetInData, but it had some bugs related to the Avro format. As a result, it consumed a lot of resources and schema upgrades were difficult. It also produced corrupted files when Hive Metastore had problems. Using it in bigger projects won’t be easy.
  • Kafka HDFS 3 sink connector – it’s under the Confluent Enterprise License, and you can test it for free for 30 days. It’s worth trying if you have paid for the license.
  • Apache Flume – an agent-based solution. It is hard to get Flume to process data exactly once. Moreover, it's a deprecated technology (even by Cloudera).

So, in my opinion, none of the above seems good enough for this case. Let's build a well-fitted solution then!

In one of the projects at GetInData we use NiFi as the tool of choice for data ingestion, as it makes it quite easy to build a flow for dumping data. Messages can be buffered in a loop and merged before being put on HDFS. It works surprisingly well, but it does not guarantee at-least-once processing.

You have to be aware that NiFi simply processes flow files (facts in this case), which are not shared or replicated between cluster nodes. Therefore, there is a risk of data loss or of data chunks arriving out of order: when a node is down, its data is unavailable and may be lost. There is no guarantee that flow files can be recovered after a dead node is restored. NiFi does not support high availability of data in this sense.

It is a real problem when you want to build some automation for reporting and there is a risk that source tables are not consistent. You have to:

  • monitor data consistency
  • fix everything in case of problems, which means manual interventions and reconciliations

Monitoring consistency requires gathering counts from Kafka topics and comparing them with Hive tables or HDFS files. It's a batch task: it can run daily or hourly, but you get the information with a delay. Reports may be inconsistent, so analysts can’t rely on them.
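To make the reconciliation step concrete, here is a minimal sketch of such a check. It assumes the per-topic counts have already been gathered elsewhere (for example from Kafka offsets and a count query on Hive); the function name and the sample figures are hypothetical and only illustrate the comparison itself.

```python
from typing import Dict, List, Tuple


def find_count_mismatches(
    kafka_counts: Dict[str, int],
    hive_counts: Dict[str, int],
) -> List[Tuple[str, int, int]]:
    """Return (topic, kafka_count, hive_count) for every topic whose counts differ."""
    mismatches = []
    # Walk over the union so topics missing on either side are reported too.
    for topic in sorted(set(kafka_counts) | set(hive_counts)):
        produced = kafka_counts.get(topic, 0)
        stored = hive_counts.get(topic, 0)
        if produced != stored:
            mismatches.append((topic, produced, stored))
    return mismatches


# Hypothetical hourly counts, invented for illustration.
kafka = {"orders": 1200, "clicks": 50000, "payments": 310}
hive = {"orders": 1200, "clicks": 49990}
report = find_count_mismatches(kafka, hive)
```

Every entry in `report` is a table that needs a manual look, and the check only tells you about it after the batch has run.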

There are more problems with NiFi. Maintaining environments will be challenging, as there is no CI/CD support. Stability depends on the processes that are running, so it’s worth isolating core processes on dedicated clusters, which is yet another effort.

Why not use a leading streaming framework like Flink? It is fast and scalable, provides exactly-once processing out of the box, and works well with CI/CD pipelines. Moreover, it is easy to maintain Flink jobs with, e.g., Ververica Platform, Kubernetes or good old YARN. Is auto-retry your concern? No problem, Flink has built-in auto-retry based on checkpointing.

In the Flink world, a checkpoint is a snapshot of the job state. Flink periodically injects a barrier that travels through the flow. Each task handles the barrier by writing its state to resilient storage. In the end, we have a consistent state of the whole flow. The conclusion is straightforward: make frequent checkpoints if you want to recover fast.

Let’s create a flow for our case study and start with a very simple solution: a Kafka consumer as the source and a file sink with a bucket assigner for writing to HDFS. Everything works out of the box. Set the checkpoint interval to 60 seconds and it’s done! Not really, because Flink creates new files on checkpoints. That is how it can guarantee exactly-once processing. In a pessimistic scenario you can have “N” files per checkpoint for every topic, where “N” is the sink parallelism. Thousands of files per minute will kill HDFS and Hive performance. Remember to avoid small files while working with HDFS!
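A back-of-the-envelope calculation shows how quickly this adds up. The sketch below only multiplies the quantities named above; the topic count and sink parallelism are assumed figures chosen for illustration, not numbers from the project.

```python
def files_per_day(topics: int, sink_parallelism: int, checkpoint_interval_s: int) -> int:
    """Worst-case number of files per day when every sink subtask
    rolls one file per topic on every checkpoint."""
    checkpoints_per_day = 24 * 60 * 60 // checkpoint_interval_s
    return topics * sink_parallelism * checkpoints_per_day


# Assumed figures: 200 topics, sink parallelism 8, checkpoint every 60 seconds.
per_checkpoint = 200 * 8
per_day = files_per_day(topics=200, sink_parallelism=8, checkpoint_interval_s=60)
```

With these assumptions the pessimistic case is 1,600 files per checkpoint and 2,304,000 files per day. Lengthening the checkpoint interval lowers the count, but at the price of slower recovery.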

Flink SQL doesn’t solve the problem. You can try to bypass it by writing to a Hive managed table and enabling compaction on it. That’s just a workaround: compaction is an expensive and asynchronous process, and reads from fragmented tables may be very slow. Moreover, it will be hard to configure for thousands of topics.

The real problem is how to decouple checkpointing from new file creation. We can do it by writing in a format that supports commit operations, like Iceberg, Delta Lake or Hudi.

The commit operation is built into the format, so Flink does not have to care about it: it just writes records in a streaming fashion and commits on checkpoint.

Unfortunately, in our case we have to consider other solutions. Handling any of these formats in Hive requires replacing the Tez processing engine with, e.g., Spark. That would be too revolutionary a change for the current data platform.

Flink is stateful, so it is possible to buffer messages in persistent state (e.g. RocksDB). Messages can be released periodically by a timer. Latency is not critical in this case. You can write to state storage using KeyedProcessFunction and set a timer to trigger the release. It works as needed, but it is slow. Let’s go through the problems and fix them.

  1. Writing to state storage is slow.
    Don’t write events one by one; use chunks instead. KeyedProcessFunction has to load the value state by key, so it is good to group events beforehand with a short time window. Avoid the count window because it uses a state counter (slow!) and doesn’t guarantee releasing all events within a bounded time.
    The fastest way of writing to state is appending to ListState, because you avoid the read operation.
  2. State is too big; reading it is slow and may cause an out-of-memory error (OOM).
    It’s possible to keep just the meta information in the value state connected with the function key and load list state dynamically (e.g. in the processElement method). I keep the number of chunks and the last chunk size as Flink function state. The list state descriptor is created based on the last chunk number and loaded on demand. onTimer releases records from all chunks one by one. Configure memory so that a whole chunk fits into the heap.
  3. Avro serialization/deserialization is slow and it’s executed between tasks.
    Don’t deserialize Avro in the consumer. For proper distribution, deserialize only the meta information, e.g. the topic name, without the Avro content. Store the content as bytes in state and deserialize only before the write. Note that it’s only an Avro dump, so no modifications are needed.
  4. Load isn’t balanced well between tasks.
    Some topics will be huge, some very small. I’ve grouped them based on a prepared configuration and distributed them by the key [group_name]_[random]. It’s good to make the random ranges depend on parallelism and tune them empirically. That key is also used in KeyedProcessFunction for buffering/releasing messages.
  5. Sink writes to N files on the checkpoint.
    Distribute using keyBy with the topic name.
  6. Sink write performance is slow because of the previous step!
    Use the same trick as in point 4. My key is [topic_name]_[random], where the random range is based on the released events’ size. Keeping the random range equal to 1 enforces a single-threaded write. A bigger range increases the probability of concurrent writes.
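The chunked buffering from points 1 and 2 can be modelled in a few lines. This is a plain-Python sketch and not Flink API: the `ChunkedBuffer` class, the `chunk-<n>` naming and the chunk size are hypothetical. The `meta` dictionary stands in for the small value state, and each entry of `chunks` stands in for a separately named list state that is loaded on demand.

```python
from typing import Dict, Iterator, List


class ChunkedBuffer:
    """Plain-Python model of one key's buffer (not Flink API)."""

    def __init__(self, max_chunk_size: int) -> None:
        self.max_chunk_size = max_chunk_size
        # Small metadata: the only thing read on every incoming batch.
        self.meta = {"chunk_count": 0, "last_chunk_size": 0}
        self.chunks: Dict[str, List[bytes]] = {}

    def add_batch(self, events: List[bytes]) -> None:
        """Append a pre-grouped batch; only the last chunk is ever touched."""
        for event in events:
            full = self.meta["last_chunk_size"] >= self.max_chunk_size
            if self.meta["chunk_count"] == 0 or full:
                # Open a new chunk and record it in the metadata.
                self.meta["chunk_count"] += 1
                self.meta["last_chunk_size"] = 0
                self.chunks[self._name(self.meta["chunk_count"] - 1)] = []
            self.chunks[self._name(self.meta["chunk_count"] - 1)].append(event)
            self.meta["last_chunk_size"] += 1

    def on_timer(self) -> Iterator[List[bytes]]:
        """Release chunks one by one, so only one chunk is held at a time."""
        for index in range(self.meta["chunk_count"]):
            yield self.chunks.pop(self._name(index))
        self.meta = {"chunk_count": 0, "last_chunk_size": 0}

    @staticmethod
    def _name(index: int) -> str:
        return f"chunk-{index}"


buffer = ChunkedBuffer(max_chunk_size=3)
buffer.add_batch([b"a", b"b"])
buffer.add_batch([b"c", b"d", b"e"])
meta_before_release = dict(buffer.meta)
released = list(buffer.on_timer())
```

In this run the five events end up in two chunks, and the timer hands them to the next step chunk by chunk. Memory use at release time is bounded by the chunk size rather than by the whole buffer.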

ETL 2.0 Stream Processing

The flow is very simple, has good performance and is well balanced.

  • Source: Kafka consumer – consumes Kafka messages, decodes the topic name, and adds the topic’s group information with a random number for distribution. The output contains Avro as bytes, without deserialization.
  • Collecting messages (small bulks) – time window function.
  • Collecting, releasing by cron – buffers messages in state and releases them by timer.
  • Distribute messages (boost) – handles group-by key generation based on events’ size (increases write parallelism when needed).
  • Sink Writer – deserializes Avro and stores it in the proper format/location.

That solution is a mix of the streaming and batch approaches. It has streaming advantages such as easy maintenance and monitoring, no scheduling, speed and scalability, but in fact it is not real streaming because of the latency.
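The key generation used by the source and by the "Distribute messages" step can be sketched as follows. This is a plain-Python illustration under assumptions: the helper names, the bytes-per-writer threshold and the maximum range are hypothetical values that would have to be tuned empirically.

```python
import random
from collections import Counter


def salted_key(name: str, salt_range: int, rng: random.Random) -> str:
    """Build a key of the form [name]_[random], with random in [0, salt_range)."""
    return f"{name}_{rng.randrange(salt_range)}"


def salt_range_for(released_bytes: int, bytes_per_writer: int, max_range: int) -> int:
    """Pick a bigger random range for bigger releases; 1 means a single writer."""
    needed = -(-released_bytes // bytes_per_writer)  # ceiling division
    return max(1, min(max_range, needed))


rng = random.Random(42)  # seeded only to make the sketch reproducible
small = salt_range_for(released_bytes=10_000, bytes_per_writer=64_000_000, max_range=8)
huge = salt_range_for(released_bytes=500_000_000, bytes_per_writer=64_000_000, max_range=8)
keys = Counter(salted_key("clicks", huge, rng) for _ in range(1000))
```

A small release gets a range of 1, so all of its records share one key and go to a single writer. A large release is spread over up to `max_range` keys, which lets several sink subtasks write the same topic concurrently.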

Note that an exception occurring after the state has been stored, such as a data error (e.g., Avro deserialization in the last step), will stop the whole flow, so proper exception handling is critical. Backfills may require an extended checkpoint timeout or a higher number of tolerable checkpoint failures.
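One possible way to handle such data errors is to set bad records aside instead of letting them fail the job. The sketch below is an assumption, not the handling used in the project: `decode_all` is a hypothetical helper, and `json.loads` stands in for the Avro decoder purely to keep the example free of dependencies.

```python
import json
from typing import Callable, Iterable, List, Tuple


def decode_all(
    payloads: Iterable[bytes],
    decode: Callable[[bytes], dict],
) -> Tuple[List[dict], List[Tuple[bytes, str]]]:
    """Decode each payload; collect failures instead of failing the whole batch."""
    decoded, rejected = [], []
    for payload in payloads:
        try:
            decoded.append(decode(payload))
        except Exception as error:  # a real job would catch the decoder's specific errors
            # Keep the raw bytes and the error type for later inspection.
            rejected.append((payload, type(error).__name__))
    return decoded, rejected


# json.loads is only a stand-in for an Avro decoder.
good, bad = decode_all([b'{"id": 1}', b"not json", b'{"id": 2}'], json.loads)
```

The rejected payloads can then be written to a separate location and reconciled later, while the rest of the release is stored as usual.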

The application was implemented using Flink 1.11. The latest release, 1.12, introduced a lot of new features. The batch execution mode in the DataStream API seems very interesting. With it, we could trigger jobs periodically and skip buffering and releasing data in Flink state.

12 May 2021