Avoiding The Mess In The Hadoop Cluster (Part 1)

This blog series is based on the talk “Simplified Data Management and Process Scheduling in Hadoop” that we gave at the Big Data Technical Conference in Poland in February 2015. Because the talk was very well received by the audience, we decided to convert it into a blog series.

In the first part we describe possible open-source solutions for data cataloguing, data discovery and process scheduling such as Apache Hive, HCatalog and Apache Falcon.

Mess in the Hadoop cluster

We start with a true story that happened around a year ago. That day I wanted to use Hive to analyse terabytes of data available in a 3-year-old Hadoop cluster to find an answer to my simple, but important question. My query was straightforward – just joining three production tables and counting the number of occurrences of some event.

I estimated that roughly 10 minutes would be needed to implement this simple query. Unfortunately, problems arose very quickly and slowed me down badly. First of all, I wasn’t able to quickly discover which datasets I should process and where they were located. When looking at HDFS and Hive, I saw many directories and tables with names suggesting that they held the data I needed, but I wasn’t 100% sure of that. Some of them looked like junk, some like samples, some like duplicates and some like production datasets. To learn what a given dataset was about, I had to send a group email to all analysts because it wasn’t stated anywhere who the true owner of a given dataset was. There was no good documentation available, so I simply guessed the meaning of the fields and their relationships with the fields from related tables. After several hours, a few emails and confirmations from +2 analysts, I found the data that I needed!

I implemented my HiveQL query very quickly, but it was running very slowly. It eventually finished successfully and returned the numbers that I wanted to see. I thought that it would be nice to schedule this query every week to check later if the numbers are still great. In this case, regular scheduling of that query would require me to implement a small piece of Python code in a framework called Luigi and modify the Unix crontab files that trigger the computation. It’s not that difficult, but slightly time-consuming, so I gave up again and decided that I could simply run this query manually whenever it’s needed.

Wild Wild West

This made me sad. I had all the data in Hadoop needed to solve a (simple) business problem, but I wasted not only my own time searching for and understanding the input data, but also somebody else’s time by asking where the input data was.

The core reason for the above is the lack of good practices that are introduced early enough and continuously enforced. It’s surprising, but proper data management and easy scheduling of processes are serious challenges that all data-driven companies face, yet many of them under-prioritise. Although turning a blind eye to these aspects might not cause trouble when your data is small, it definitely becomes a nightmare when the number of your datasets, processes and data analysts is large. At scale, this kind of big data technical debt becomes more expensive!

Knowing some of the possible problems caused by bad practices, my colleagues and I started thinking about how to avoid them and what open-source tools could address them. Thankfully, there are a number of useful (but still less adopted) tools from the Hadoop ecosystem that help you to discover datasets quicker, schedule processes more easily, smoothly migrate from one file format to another (without even touching your parsing code) and much more. We will cover them in this blog series.

Data catalogue

First of all, you should give an identity to your datasets, so that analysts can easily find the data with no time-consuming investigation, understand the meaning of its fields and be sure what a given dataset is about.

Perhaps you might want to hear about an innovative tool that automatically makes HDFS self-documented? Well, the tool that we propose isn’t revolutionary at all. Our simple idea is to add each production dataset to Hive. By creating a Hive table on top of your data in HDFS, you can supply a lot of information about it: its name, fields, types and comments, the location, the data format, various properties in the form of key-value pairs and a meaningful description of the dataset.
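
As a rough illustration of how much metadata a table definition can carry, the sketch below assembles a HiveQL CREATE EXTERNAL TABLE statement in Python. The table name, fields, HDFS location and property keys (owner, source) are hypothetical examples and not values from a real cluster.

```python
def build_create_table(name, description, fields, partitions,
                       location, file_format, properties):
    """Return a HiveQL CREATE EXTERNAL TABLE statement as a string.

    fields and partitions are lists of (name, type, comment) tuples.
    Quotes inside comments are not escaped; this is only a sketch.
    """
    def columns(cols):
        return ",\n".join(
            "  `{}` {} COMMENT '{}'".format(col, typ, comment)
            for col, typ, comment in cols
        )

    props = ", ".join(
        "'{}'='{}'".format(key, value)
        for key, value in sorted(properties.items())
    )
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS {name} (\n{cols}\n)\n"
        "COMMENT '{desc}'\n"
        "PARTITIONED BY (\n{parts}\n)\n"
        "STORED AS {fmt}\n"
        "LOCATION '{loc}'\n"
        "TBLPROPERTIES ({props})"
    ).format(name=name, cols=columns(fields), desc=description,
             parts=columns(partitions), fmt=file_format,
             loc=location, props=props)


# Hypothetical dataset used only for illustration.
ddl = build_create_table(
    name="analytics.user_events",
    description="One row per user interaction, loaded daily",
    fields=[("user_id", "BIGINT", "Internal user identifier"),
            ("event_type", "STRING", "Kind of interaction, e.g. click")],
    partitions=[("ds", "STRING", "Event date, YYYY-MM-DD")],
    location="/data/production/user_events",
    file_format="ORC",
    properties={"owner": "analytics-team", "source": "web-frontend"},
)
print(ddl)
```

Everything an analyst would otherwise ask about by email (meaning of the fields, location, format, owner) ends up in one statement that Hive stores and can show back with DESCRIBE FORMATTED.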

A nice side effect of adding your production datasets to Hive is that everyone can process them using SQL-like queries in Hive, Presto, Impala, Spark SQL and take advantage of BI tools that often integrate with Hadoop through Hive.

Reading data from Hive tables

When your datasets are nicely abstracted by Hive tables, you should benefit from this abstraction as often as possible. Therefore, the Hive Metastore becomes increasingly important and should always be up and running. Apart from that, all frameworks that wish to process your data should integrate with Hive to learn where a given dataset is located, what its file format is and so on. Obviously, HiveQL queries integrate with Hive perfectly, but in the case of other tools like Spark, Scalding, Pig, MapReduce or Sqoop, you need some kind of adapter that knows how to talk to Hive.

In the case of Pig, Scalding and MapReduce (and some others), HCatalog becomes this adapter. Thanks to HCatalog, you don’t need to hardcode or parametrize the path or format of the dataset – just specify the name of the dataset, which is the same as the name of its Hive table.

Currently, Spark Core doesn’t integrate with HCatalog well. This is not a big reason to worry, though, because you can use Spark SQL in the Hive context instead. Spark SQL allows you to fetch the interesting data from the Hive table using an SQL-like query. Later, you can seamlessly process this data using the Spark Core API in Scala, Java or Python.
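
A minimal PySpark sketch of this pattern follows. It uses the SparkSession API with enableHiveSupport() from newer Spark releases rather than the HiveContext class that was current when this post was written, and the table, column and partition names are hypothetical.

```python
def build_query(table, columns, where=None):
    """Build a simple SELECT over a Hive table; no path or file format is needed."""
    query = "SELECT {} FROM {}".format(", ".join(columns), table)
    if where:
        query += " WHERE {}".format(where)
    return query


def count_by_key(spark, table, key_column, where=None):
    """Fetch one column through Spark SQL, then count its values with the RDD API."""
    rows = spark.sql(build_query(table, [key_column], where))
    return (
        rows.rdd
        .map(lambda row: (row[0], 1))
        .reduceByKey(lambda a, b: a + b)
        .collect()
    )


def main():
    # Imported here so the helpers above can be read and tested without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("hive-table-example")
        .enableHiveSupport()  # lets Spark SQL resolve tables via the Hive Metastore
        .getOrCreate()
    )
    # Hypothetical table, column and partition filter.
    counts = count_by_key(spark, "analytics.user_events", "event_type",
                          where="ds = '2015-02-01'")
    for event_type, count in counts:
        print(event_type, count)
    spark.stop()
```

Calling main() on a machine with Spark and access to the Hive Metastore runs the job. The code refers to the dataset only by its table name, so the location and file format can change without touching it.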

Web UI to your data

Thanks to Hive, you have a central and nicely documented repository of your datasets. Thanks to HCatalog (or Spark SQL), your Hive tables can be accessed by the most popular Big Data frameworks. Unfortunately, neither Hive nor HCatalog offers an out-of-the-box web UI to search, discover and learn about your datasets.

There is a small temptation to implement such a web UI yourself because it doesn’t seem to be that hard. An alternative approach is to use a ready-made solution, and one of them is Apache Falcon.

Falcon allows you to define datasets and submit them to the Falcon server, so it can “manage” them. In Falcon’s nomenclature, datasets are called feeds. For each feed, you can specify which Hive table (or HDFS dataset) it corresponds to. You can tag a dataset and later use these tags when searching. You can define who the owner of the dataset is, write its description, specify the group that the dataset belongs to, etc. This information is somewhat redundant with what you specify in Hive tables, but there is an idea that Falcon could scan the Hive Metastore, automatically create a feed for each Hive table and inherit its properties. Falcon also lets you supply additional properties of your datasets that mean nothing to Hive, but are used by Falcon for popular data management tasks (e.g. retention), which we cover later.

In Falcon, a dataset can be described by writing a relatively short XML file, or by filling in a simple form in the web UI. This new (and improved) web UI is currently in the code-review phase under the FALCON-790 ticket, but it should be committed to the trunk before the end of 2015Q1 (hopefully).
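
To give a feeling for how short such a feed definition is, here is a sketch that generates one with Python’s standard XML library. The element and attribute names follow the feed layout as we understand it from the Falcon entity specification, so verify them against the documentation of your Falcon version. All values (feed name, cluster name, tags, table URI, owner) are hypothetical.

```python
import xml.etree.ElementTree as ET


def build_feed_xml(name, description, tags, groups, frequency, cluster,
                   validity, retention_limit, table_uri, acl):
    """Return a Falcon-style feed entity as an XML string (illustrative sketch)."""
    feed = ET.Element("feed", {
        "name": name,
        "description": description,
        "xmlns": "uri:falcon:feed:0.1",
    })
    # Tags are free-form key=value pairs that can later be used for searching.
    ET.SubElement(feed, "tags").text = ",".join(
        "{}={}".format(key, value) for key, value in sorted(tags.items())
    )
    ET.SubElement(feed, "groups").text = ",".join(groups)
    ET.SubElement(feed, "frequency").text = frequency

    clusters = ET.SubElement(feed, "clusters")
    source = ET.SubElement(clusters, "cluster", {"name": cluster, "type": "source"})
    ET.SubElement(source, "validity", {"start": validity[0], "end": validity[1]})
    ET.SubElement(source, "retention", {"limit": retention_limit, "action": "delete"})

    # The feed points at a Hive table instead of a raw HDFS path.
    ET.SubElement(feed, "table", {"uri": table_uri})
    ET.SubElement(feed, "ACL", acl)
    ET.SubElement(feed, "schema", {"location": "/none", "provider": "none"})
    return ET.tostring(feed, encoding="unicode")


feed_xml = build_feed_xml(
    name="user-events",
    description="One row per user interaction, loaded daily",
    tags={"owner": "analytics-team", "classification": "production"},
    groups=["analytics"],
    frequency="days(1)",
    cluster="primary-cluster",
    validity=("2015-01-01T00:00Z", "2099-12-31T00:00Z"),
    retention_limit="days(90)",
    table_uri="catalog:analytics:user_events#ds=${YEAR}-${MONTH}-${DAY}",
    acl={"owner": "adam", "group": "analysts", "permission": "0755"},
)
print(feed_xml)
```

The resulting file would then be submitted to the Falcon server through its CLI or REST API.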

Once feeds are added to Falcon, you can see and search them by name, tag and status using the web UI and CLI. Currently, the web UI isn’t mind-blowing, but there probably isn’t anything more suitable among open-source projects today. Naturally, it can be extended by the community, so that additional search criteria become possible, e.g. field name, field comment, partition field, file format, ownership, compression codec, directory name, total size, the last modification or access time, the number of applications that process the feed.

Automatic data retention

Apart from the nice web UI, Falcon offers multiple useful features related to data management. For example, you can define the retention period for a dataset to enforce that each instance of this dataset is automatically removed from Hive/HDFS after a specified period of time since its creation or last access. Thanks to that you won’t keep old or unused datasets on disks, and you won’t have to schedule a spontaneous “HDFS cleaning day” to urgently remove useless data when the free HDFS disk space falls below the critical threshold (let’s say, 10% of total disk space) and/or implement your own cleaning scripts for that.
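
The rule behind retention is simple. The sketch below is our own illustration and not Falcon’s implementation; it only considers the age of each instance by its date and ignores last access time.

```python
from datetime import date, timedelta


def expired_instances(instance_dates, retention_days, today):
    """Return the instance dates that are older than the retention period."""
    cutoff = today - timedelta(days=retention_days)
    return sorted(d for d in instance_dates if d < cutoff)


# Hypothetical daily instances of one dataset.
instances = [date(2015, 1, 1), date(2015, 1, 11), date(2015, 2, 1)]
for expired in expired_instances(instances, retention_days=30, today=date(2015, 2, 10)):
    print("would remove instance", expired.isoformat())
```

With a 30-day retention evaluated on 2015-02-10, the cutoff is 2015-01-11, so only the 2015-01-01 instance is selected for removal.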

Process scheduling

You tested your Hive query, ran it once and saw that it works. Now you want to schedule it periodically.

There are a couple of scheduling tools especially built for Hadoop. While Oozie, Azkaban and Luigi (which is often combined with cron) are probably the most popular and widely-adopted ones, the project that we found interesting is … Falcon.

Assuming that your datasets are described as feeds, Falcon allows you to define and schedule processes that process these feeds. For each process, you define what the input and output feeds are, how often the process should be executed, how to retry it when something fails (i.e. how many times to retry and at what intervals), what the type of the process is, and more. Out of the box, Falcon can schedule Hive queries, Pig scripts and Oozie workflows (native support for other frameworks like Spark or MapReduce is to be added soon, but you can still schedule them through Oozie shell/ssh actions).
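
To make the retry settings concrete, the sketch below computes the waiting time before each retry for two policies. The policy names mirror the periodic and exp-backoff policies from Falcon’s process specification, but the doubling rule used for exp-backoff is an assumption made for illustration and not taken from Falcon’s code.

```python
def retry_delays(policy, delay_minutes, attempts):
    """Return the wait (in minutes) before each retry attempt."""
    if policy == "periodic":
        # Same delay before every attempt.
        return [delay_minutes] * attempts
    if policy == "exp-backoff":
        # Assumed behaviour: the delay doubles after each failed attempt.
        return [delay_minutes * 2 ** attempt for attempt in range(attempts)]
    raise ValueError("unknown retry policy: {}".format(policy))


print(retry_delays("periodic", delay_minutes=10, attempts=3))
print(retry_delays("exp-backoff", delay_minutes=10, attempts=3))
```

A periodic policy suits short outages, while a backoff policy avoids hammering a dependency that needs longer to recover.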

The integration between Falcon and Oozie is very close. Falcon isn’t actually a scheduler built from scratch; it simply delegates most of the scheduling responsibilities to Oozie, but gives us a somewhat nicer, shorter and more powerful scheduling API.

Falcon also helps you to keep track of the execution of the recent instances of your process. It shows you which instances finished successfully, which failed, which are still waiting for the execution time and/or the input dataset, which timed out and so on. For these events, Falcon sends JMS messages – you can consume them from ActiveMQ and possibly convert them into emails and/or alerts to let yourself or colleagues know when something important happens.

Please note that I don’t claim that scheduling a process in Falcon is easier than in Luigi, Azkaban or Oozie. It definitely requires some effort, time, practice and discipline, but once you do so, you get many benefits for free (some of them will be described in the remaining part of this blog post series).

Data lineage

Since Falcon has a wealth of information about your processes and their input and output feeds, it becomes easy to keep track of how your data is transformed through the pipeline. Falcon displays this information in the form of a graph that shows the “lineage”. You can navigate through this graph by clicking each vertex. The lineage helps you quickly answer questions such as where the data came from, where and how it is currently used, what the consequences of removing it are – all without the need to send emails to your colleagues.

Another useful (but still in the code-review phase) feature is “triaging” issues related to data-processing. Imagine that your dataset hasn’t been generated yet and you want to discover why. In this scenario, Falcon could easily walk up the dependency tree to identify the root cause such as a failed or still running parent process and display it in a consumable form to the users.
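
A minimal sketch of such a walk up the dependency tree is shown below. The pipeline, the status names (FAILED, RUNNING, WAITING) and the rule for picking the root cause are assumptions made for illustration and do not describe Falcon’s actual triage feature.

```python
# Hypothetical pipeline: what each feed or process directly depends on.
PARENTS = {
    "feed:daily_report": ["process:daily_report"],
    "process:daily_report": ["feed:clean_events"],
    "feed:clean_events": ["process:clean_events"],
    "process:clean_events": ["feed:raw_events"],
    "feed:raw_events": [],
}
# Hypothetical statuses of the latest process instances.
STATUS = {"process:daily_report": "WAITING", "process:clean_events": "FAILED"}

BLOCKING = {"FAILED", "RUNNING"}


def ancestors(parents, vertex):
    """Return every vertex that the given vertex depends on, directly or indirectly."""
    found = []
    stack = list(parents.get(vertex, []))
    while stack:
        current = stack.pop()
        if current not in found:
            found.append(current)
            stack.extend(parents.get(current, []))
    return found


def root_causes(parents, status, target):
    """Return blocking ancestors of target that have no blocking ancestors themselves."""
    blockers = [v for v in ancestors(parents, target) if status.get(v) in BLOCKING]
    return [
        (v, status[v])
        for v in blockers
        if not any(status.get(a) in BLOCKING for a in ancestors(parents, v))
    ]


print(root_causes(PARENTS, STATUS, "feed:daily_report"))
```

In this example the missing daily report is traced back to the failed clean_events process, two steps up the pipeline, rather than to the report process that is merely waiting for its input.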

The second part

In the second (and last) part of this blog series, we explain how to smoothly change the format of your datasets and highlight more advanced features of Falcon such as SLA, data backups, disaster recovery and late data handling, as well as future enhancements and ideas. We will describe some disadvantages of Falcon as well. Stay tuned!

Acknowledgements

I would like to thank Josh Baer and Piotr Krewski for a technical review of this article.

Post by Adam Kawa

Adam became a fan of Big Data after implementing his first Hadoop job in 2010. Since then he has been working with Hadoop at Spotify (where he proudly operated one of the largest and fastest-growing Hadoop clusters in Europe for two years), Truecaller, Authorized Cloudera Training Partner and finally now at GetInData. He works with technologies like Hadoop, Hive, Spark, Flink, Kafka, HBase and more. He has helped a number of companies ranging from fast-growing startups to global corporations. Adam regularly blogs about Big Data and is also a frequent speaker at major Big Data conferences and meetups. He is the co-founder of Stockholm HUG and the co-organizer of Warsaw HUG.

8 Responses to Avoiding The Mess In The Hadoop Cluster (Part 1)

  1. Ajay Yadav

    Excellent Article! I would like to add one more point – Data flow / Relationships in an entire pipeline.

    Data Lineage that you have mentioned currently shows only one level of dependencies/data-flow. With https://issues.apache.org/jira/browse/FALCON-256 you can visualise data flow in your entire pipeline as well. Very useful to visualise how data is flowing between various processes and through which feeds.

  2. Dmitry Kholodilov

    Hi Adam!

    Do you know any solution to the problem of adding Hive partitions? E.g. we have 100+ datasets partitioned by date (daily or hourly) and we want to create a Hive partition for each new directory in HDFS. One solution is to require all producers to have an explicit “add partition” step, but it is not very scalable or reliable (we have a problem when the Hive metastore is down – a bunch of partitions has to be re-created somehow).

    Dmitry.

    • Vick

      Good question! Just after I wrote my own program to solve the problem you mention, I found MSCK. The quote below is from the confluence Hive manual.

      Recover Partitions (MSCK REPAIR TABLE)

      Hive stores a list of partitions for each table in its metastore. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command), the metastore (and hence Hive) will not be aware of these partitions unless the user runs ALTER TABLE table_name ADD PARTITION commands on each of the newly added partitions.

      However, users can run a metastore check command with the repair table option:

      MSCK REPAIR TABLE table_name;

      which will add metadata about partitions to the Hive metastore for partitions for which such metadata doesn’t already exist. In other words, it will add any partitions that exist on HDFS but not in metastore to the metastore. See HIVE-874 for more details.

      PS – It took four tries to get this darned CAPTCHA – It sucks!
