Flink with a metadata catalog
Have you worked with Flink SQL or Flink Table API? Do you find it frustrating to manage sources and sinks across different projects or repositories…
Let’s assume we simply want to dump facts from Kafka to HDFS without any transformations. Low latency is desirable, but not crucial. We have hundreds of topics, from small to really huge ones, and all messages are in Avro format. Some data is produced hourly or daily, and some is produced online. The facts will back external tables in Hive, which means we need to avoid large numbers of small files. Otherwise HDFS and Hive may have a hard time with performance.
How to go about it? A quick Google search suggests three options:
In one of our projects at GetInData we use NiFi as the tool of choice for data ingestion, as it makes it quite easy to build a flow for dumping data. Messages can be buffered in a loop and merged before being put on HDFS. It works surprisingly well, but it does not guarantee at-least-once processing.
You have to be aware that NiFi simply processes flow files (facts in this case), which are not shared or replicated between cluster nodes. There is therefore a risk of data loss or of data chunks arriving in the wrong order: when a node is down, its data is unavailable and may be lost. There is no guarantee that flow files can be recovered after a dead node is restored. NiFi does not support high availability of data in this sense.
It is a real problem when you want to build automated reporting and there is a risk that the source tables are not consistent. You have to:
Monitoring consistency requires gathering counts from Kafka topics and comparing them with Hive tables or HDFS files. It is a batch task: it can be run daily or hourly, but you get the information with a delay. Reports may be inconsistent in the meantime, so analytics can’t rely on them.
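The count comparison described above can be sketched in a few lines of plain Python. This is only an illustration: the function name `find_mismatches`, the `(topic, hour)` key layout and the sample numbers are assumptions made for the example, and in practice the two dictionaries would be filled from Kafka offsets and Hive queries.

```python
from typing import Dict, List, Tuple

# Hypothetical key layout: (topic, hour bucket), e.g. ("orders", "2021-01-15T10").
Key = Tuple[str, str]


def find_mismatches(kafka_counts: Dict[Key, int],
                    hive_counts: Dict[Key, int]) -> List[Tuple[Key, int, int]]:
    """Return (key, kafka_count, hive_count) for every bucket whose counts differ."""
    mismatches = []
    # Walk over the union of keys so buckets missing on either side are reported too.
    for key in sorted(set(kafka_counts) | set(hive_counts)):
        produced = kafka_counts.get(key, 0)
        stored = hive_counts.get(key, 0)
        if produced != stored:
            mismatches.append((key, produced, stored))
    return mismatches


# Made-up sample data: the second hour lost 50 records on the way to Hive.
kafka = {("orders", "2021-01-15T10"): 1000, ("orders", "2021-01-15T11"): 1200}
hive = {("orders", "2021-01-15T10"): 1000, ("orders", "2021-01-15T11"): 1150}

for key, produced, stored in find_mismatches(kafka, hive):
    print(f"{key}: Kafka={produced}, Hive={stored}, difference={produced - stored}")
```

Because the check runs after the fact, it can only report a gap, not prevent one, which is exactly the delay problem mentioned above.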
There are more problems with NiFi. Maintaining environments is challenging as there is no CI/CD support. Stability depends on the processes that are running, so it is worth isolating core processes on dedicated clusters, which is yet more effort.
Why not use a leading streaming framework like Flink? It is fast and scalable, provides exactly-once processing out of the box, and works well with CI/CD pipelines. Moreover, it is easy to maintain Flink jobs with e.g. Ververica Platform, Kubernetes or good old YARN. Is auto-retry your concern? No problem, Flink has built-in auto-retry based on checkpointing.
Let’s create a flow for this case study and start with a very simple solution: a Kafka consumer as the source and a file sink with a bucket assigner for writing to HDFS. Everything out of the box. Set the checkpoint interval to 60 seconds and it’s done! Not really, because Flink creates new files on every checkpoint. That is how it guarantees exactly-once processing. In a pessimistic scenario you get “N” files per checkpoint for every topic, where “N” is the sink parallelism. Thousands of files per minute will kill HDFS and Hive performance. Remember to avoid small files when working with HDFS!
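To get a feel for the numbers, the pessimistic scenario can be written down as simple arithmetic. The figures below (500 topics, sink parallelism 4) are illustrative assumptions, not measurements from the project. Only the 60-second checkpoint interval comes from the text.

```python
def files_per_minute(topics: int, sink_parallelism: int, checkpoint_interval_s: int) -> float:
    """Worst case: every sink subtask closes one file per topic on every checkpoint."""
    checkpoints_per_minute = 60 / checkpoint_interval_s
    return topics * sink_parallelism * checkpoints_per_minute


# Assumed example values: 500 topics, parallelism 4, checkpoint every 60 seconds.
per_minute = files_per_minute(topics=500, sink_parallelism=4, checkpoint_interval_s=60)
per_day = per_minute * 60 * 24

print(f"files per minute: {per_minute:.0f}")  # 2000
print(f"files per day:    {per_day:.0f}")     # 2880000
```

Lengthening the checkpoint interval reduces the count proportionally, but it also delays commits and recovery points, so it does not remove the problem.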
Flink SQL doesn’t solve the problem. You can try to bypass it by writing to a Hive managed table with compaction enabled. That is only a workaround: compaction is an expensive and asynchronous process, and reads from fragmented tables may be very slow. Moreover, it would be hard to configure for thousands of topics.
The commit operation is built into the file format, so Flink does not have to take care of it: it just writes records in a streaming fashion and commits on checkpoint.
Unfortunately, in our case we have to consider other solutions. Handling any of these formats in Hive would require replacing the Tez processing engine with e.g. Spark. That would be too revolutionary a change for the current data platform.
Flink is stateful, so it is possible to buffer messages in persistent state (e.g. RocksDB) and release them periodically with a timer. Latency is not critical in this case. You can write to state storage using a KeyedProcessFunction and set a timer to trigger the release. It works as needed, but it is slow. Let’s go through the problems and fix them.
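Before looking at the problems, the basic buffer-and-release idea can be modelled in plain Python. This is a toy stand-in, not Flink code: the class `BufferingOperator`, its method names and the explicit `now_ms` clock are assumptions made for the example, and a dictionary plays the role of the keyed state that Flink would keep in RocksDB.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class BufferingOperator:
    """Toy model of a keyed operator that buffers records and flushes them on a timer."""

    def __init__(self, release_interval_ms: int,
                 emit: Callable[[str, List[bytes]], None]) -> None:
        self.release_interval_ms = release_interval_ms
        self.emit = emit
        self.buffers: Dict[str, List[bytes]] = defaultdict(list)  # stands in for keyed state
        self.timers: Dict[str, int] = {}  # key -> timestamp at which to flush

    def process_element(self, key: str, record: bytes, now_ms: int) -> None:
        self.buffers[key].append(record)
        # Register a single timer per key when the first record of a batch arrives.
        if key not in self.timers:
            self.timers[key] = now_ms + self.release_interval_ms

    def advance_time(self, now_ms: int) -> None:
        # Fire every timer that is due and release the buffered batch for its key.
        due = [key for key, ts in self.timers.items() if ts <= now_ms]
        for key in due:
            del self.timers[key]
            self.emit(key, self.buffers.pop(key))


batches = []
op = BufferingOperator(release_interval_ms=60_000,
                       emit=lambda key, records: batches.append((key, records)))
op.process_element("topic-a", b"m1", now_ms=0)
op.process_element("topic-a", b"m2", now_ms=10_000)
op.process_element("topic-b", b"m3", now_ms=30_000)
op.advance_time(60_000)  # only the topic-a timer is due
op.advance_time(90_000)  # now the topic-b timer fires
print(batches)
```

Each key is released as one batch, so the sink writes one larger file per release instead of one file per checkpoint.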
The flow is very simple, has good performance and is well balanced.
Note that some exceptions may occur after the state has been stored: a data error (e.g. in Avro deserialization at the last step) will stop the whole flow, so proper exception handling is critical. A backfill may require an extended checkpoint timeout or a higher number of tolerable checkpoint failures.
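One possible way to keep a single bad record from stopping the flow is to set it aside instead of raising. The sketch below is an assumption, not the approach used in the project: it uses JSON from the standard library as a stand-in for Avro so that it stays self-contained, and the function name `deserialize_batch` is hypothetical.

```python
import json
from typing import Any, Iterable, List, Tuple


def deserialize_batch(raw_records: Iterable[bytes]) -> Tuple[List[Any], List[Tuple[bytes, str]]]:
    """Split a batch into decoded records and (raw bytes, error message) rejects."""
    good: List[Any] = []
    rejected: List[Tuple[bytes, str]] = []
    for raw in raw_records:
        try:
            good.append(json.loads(raw))
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            rejected.append((raw, str(exc)))
    return good, rejected


good, rejected = deserialize_batch([b'{"id": 1}', b"not-json", b'{"id": 2}'])
print(len(good), "decoded,", len(rejected), "rejected")
```

The rejected records can then be written to a separate location for inspection, while the valid part of the batch continues to the sink.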
The application was implemented using Flink 1.11. The latest release, 1.12, introduced a lot of new features. Batch execution mode in the DataStream API looks particularly interesting. With it we could trigger jobs periodically and skip buffering and releasing data in Flink state.