Apache Kafka’s distributed, durable, and high-throughput nature makes it a natural fit for streaming many types of data. Hiya uses Kafka for a number of critical use cases, such as asynchronous data processing, cross-region replication, storing service logs, and more. Since Kafka is a central component of so many pipelines, it’s crucial that we use it in a way that ensures message delivery.
In this article I’ll share some of our best practices for ensuring consistency and maximizing availability when using Kafka. This material is based on a brown bag presentation I gave for our engineering org on the same topic. The slides for the original presentation can be found here.
Before we continue, let’s review some of the fundamentals of Kafka.
When writing messages into Kafka, we write them into a topic. Topics contain logically related messages which are divided into a number of partitions. Partitions serve as our unit of ordering, replication, and parallelism. While topics are the interesting component from a business perspective, operationally speaking, partitions are the star of the show.
Topics are configured with a replication-factor, which determines the number of copies of each partition we have. All replicas of a partition exist on separate brokers (the nodes of the Kafka cluster). This means that we cannot have more replicas of a partition than we have nodes in the cluster.
A replica is either the leader of its partition, or a follower of the leader. A follower can either be in-sync with the leader (contains all the partition-leader’s messages, except for messages within a small buffer window), or out-of-sync. The set of all in-sync replicas (including the partition-leader) is referred to as the ISR.
This information is summarized by the following illustration of a topic with a partition-count of 3 and a replication-factor of 2:
Understanding Broker Outages
In order to properly tolerate a broker outage, we must first understand the behavior of a Kafka cluster during a broker outage. Let’s explore this in enough depth to address our concerns around partition availability and consistency.
During a broker outage, all partition replicas on the broker become unavailable, so the affected partitions’ availability is determined by the existence and status of their other replicas. If a partition has no additional replicas, the partition becomes unavailable. If a partition has additional replicas that are in-sync, one of these in-sync replicas will become the interim partition leader. Finally, if the partition has additional replicas but none are in-sync, we have a choice to make: either we wait for the partition leader to come back online (sacrificing availability), or we allow an out-of-sync replica to become the interim partition leader (sacrificing consistency).
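The three cases above can be sketched as a small decision function. This is only an illustrative model, not Kafka’s controller logic: the `Replica` class, the `elect_interim_leader` function, and the `allow_unclean` flag are names made up for this example. In a real cluster, the last choice corresponds to the `unclean.leader.election.enable` setting.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Replica:
    """A surviving replica of a partition (hypothetical model)."""
    broker_id: int
    in_sync: bool


def elect_interim_leader(survivors: List[Replica],
                         allow_unclean: bool = False) -> Optional[Replica]:
    """Return the interim leader, or None if the partition stays unavailable.

    `survivors` holds the replicas that remain after the leader's broker died.
    """
    in_sync = [r for r in survivors if r.in_sync]
    if in_sync:
        # Case 2: an in-sync replica takes over with no loss of consistency.
        return in_sync[0]
    if survivors and allow_unclean:
        # Case 3b: trade consistency for availability.
        return survivors[0]
    # Case 1 (no other replicas) or case 3a (wait for the old leader).
    return None
```

Calling `elect_interim_leader([Replica(2, in_sync=False)])` returns `None`, while passing `allow_unclean=True` returns the out-of-sync replica on broker 2.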
Once the broker comes back online, all its replicas will come back in sync with their partition-leaders. Once caught up, any replicas that were originally partition-leaders will become partition-leaders once again. It is worth noting that during the catch-up phase, the replicas are checking for missing messages based solely on message offsets. This detail will become important later on.
Handling Broker Outages
Replication-Factor and Minimum ISR Size
Based on this behavior, we can see that a partition must have an extra in-sync replica available to survive the loss of the partition leader. In order to satisfy this requirement, we can configure our topic’s `min.insync.replicas` setting to 2. If we want all topics to have this setting by default, we can configure our brokers with an identical setting.
In order to have a minimum ISR size of 2, our replication-factor must be at least this value. However, we actually need to be more strict. If we only have 2 replicas and then lose a broker, our ISR size shrinks to 1, which is below our desired minimum. Therefore, we must set our replication-factor to be greater than the minimum ISR size (at least 3). To make this the topic default, we can configure our brokers by setting `default.replication.factor` to 3.
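The arithmetic behind this recommendation is simple enough to write down. The sketch below is a back-of-the-envelope helper, not part of any Kafka client; the function name `tolerated_broker_failures` is invented for this example, and it assumes every replica starts out in-sync.

```python
# The two settings discussed above, written as plain key/value pairs.
RECOMMENDED_SETTINGS = {
    "min.insync.replicas": 2,
    "default.replication.factor": 3,
}


def tolerated_broker_failures(replication_factor: int,
                              min_insync_replicas: int) -> int:
    """How many replicas a partition can lose before the ISR drops below the minimum."""
    if min_insync_replicas < 1:
        raise ValueError("min_insync_replicas must be at least 1")
    if replication_factor < min_insync_replicas:
        raise ValueError("replication factor cannot be below the minimum ISR size")
    return replication_factor - min_insync_replicas


print(tolerated_broker_failures(2, 2))  # two replicas leave no headroom
print(tolerated_broker_failures(
    RECOMMENDED_SETTINGS["default.replication.factor"],
    RECOMMENDED_SETTINGS["min.insync.replicas"],
))  # the recommended settings leave room for one failure
```

With a replication-factor of 2 the result is 0, and with the recommended values it is 1, which matches the single broker failure we are aiming to tolerate.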
With these settings, our topics are configured to tolerate a single broker failure, but our producers still have a role to play to ensure consistency. When a producer sends a produce request, the conditions for the request to be considered a success are determined by a producer configuration parameter called `acks`, which can be set to one of the following values:
- 0: return success at the moment the request is sent
- 1: return success once the partition leader acknowledges the request
- all: return success once all replicas in the ISR acknowledge the request
Consider a situation where `acks` is set to either 0 or 1. The producer sends a message X to a partition, then the partition leader dies shortly after. In this situation, X will get written to one replica, but not the others. Despite this, our other replicas are still in-sync (messages within the buffer window are not required to be in-sync) and one will become the interim partition leader. Now, when the producer sends a message Y to the partition, it will be written with the same offset as message X. Once the down broker comes back online, the original partition leader will not realize that it is missing message Y, because it already holds a message at that offset, and our partition has become inconsistent.
If we had set `acks` to `all` instead, then the request with message X would have either been written to all our replicas or the request would have failed. In either case we avoid the demonstrated inconsistency.
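To make the difference concrete, here is a toy simulation of the scenario. It is a deliberately simplified model and not Kafka’s actual replication protocol: each replica is a plain Python list whose index stands in for the offset, and `produce`, `catch_up`, and `run_scenario` are names invented for this sketch. It also assumes that a producer which sees a failed request retries the same message.

```python
from typing import List, Tuple

Log = List[str]  # one replica; the list index plays the role of the offset


def produce(leader: Log, followers: List[Log], message: str, acks: str,
            leader_dies_before_replication: bool = False) -> bool:
    """Append to the leader and report whether the producer sees success."""
    leader.append(message)
    if leader_dies_before_replication:
        # acks=0 and acks=1 are already satisfied at this point;
        # acks=all never hears back from the followers, so the request fails.
        return acks in ("0", "1")
    for follower in followers:
        follower.append(message)
    return True


def catch_up(replica: Log, leader: Log) -> None:
    """Offset-only catch-up: copy whatever lies past the replica's last offset."""
    replica.extend(leader[len(replica):])


def run_scenario(acks: str) -> Tuple[Log, Log]:
    old_leader: Log = []
    follower: Log = []
    # Message X reaches the leader, which dies before the follower fetches it.
    if not produce(old_leader, [follower], "X", acks,
                   leader_dies_before_replication=True):
        # The producer saw a failure, so it retries X against the interim leader.
        produce(follower, [], "X", acks)
    # The follower is now the interim leader and receives message Y.
    produce(follower, [], "Y", acks)
    # The old leader comes back and catches up by offset alone.
    catch_up(old_leader, follower)
    return old_leader, follower


print(run_scenario("1"))
print(run_scenario("all"))
```

With `acks` set to `1`, the two logs end up as `['X']` and `['Y']`: the same length, so the offset check sees nothing missing, yet the contents differ. With `acks` set to `all`, both logs end up as `['X', 'Y']`.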
While the specified settings do give us consistency with high availability, there is still a detail that needs to be addressed: these settings provide us with at-least-once message delivery. In other words, we will end up with some duplicate messages in our topics. To see this, consider the following scenario.
Using the recommended configuration, our producer writes a message, which gets written to the partition leader. Before the follower replicas fetch the message, the request times out. Shortly after, the follower replicas fetch the message. In this situation, the message was written successfully to all replicas, but the producer considers the request to have failed, and retries. After a successful retry, we’ve written the message to our partition twice.
Thankfully, at-least-once message delivery does not have to be a problem. It does, however, require consideration in how we design our consumers. In particular, consumers should be designed to handle messages idempotently. This can be accomplished in a variety of ways, such as by including a unique id in each message, then tracking the processed ids. If we come across a message with an id that we’ve already processed, we simply skip over the message.
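A minimal sketch of this id-tracking approach might look like the following. The `IdempotentHandler` class and its method names are hypothetical, and the in-memory set is an assumption made to keep the example short; a real consumer would likely need to keep the processed ids in a durable store so they survive restarts.

```python
from typing import Callable, Set


class IdempotentHandler:
    """Wraps a processing function so each message id is handled at most once."""

    def __init__(self, process: Callable[[str], None]) -> None:
        self._process = process
        self._seen: Set[str] = set()  # ids of messages already processed

    def handle(self, message_id: str, payload: str) -> bool:
        """Process the payload unless its id was seen before; return True if processed."""
        if message_id in self._seen:
            return False  # duplicate delivery, skip it
        self._process(payload)
        # Record the id only after processing succeeded, so a failure is retried.
        self._seen.add(message_id)
        return True


processed = []
handler = IdempotentHandler(processed.append)
handler.handle("id-1", "hello")
handler.handle("id-1", "hello")  # the retried duplicate is skipped
handler.handle("id-2", "world")
print(processed)
```

After the three calls above, `processed` contains `hello` and `world` exactly once each.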
If you would rather design around exactly-once message delivery, the good news is that Kafka does support it! However, it is not straightforward, so I will not explain it in this article. Instead, consult this article from Confluent’s Neha Narkhede.
Based on what we’ve learned so far, we can ensure that even if a broker goes down, all partitions of a topic remain available without sacrificing consistency. This will save us from many failures, but not all! We still need to be concerned about the possibility of an entire datacenter outage.
Thankfully, Kafka makes it very easy to handle datacenter outages using a feature called Rack Awareness. The idea is simple: each broker is configured with a label describing which “rack” (or datacenter) the broker is within. Then, when Kafka assigns replicas across different brokers, it spreads our replicas across the available racks. If we have 3 Kafka brokers spread across 3 datacenters, then a partition with 3 replicas will never have multiple replicas in the same datacenter. With this configuration, datacenter outages are not significantly different from broker outages.
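The spreading idea can be illustrated with a short sketch. This is not Kafka’s actual assignment algorithm, only a simplified stand-in that visits each rack once before reusing any; the `assign_replicas` function and the datacenter names are made up for this example. In a real cluster, the rack label is set per broker through the `broker.rack` setting.

```python
from typing import Dict, List


def assign_replicas(broker_racks: Dict[int, str],
                    replication_factor: int) -> List[int]:
    """Choose brokers for one partition, using every rack once before reusing any."""
    if replication_factor > len(broker_racks):
        raise ValueError("cannot have more replicas than brokers")

    # Group broker ids by their rack label.
    by_rack: Dict[str, List[int]] = {}
    for broker_id, rack in sorted(broker_racks.items()):
        by_rack.setdefault(rack, []).append(broker_id)

    # Take one broker from each rack in turn until we have enough replicas.
    queues = [brokers for _, brokers in sorted(by_rack.items())]
    chosen: List[int] = []
    while len(chosen) < replication_factor:
        for queue in queues:
            if queue and len(chosen) < replication_factor:
                chosen.append(queue.pop(0))
    return chosen


brokers = {1: "dc-a", 2: "dc-a", 3: "dc-b", 4: "dc-c"}
print(assign_replicas(brokers, 3))
```

For the four brokers above, the three replicas land on brokers 1, 3, and 4, one per datacenter, so losing any single datacenter removes at most one replica.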
Of course, we can always experience a combination of failure modes. What happens when multiple brokers fail in different datacenters (or availability zones, AZs)? What happens when multiple AZs fail? I will not dive into these questions, but if they intrigue you, consider them an exercise for the reader.
A brief note on Zookeeper
While these settings can help ensure consistency and high uptime in your Kafka topics, remember that Kafka is dependent on Zookeeper. Therefore, a Kafka cluster is at most as reliable as the Zookeeper ensemble it depends on. Make sure to put some thought into how your Zookeeper is configured to meet your high-availability (HA) requirements.