Time-related Configuration Settings and Assumptions in Camus
Camus, a MapReduce job that loads data from Kafka into HDFS, has a number of time-related configuration settings and assumptions. They control how many messages are consumed from Kafka in each Camus run and where the data is stored in HDFS. I summarize them in this blog post.
Detecting timestamps in messages sent to Kafka
Simply speaking, Camus assumes that the timestamp of when the message was generated (or sent to Kafka) is stored together with the message in a predefined field. If the timestamp isn’t set, Camus creates a new timestamp at the time the message is consumed from Kafka (i.e. the current timestamp).
Out of the box Camus supports the consumption of messages stored in Kafka in two formats: JSON and Avro. To let Camus parse these messages you must provide a decoder class (configured using camus.message.decoder.class) that is responsible for converting raw data pulled from Kafka into a Camus format.
- If you use JSON (that is slow to parse, hard to evolve and should be avoided), you can use the com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder class that assumes the timestamp is set in the camus.message.timestamp.field field (default value: timestamp) in the format of camus.message.timestamp.format (default value: [dd/MMM/yyyy:HH:mm:ss Z]). If a JSON message doesn’t have a timestamp or if the timestamp could not be parsed properly, then current time is used.
- If you use Avro (that is relatively fast and supports the schema evolution), then you can use either io.confluent.camus.etl.kafka.coders.AvroMessageDecoder or com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder. However, these decoders assume that the message was sent to Kafka in a special format with a header that contains metadata like a timestamp, a server name and the service name.
- If case you use vanilla Avro-serialized data in Kafka (sent by producers that are not aware that Camus will consume their messages), you can still implement own decoder that extracts the timestamp associated with each message from some field.
This extracted timestamp will be later used by Camus for two purposes that are described in the next sections. Therefore (and not only because of this), it’s highly recommended to have some meaningful timestamp associated with each message (typically it would be the time at which the record was generated or sent to Kafka).
Consuming messages from Kafka partitions
Camus has three time-related configuration options that control how much data is consumed from each Kafka partition in each run.
In other words, the messages with the timestamp between (ts, ts + kafka.max.pull.hrs), where ts is the timestamp of the first message that was produced after runTime - kafka.max.historical.days, will be consumed by a Camus task, assuming that the task is fast enough to finish within kafka.max.pull.minutes.per.task.
The default value of these three configuration settings is -1 which means “no limit” and they make Camus consume as much data as possible in each run, no matter how long it takes. You might want to override these values on several scenarios:
1. Ignoring too old messages
Imagine that you have an existing real-time pipeline that sends messages to Kafka, consumes and processes them using Storm and then sends the results to Cassandra (this is what e.g. Spotify does for real-time ads personalization). Now you also want to transport data from Kafka to HDFS to do some offline analysis. In this case, for the first Camus run, you can tweak kafka.max.historical.days to consume only recent messages rather than all historical ones that might be actually not needed by you and/or be too time-consuming to transfer.
2. Avoiding overlapping Camus runs
Camus is a batch job that is executed periodically (let’s say each 15 or 60 minutes) by the scheduler of your choice (e.g. Falcon or Luigi). Ideally, your scheduler should’t submit a new Camus job while the previous one is still running because you will be copying the same data twice. Also, to avoid the negative scenario where Camus runs slowly due to a single long-running map task (caused by e.g. a data-skew in partitioning semantics where speculative execution doesn’t help), you can set the value of kafka.max.pull.hrs to something smaller than the frequency of your Camus runs to ensure the Camus jobs don’t overlap.
Please note that if you forcibly shorten the execution of a Camus map task, then some leftover messages from the previous time-window might not be consumed yet. The next Camus job will pick old messages and write them to existing HDFS directories that might have been already processed by ETL jobs. In this case, you should have a “late data handling” strategy in place to re-process this data somehow.
3. Decreasing the number of file created by each map task
Last but not least, it’s worth noting that each map task run by Camus can write messages to multiple HDFS files in parallel. For example, if you consume data from last day from 3 topics and partition data by 15-minute boundaries, each map task will write data to up to 288 (3 * 4 * 24) files in parallel. Because map tasks can misbehave if they write to too many files in HDFS, the safeguard configuration setting kafka.max.pull.hrs was introduced to limit the number of partitions for which a map task writes a new file. According to this discussion, a Hadoop task can start misbehaving when it opens more than 30 files.
Retention in Kafka
Yet another time-related configuration setting that might impact a Camus job is log.retention.hours that specifies the retention period for the messages stored by Kafka brokers. In an ideal scenario, Camus job tries to consume messages from the place where the previous Camus job execution finished, but it might not always be the case. For instance, if the Camus job hasn’t been running for a period longer than log.retention.hours then some of the messages might have been already deleted by Kafka brokers and therefore be not available for consumption.
Writing data to HDFS
When writing data to HDFS Camus tasks automatically partition the output based on several properties, including the timestamp of each message.
An exemplary file generate by a Camus map task might look like:
Although a default partitioner partitions incoming data into hourly partitions, it’s still possible to implement the custom one and start using it by specifying appropriate class name in the etl.partitioner.class option.