Avoiding The Mess In The Hadoop Cluster (Part 2)
In the first part of this blog series, I described a few challenges that I faced when I had to quickly implement a simple Hive query and schedule it to run periodically on the Hadoop cluster. These challenges include data cataloguing, data discovery, data lineage and process scheduling. I also explained how they can be addressed using existing open-source projects such as Falcon, HCatalog and Hive.
In the second part, I will explain how to smoothly change the file format of your datasets, highlight more advanced features of Falcon such as data backups, disaster recovery and late data handling, and list possible future enhancements that I think could be useful.
Once my query was implemented and scheduled daily, I was only half-way done. The remaining part of the task was to reduce the execution time of this query from 6 hours to 10 minutes. To achieve this goal, I removed stop-the-world garbage collection pauses, configured the query to run map-side joins, switched the execution engine from MapReduce to Tez and changed the file format from Avro to ORC. You can find more details and benchmarks in the slides from my talk at Hadoop Summit San Jose 2015.
Thanks to the above optimisations, my query became much faster. Finally I could open a beer, execute the Hive query and receive the results before the bottle was empty. And this is actually what I did. I also started to think about how my company and colleagues could benefit from what I had learned. One obvious but important lesson was that a significant performance boost can easily be gained by changing the file format from a row-oriented one (Avro) to a columnar one (ORC in this particular case, but Parquet is also great).
Switching file formats
Why not change the file format to a column-oriented one, so that data gets processed faster and less disk space in HDFS is consumed? Well, the most important reason against this idea is that many business-critical applications will immediately fail. Please look at your production code and check how often it expects data to come in some predefined format. If you read data using format-specific readers (e.g. AvroStorage in Pig or sc.textFile in Spark), this code will obviously stop working after you switch the file format. With this in mind, you can either:
- not upgrade to a new format (and presumably keep processing your data more slowly, reading more data from disk and transferring more data over the network), or
- put a lot of work into refactoring the reader code in your applications so that it reads data properly in the new format.
As you can see, neither option is very compelling. The good news is that there is a way to make sure this never happens again: HCatalog.
HCatalog provides a set of libraries that let other frameworks read data from, and write data to, Hive tables. If you create Hive tables for your production datasets, you can use HCatalog to access them by providing the name of the corresponding Hive table.
With HCatalog you don’t specify the format or location of your datasets, just the name of the Hive table that you want to read data from or write data to. This has a powerful advantage: if the format and/or location of the files change, you don’t have to refactor the reader code, so you can easily switch from one file format to another without the risk of breaking production applications. “Refactor data, not just code”, as my colleague David Whiting recommends in his great talk about scaling for developer productivity.
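To make the idea concrete, here is a toy Python model of the indirection HCatalog provides. It is not HCatalog's API: `Catalog`, `TableInfo`, `READERS` and `total_plays` are hypothetical names, the "Metastore" is an in-memory dict, and "files" are strings. The point is that the application function only ever names the table, so re-registering the table in a different format and location does not touch it.

```python
import csv
import io
import json
from dataclasses import dataclass
from typing import Callable, Dict, Iterator, List


@dataclass
class TableInfo:
    """Hypothetical Metastore entry: where a table lives and how it is stored."""
    fmt: str
    location: str


# Format-specific readers live in ONE place (the catalog layer),
# not in every application that consumes the data.
READERS: Dict[str, Callable[[str], Iterator[dict]]] = {
    "csv": lambda raw: iter(csv.DictReader(io.StringIO(raw))),
    "jsonl": lambda raw: (json.loads(line) for line in raw.splitlines() if line),
}


class Catalog:
    """Toy stand-in for the Hive Metastore plus HDFS."""

    def __init__(self) -> None:
        self.tables: Dict[str, TableInfo] = {}
        self.storage: Dict[str, str] = {}  # location -> file contents

    def register(self, name: str, fmt: str, location: str, contents: str) -> None:
        self.tables[name] = TableInfo(fmt, location)
        self.storage[location] = contents

    def read(self, name: str) -> List[dict]:
        # Callers pass only the table name; format and location are resolved here.
        info = self.tables[name]
        return list(READERS[info.fmt](self.storage[info.location]))


def total_plays(catalog: Catalog) -> int:
    """'Application code': knows the table name, nothing about the file format."""
    return sum(int(row["plays"]) for row in catalog.read("streams"))
```

Registering `streams` first as CSV and later as JSON lines at a new location leaves `total_plays` unchanged, which is the "refactor data, not just code" effect in miniature.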
So far, when talking about Falcon, we have focused only on its core and most important features. Falcon has much more to offer, though. Its mission is to automate and simplify the many tasks involved in managing data and scheduling processes on a Hadoop cluster. Let’s have a look at a few of these features!
Late data handling
A nice feature that comes out of the box in Falcon is the handling of input data that arrives late. Falcon allows us to specify how long we can wait for input data and what to do when data arrives late. Let’s assume that each night we want to run a batch job that processes data from the previous day, which comes as 24 hourly partitions. We can configure our process to start at 3 AM: even if not all 24 hourly partitions from the previous day are available, we simply process the partial dataset. But when the missing partitions eventually arrive, Falcon will re-process the full dataset and overwrite the previously calculated (incomplete) results. For many types of analysis this approach is valuable, because often you are forced to start the computation at some point (e.g. when closing the books for financial data) or want to have some results earlier, even if they are incomplete (e.g. fresh music recommendations). You can read more about this feature in the process specification documentation.
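The behaviour described above can be sketched as a small scheduling simulation. This is a simplified model, not how Falcon implements it: `plan_runs` is a hypothetical helper, the cut-off is assumed to be measured from the scheduled start, and every late partition that lands before the cut-off is assumed to trigger an immediate re-run (Falcon itself checks for late data according to a configurable policy).

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def plan_runs(start: datetime, cutoff: timedelta,
              arrivals: Dict[int, datetime]) -> List[Tuple[datetime, int]]:
    """Return (run_time, partitions_available) for the first run and each re-run.

    arrivals maps an hourly partition (0-23) to the time it landed in HDFS.
    """
    deadline = start + cutoff
    # The first run starts on schedule with whatever has already arrived.
    seen = sum(1 for t in arrivals.values() if t <= start)
    runs = [(start, seen)]
    # Each partition arriving after the start but before the cut-off triggers
    # a re-run that overwrites the previous, incomplete result.
    for t in sorted(t for t in arrivals.values() if start < t <= deadline):
        seen += 1
        runs.append((t, seen))
    # Partitions arriving after the deadline are ignored.
    return runs
```

For example, if 22 hourly partitions from June 1 are present at 3 AM on June 2 and the last two land at 4:30 and 5:00, a 2-hour cut-off yields three runs (with 22, 23 and 24 partitions), while a 1-hour cut-off keeps only the partial 3 AM result.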
Backup and disaster recovery
Yet another important but often ignored aspect of data management is BDR (backup and disaster recovery). The excuse most frequently given for not doing it is a lack of available machines to build a backup Hadoop cluster. This is a valid excuse, but it is often caused by the need to have more nodes in the production cluster to store many useless datasets. Thanks to data retention, which is also supported by Falcon, and switching to columnar file formats, which becomes easier with Hive and HCatalog, we can easily slim HDFS down and allocate a few machines to a smaller backup cluster. With two Hadoop clusters in hand, Falcon lets you specify which datasets you want to replicate and how frequently; everything else is done automatically.
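The core rule behind data retention is simple: anything older than the retention limit is evicted. The sketch below expresses that rule for daily partitions; `partitions_to_evict` and its parameters are hypothetical. In Falcon you declare the retention limit on the feed and let it do the eviction, rather than writing code like this.

```python
from datetime import date, timedelta
from typing import List


def partitions_to_evict(partitions: List[date], today: date, keep_days: int) -> List[date]:
    """Return the daily partitions that fall outside the retention window."""
    limit = today - timedelta(days=keep_days)
    # Partitions strictly older than the limit are candidates for deletion.
    return sorted(p for p in partitions if p < limit)
```

With ten daily partitions (June 1 to 10), `today=date(2015, 6, 10)` and `keep_days=3`, the partitions from June 1 to 6 would be evicted, freeing space that could go towards a backup cluster.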
If we periodically replicate the raw and final datasets, we get a really simple but quite powerful solution to disaster recovery. When our production cluster goes down due to planned maintenance, a temporary power outage or any other unfortunate disaster, we can quickly switch our processing to the backup cluster. The backup cluster will contain some of the final datasets, which we can start using instantly. Final datasets that Falcon had not finished replicating can likely be quickly regenerated from the raw datasets available on the backup cluster. If raw data is missing on the backup cluster, you might be able to re-consume it from your message broker (e.g. Apache Kafka).
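The recovery reasoning above boils down to a three-way decision per final dataset. Here it is as a small Python function; `Recovery`, `recovery_action` and the dataset names are hypothetical, and "replicated" simply means "present on the backup cluster".

```python
from enum import Enum
from typing import Set


class Recovery(Enum):
    READY = "use the replicated final dataset"
    REGENERATE = "re-run the process on the replicated raw dataset"
    RECONSUME = "re-consume the raw data from the message broker"


def recovery_action(final_dataset: str, raw_input: str, replicated: Set[str]) -> Recovery:
    """Decide how a final dataset becomes available on the backup cluster."""
    if final_dataset in replicated:
        return Recovery.READY        # replication finished before the outage
    if raw_input in replicated:
        return Recovery.REGENERATE   # raw data is there, so recompute
    return Recovery.RECONSUME        # nothing replicated, fall back to e.g. Kafka
```

Ordering the checks this way picks the cheapest available option first: reading a ready dataset, then recomputing, then re-ingesting.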
If you are interested in pure backups without disaster recovery, it might not be the most efficient choice to dedicate well-equipped machines with multi-core CPUs and tens or hundreds of gigabytes of RAM to the backup cluster. In such a case, you can consider backing up your data to public cloud storage. Thankfully, Falcon supports data replication from Hadoop to Amazon S3 and Microsoft Azure (and vice versa).
You can read more about feed replication in Falcon’s official documentation. Falcon lets you replicate HDFS directories as well as Hive tables.
Although Falcon provides a lot of value, there is still room for improvement. Here is my subjective list of ideas that I would find both useful and exciting:
Automatic registration of datasets in HDFS and Hive
For each HDFS directory or Hive dataset that you want to use with Falcon, you must first define a feed that describes this dataset and submit it to Falcon. You can define a feed by writing a simple XML file or by navigating through the new Web UI. Whichever method you choose, you need to specify several properties such as name, description, the Hive table or HDFS path it corresponds to, tags and so on. This information is somewhat redundant with what you might already have in the descriptions of your Hive tables. Therefore, Falcon could instead scan the Hive Metastore and HDFS, semi-automatically create a feed for each Hive table and pre-populate its properties. There is already a JIRA ticket for that.
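A rough sketch of what such semi-automatic registration might do: walk the tables found in the Metastore and produce draft feed properties, leaving blank whatever the Metastore cannot know. Everything here is an assumption for illustration: the Metastore is modelled as a nested dict, and `draft_feed`, `draft_feeds` and the property keys are hypothetical, not Falcon's feed schema.

```python
from typing import Dict, List, Optional


def draft_feed(db: str, table: str, comment: Optional[str],
               partition_keys: List[str], owner: str) -> Dict[str, object]:
    """Pre-populate feed properties from (hypothetical) Metastore metadata."""
    return {
        "name": f"{db}-{table}".replace("_", "-"),
        "description": comment or "",
        "table": f"{db}.{table}",
        "tags": {"owner": owner},
        "partitioned_by": partition_keys,
        # The Metastore cannot know these, so a human still fills them in.
        "frequency": None,
        "retention": None,
    }


def draft_feeds(metastore: Dict[str, Dict[str, dict]]) -> List[Dict[str, object]]:
    """Create one draft per table, in a stable (sorted) order."""
    drafts = []
    for db, tables in sorted(metastore.items()):
        for table, meta in sorted(tables.items()):
            drafts.append(draft_feed(db, table, meta.get("comment"),
                                     meta.get("partition_keys", []),
                                     meta.get("owner", "unknown")))
    return drafts
```

The design choice is "semi-automatic": the tool fills in the redundant parts (name, description, table, tags) and leaves the policy decisions (frequency, retention) explicitly empty for review.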
More types of processes
Currently, Falcon schedules Hive queries, Pig scripts and Oozie workflows out of the box. If you want to schedule a Spark application, a Camus job (a MapReduce job that copies events from Kafka to HDFS) or a Sqoop job, it is still possible, but you need an inconvenient workaround: defining a simple, yet unpleasant, Oozie workflow that fires a Shell or SSH action with a command that submits your job. With a Shell action, yet another disadvantage is that you need to deploy the Spark client on each slave node; with an SSH action, you probably end up submitting all Spark applications from a single node.
There has been a loose idea to add native support for Spark or MapReduce to Falcon. Now might be a better time to do so, since Oozie 4.2.0 already provides a Spark action, so Falcon could simply delegate this responsibility to it.
Going beyond Hadoop feeds
For several reasons including data discovery and data lineage, it would be useful to provide support for other data sources or destinations that are commonly used with Hadoop e.g. Kafka topics, Cassandra keyspaces, MySQL tables, Solr indexes and so on. You can track progress on this feature in a JIRA ticket.
Going beyond batch processing
From the day it was built until now, Falcon has been used to coordinate and schedule batch jobs running on top of Hadoop. Wouldn’t it be useful to get support for streaming feeds and streaming processes that can be executed by Storm, Samza, Spark Streaming or Twitter Heron? The relevant discussion has already started and some ideas have been presented.
Support for HDFS snapshots
Although Falcon already provides data retention and replication, it doesn’t support automatic creation of HDFS snapshots. While creating HDFS snapshots on your own is not a difficult task, you would probably be happy to delegate it to Falcon.
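To show how little is involved (and therefore how naturally it would fit into Falcon), here is a sketch that builds the HDFS shell commands for a rolling set of daily snapshots. The commands `hdfs dfs -createSnapshot` and `hdfs dfs -deleteSnapshot` are standard HDFS CLI commands, and the directory must first be made snapshottable once with `hdfs dfsadmin -allowSnapshot <path>`. The `daily-YYYY-MM-DD` naming scheme and the `snapshot_commands` helper are assumptions; the function only returns the command lines, which you could pass to `subprocess.run` yourself.

```python
from datetime import date
from typing import List

PREFIX = "daily-"  # hypothetical naming scheme for snapshots we manage


def snapshot_commands(path: str, today: date, existing: List[str], keep: int) -> List[List[str]]:
    """Commands to take today's snapshot and drop all but the newest `keep` (keep >= 1)."""
    new_name = PREFIX + today.isoformat()
    cmds: List[List[str]] = []
    if new_name not in existing:
        cmds.append(["hdfs", "dfs", "-createSnapshot", path, new_name])
    # Only touch snapshots that follow our naming scheme; ISO dates sort chronologically.
    ours = sorted({n for n in existing if n.startswith(PREFIX)} | {new_name})
    for old in ours[:max(len(ours) - keep, 0)]:
        cmds.append(["hdfs", "dfs", "-deleteSnapshot", path, old])
    return cmds
```

Restricting deletions to the managed prefix means manually created snapshots (such as one taken before a risky migration) are never pruned.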
Email notifications
Currently, Falcon doesn’t provide email notifications when a scheduled feed or process instance finishes or fails. On the other hand, Falcon has JMS notifications and creates a JMS topic for every process and feed that is scheduled (the name of the topic is the same as the process or feed name). This means that you can configure Falcon to push messages to ActiveMQ and then, for instance, use Apache Camel to consume from the required topic and route messages to a specified email account.
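The routing logic you would end up maintaining outside Falcon looks roughly like the sketch below. It deliberately avoids any real JMS or Camel API: messages are plain dicts, and the `status`/`nominal_time` keys, the `RECIPIENTS` table and the `route` function are hypothetical. The only detail taken from Falcon is that the topic name equals the process or feed name.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Email:
    to: str
    subject: str


# Hypothetical recipient list: today it has to live outside Falcon (e.g. in Camel).
RECIPIENTS: Dict[str, List[str]] = {
    "nightly-etl": ["data-team@example.com"],
}


def route(topic: str, message: Dict[str, str]) -> List[Email]:
    """Turn a (simplified) Falcon notification into emails for failed instances."""
    # The topic is named after the process/feed, so it identifies the entity.
    if message.get("status") != "FAILED":
        return []
    subject = f"[Falcon] {topic} instance {message.get('nominal_time', '?')} failed"
    return [Email(to, subject) for to in RECIPIENTS.get(topic, [])]
```

Even this toy version shows the drawback discussed next: the recipient list is yet another piece of configuration living outside the process definition.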
In theory it looks great and it can even fulfil your needs (e.g. to be notified when the process fails), but it is inconvenient in the long run. First and foremost, Falcon is a declarative solution and its users do not want to write JMS consumers. Secondly, you need to maintain the list of recipients in Camel (yet another system), while it would be easier to define it in Falcon’s process definition. Notifications could be much simpler if Falcon could seamlessly integrate with some alerting tool, or at least provide an additional tag in the process definition that lets you specify where an email should be sent when the process fails or an SLA is not met.
A JIRA ticket that tackles this problem has already been created.
Further web UI improvements
During a “Falcon bi-weekly meeting” that I attended in February 2015, one of the Falcon committers said that Falcon is considered good as a platform, but not yet good as an end product. What he meant was that the backend and CLI tools are quite solid and rich, but the front-end still lacks many UI features, so you end up using the CLI tools very often.
Fortunately, the Falcon Web UI keeps getting better, especially with the new features that came with the recent 0.6.1 release. Some future ideas include wider search capabilities (e.g. searching by column name, ownership, file format, compression codec, total size, last modification or access time, or the number of applications that process the feed) and improved dashboards (e.g. to easily see the status of the last instance of each process or feed and check, for instance, whether the nightly ETL finished flawlessly). Thankfully, many of these UI improvements are on the roadmap.
Falcon originated at InMobi several years ago and today it forms the backbone of InMobi’s mobile data analytics pipelines. According to the blog post on Falcon, InMobi had 400+ workflows and 2000+ data feeds managed by Falcon in January 2015. Falcon is also used at Expedia and at some undisclosed companies. I have used Falcon at two Swedish companies and my experience with this project is very positive.
Falcon became a top-level Apache project in December 2014. Currently it has 17 contributors from 3 companies. The mailing list is very active and I have always received responses very quickly.
Last but not least, Falcon is already included in HDP (Hortonworks Data Platform) and can be installed using Apache Ambari. You can also build it from source and use it with CDH (Cloudera Distribution of Hadoop).
Falcon and Atlas
Although Falcon seems to provide useful solutions to the problems of scheduling and data management, yet another project, Apache Atlas (incubating), has recently been proposed. According to its website, Atlas is a “data governance and metadata framework for Hadoop”. Atlas aims to deliver capabilities like data classification, search of data and metadata, data lineage, data life-cycle management, centralised auditing and access policy enforcement, as well as a REST API for everything and a presumably nice web UI. As you can see, some of these capabilities overlap with what Falcon offers.
How does Falcon compare to Atlas?
Atlas focuses solely on the data governance part (not necessarily on process scheduling). Its unique components are the Knowledge Store (a scalable metadata service) and the Audit Store, which capture all information about data stored in Hadoop and provide interfaces for searching, lineage, classification, auditing and so on. Interestingly, Atlas will delegate data life-cycle management tasks such as replication, retention and late data handling to … Falcon (similarly, enforcement of security policy rules is to be delegated to Apache Ranger). One possible scenario is that instead of collecting its own set of metadata about datasets, Falcon will simply ask Atlas for it. Last but not least, the integration with Atlas should accelerate the development and maturity of Falcon.
When my previously mentioned colleague, David Whiting, shared his notes from a very popular Big Data conference held in November 2014, I read: “(still) nobody is talking about the hard problems of scheduling and data management”. From what I also see as a data consultant at GetInData, proper data management and easy scheduling of processes are challenges that all data-driven companies face, but that surprisingly many of them under-prioritise.
It’s nice that Falcon offers solutions that address many of these challenges!
A couple of years ago, I was impressed by the sizes of Hadoop clusters and I wanted to work with as big a Hadoop cluster as possible. Today, I think that what you should be proud of is not a big cluster, but a clean and transparent one. If you let it get as dirty as the Augean stables, with no data governance solutions that tell you which data is processed and managed, and how and when, it won’t be as useful as it could be. Consequently, user productivity decreases significantly as technical debt grows.