Apache NiFi, a big data processing engine with a graphical WebUI, was created to give non-programmers the ability to build data pipelines quickly and without code, freeing them from text-based methods of implementation. Unfortunately, we live in a world of trade-offs, and those features come at a price. The purpose of our blog series is to share our experience and the lessons we learned while working with production NiFi pipelines.
It’s a known fact that you can’t build a tool that is good in every respect; you have to decide what to prioritize. Those decisions have a huge impact on what the tool is capable of doing and how well it performs. In this post we will take a look at these decisions and their consequences in Apache NiFi.
You are on the council but… - masterless architecture
There is probably no one in the Big Data world who hasn't heard of the master-worker architecture. The idea is fairly simple: the cluster has two specialized types of nodes, workers that do all the calculations, and masters responsible for tasks like distributing jobs, checking their status and running health checks on worker nodes. NiFi's creators moved away from that idea: all nodes do the processing, but some of them also take on additional duties. The first is the cluster coordinator, which is responsible for managing the cluster, receiving heartbeats and keeping the latest version of the flow. The second is the primary node, which handles specific types of processing tasks.
The coordinator and the primary node are chosen dynamically at startup, in a process called election. The obvious benefit of dynamic selection is that if one of those nodes fails, another one can be elected to take its place, and this happens automatically. Unfortunately, it also means the cluster has to spend time electing a new node for the role, which is usually a somewhat lengthy process.
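For reference, cluster membership and election behaviour are configured in `nifi.properties`; NiFi delegates the election itself to ZooKeeper. The values below are illustrative, not recommendations:

```
# nifi.properties (illustrative values, adjust to your environment)
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi-node-1.example.com
nifi.zookeeper.connect.string=zk1:2181,zk2:2181,zk3:2181

# how long startup waits for nodes before electing the cluster's flow
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=3
```

The `flow.election.*` properties are one reason startup can feel slow: the cluster deliberately waits for enough candidates before settling on a flow version.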
The difference between these approaches shows when we want to handle tasks that have to be executed on exactly one machine. With a master, it can dynamically delegate one of the workers to perform such a task. NiFi solves this by executing those tasks only on the primary node. This feature is called isolated processing, and while it generally solves the issue well, it brings its own problems to the table.
Boxes don’t touch - independent workers
If we think of processing in terms of MapReduce, we can clearly see the coupling between the elements: all the workers are connected to the master node, which gives them a job to execute, syncs the results, hands out another job, and so on. In the case of NiFi, we have a few worker nodes that are simply there and, by some coincidence, all execute the same flow, with no coupling whatsoever.
If we don’t want to get into the specifics of the architecture, processing in NiFi is a lot easier to understand, and it is also much easier to represent on the WebUI canvas. This “simple” way of distributing work across a cluster is good for data transformation, handling messages from queues, or just general data processing. Without any synchronisation we achieve great scalability (we can just add another node that coincidentally does the same thing as the rest), but at the same time there are limits to what we can achieve in a simple way. One obvious limitation is data aggregation: if we want to aggregate data from all of the nodes, we only really have two options. First, gather all the partial results on one machine; it's doable, but we have to take care of data completeness ourselves, so it is definitely not simple. Second, send the partial results to some external tool, but can we really call it NiFi processing then? NiFi also lacks some failsafes popular in master-worker architectures, for example built-in retry policies or moving load between nodes in case of failure. Another issue is that unless we explicitly split a large file, it will be processed by a single worker. If a file is too big for a single machine, processing will fail no matter how big our cluster is.
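To make the first option concrete, here is a minimal Python sketch of what "take care of data completeness ourselves" means. Nothing here is a NiFi API; the node names and the idea of pulling per-node partial counts from some external store are assumptions for illustration:

```python
# Hypothetical sketch: merging per-node partial counts collected from an
# external store. Without a master, nothing tells us whether a node's
# partial result simply never arrived, so we must check completeness.
from collections import Counter

def merge_partials(partials: dict[str, Counter], expected_nodes: set[str]) -> Counter:
    """Combine per-node partial counts, failing loudly on missing nodes."""
    missing = expected_nodes - partials.keys()
    if missing:
        raise RuntimeError(f"partial results missing from nodes: {sorted(missing)}")
    total = Counter()
    for counts in partials.values():
        total.update(counts)
    return total

partials = {
    "node-1": Counter({"ok": 10, "error": 1}),
    "node-2": Counter({"ok": 7}),
}
print(merge_partials(partials, {"node-1", "node-2"}))
```

The completeness check is the part NiFi will not do for you: in a master-worker system the master knows how many partials to expect, while here that knowledge has to live in our own code.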
Distributed but local, dual nature of flowfiles
Flowfiles are the abstraction put over any data processed by NiFi; a flowfile can be a file, an event or any other chunk of data. It is composed of two parts: content, the data stored in the entity, and attributes, the data describing the entity. Flowfiles are designed in such a way that it's possible to access a flowfile's history, rerun it or check its lineage.
NiFi puts a huge emphasis on durability, and multiple mechanisms are implemented to provide it. A flowfile's data is stored in three directories on disk:
- flowfile repository - persists attributes of the flowfile
- content repository - stores all data contained in the flowfile
- provenance repository - stores information about the history of the flowfile
Note: in general the attributes are kept in the RAM of the NiFi nodes so they can be accessed quickly, but to avoid losing them in case of application failure, they are also persisted on disk.
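The locations of these three repositories are set in `nifi.properties`; the defaults below are the stock values, shown here only to map the list above onto concrete configuration:

```
# nifi.properties: default repository locations
nifi.flowfile.repository.directory=./flowfile_repository
nifi.content.repository.directory.default=./content_repository
nifi.provenance.repository.directory.default=./provenance_repository
```

In production these typically point at separate disks, since the content repository tends to dominate I/O.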
Another mechanism used to achieve durability is a write-ahead log. While a processor is processing a flowfile, it writes all the changes to temporary structures, and only when it's done are those values assigned to the flowfile. Thanks to this, no changes are applied to the actual flowfile mid-processing, and in case of failure the operation is rolled back as all the changes are simply discarded.
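The commit-or-discard idea can be sketched in a few lines of Python. This is not NiFi's implementation, just a toy model of staging changes separately and applying them atomically:

```python
# Toy model of stage-then-commit semantics (not NiFi's actual code):
# changes accumulate in a staging area and touch the flowfile only on commit.
class StagedFlowFile:
    def __init__(self, attributes: dict[str, str]):
        self.attributes = attributes
        self._staged: dict[str, str] = {}

    def set_attribute(self, key: str, value: str) -> None:
        self._staged[key] = value  # recorded in the staging area, not the flowfile

    def commit(self) -> None:
        self.attributes.update(self._staged)  # apply all changes at once
        self._staged.clear()

    def rollback(self) -> None:
        self._staged.clear()  # on failure the flowfile is untouched

ff = StagedFlowFile({"filename": "data.csv"})
ff.set_attribute("schema", "v2")
ff.rollback()
assert "schema" not in ff.attributes  # original state preserved
```

A crashed processor behaves like `rollback()` here: since nothing was written to the flowfile itself, there is nothing to undo.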
Unfortunately, as we remember, the boxes don't touch, which means a flowfile is bound to a single NiFi node. Currently, when a node is down there is no way of accessing its data, so all of its flowfiles are unavailable until the node is started and reconnected to the cluster. Another drawback is that the flowfile content physically has to be in local storage, which adds latency for copying and loses all the benefits of distributed sources like HDFS or S3.
Anatomy of the flow file
NiFi provides access to the full change history of a flowfile. To achieve this, it has to store either the diffs between versions or full versions. The second approach was chosen because it optimises read speed, which is the more frequent operation. Every time you “modify” a flowfile, you are actually creating a new flowfile and discarding the old one.
As attributes change more often than content, it made sense to store them separately to avoid unnecessarily rewriting the content. A further optimization is sharing content between flowfiles by reference: multiple flowfiles whose changes only touch attributes can all point to the same content. The reference is defined by 4 values:
- Container - defines one of the content repository locations, defined in properties
- Section - directory aggregating claims
- Claim - file, contains content of one or more flowfiles
- Offset - position in claim (in bytes)
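The four values above can be modelled as a small record. This is an illustrative Python sketch, not NiFi's actual Java class; in particular, the `length` field is added here purely so the example can slice the claim, and is not one of the four reference values:

```python
# Illustrative model of a content claim reference. Field names mirror the
# four values listed above; `length` is an addition for this sketch only.
from dataclasses import dataclass

@dataclass(frozen=True)
class ContentClaimRef:
    container: str  # which configured content repository location
    section: str    # directory grouping claims
    claim: str      # file holding content of one or more flowfiles
    offset: int     # byte position of this flowfile's content in the claim
    length: int     # hypothetical: number of bytes belonging to this flowfile

def read_content(claim_bytes: bytes, ref: ContentClaimRef) -> bytes:
    """Resolve a reference against the raw bytes of a claim file."""
    return claim_bytes[ref.offset:ref.offset + ref.length]

# Two flowfiles sharing one claim file, distinguished only by offset:
claim = b"firstsecond"
a = ContentClaimRef("default", "1", "claim-0001", 0, 5)
b = ContentClaimRef("default", "1", "claim-0001", 5, 6)
assert read_content(claim, a) == b"first"
assert read_content(claim, b) == b"second"
```

The example shows why an attribute-only change is cheap: the new flowfile can carry a copy of the same reference, and no content bytes move at all.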
Claims are files that usually contain the content of multiple flowfiles; this optimization aims to limit the load on the filesystem by decreasing the number of inodes NiFi uses. One question arises when thinking about a structure referenced by multiple entities: when can we remove it? NiFi has a garbage-collector-like mechanism for claims: if no flowfiles point to a claim, it is deleted or archived (depending on configuration). This mechanism is the root of one of the harder-to-debug problems for new NiFi users: the “different size flowfile issue”, NiFi's counterpart to the “small files issue” in HDFS.
Imagine two parallel flows: one takes files and writes them to disk, the other runs some query against a database. The first one deals with large flowfiles but for a short period of time, so their content should be dropped quickly. The second one only executes the query, so its flowfiles are small, but each takes a long time to process. Let's assume the worst-case scenario: every flowfile from the first flow appears just after a flowfile from the second one. On the WebUI you can see that the queues take 10% of your content repository capacity, yet there is an error about it being full and the flow crashes. Why did this happen? Let's see what the situation looked like from the flowfiles' perspective:
- A small flowfile created a new claim
- A big flowfile saw a small claim, so it attached itself to it
- The next small flowfile saw a big claim (one small and one big flowfile), so it created a new one
- The next big flowfile saw a small claim, so it attached itself to it
That process went on for a while, and now we have a lot of claims, each holding the content of one big flowfile and one small one, and none of them can be deleted until the small flowfiles are processed. After enough time, the storage required to hold all those claims exceeded the content repository capacity and an error occurred. The same issue can also happen when we leave unused process groups with flowfiles sitting in them.
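A toy simulation makes the accounting visible. All sizes and naming are made up for illustration; the point is only that a claim's disk space is pinned by its slowest referencing flowfile:

```python
# Toy simulation of the mixed-size claim problem: each claim holds one big
# and one small flowfile's content. The big content finishes fast, but a
# claim can only be reclaimed once *every* flowfile referencing it is done.
BIG, SMALL = 100_000_000, 1_000  # bytes, illustrative sizes

# Each claim: total size on disk, plus the set of live flowfiles pinning it.
claims = [{"size": BIG + SMALL, "refs": {f"big-{i}", f"small-{i}"}}
          for i in range(10)]

# The fast flow finishes: big flowfiles drop their references...
for i, c in enumerate(claims):
    c["refs"].discard(f"big-{i}")

# ...but every claim is still pinned by a slow small flowfile.
reclaimable = sum(c["size"] for c in claims if not c["refs"])
pinned = sum(c["size"] for c in claims if c["refs"])
print(f"reclaimable: {reclaimable}, still on disk: {pinned}")
```

After the big flowfiles finish, the queues report only the small flowfiles' few kilobytes, while roughly a gigabyte of claim data stays on disk, which is exactly the mismatch between the WebUI's 10% and the "repository full" error above.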
Conclusion
What did we love? NiFi provides great features for keeping lineage, durability and flow debugging, thanks to provenance and flowfile immutability.
What did we hate? The lack of a built-in way to move flowfiles off failed nodes or to share resources between nodes can greatly limit what we can do with NiFi, and the “different size flowfile issue” is quite a trap for new developers.