Best Practices: Improving Fault Tolerance in Apache Kafka Consumers

How to effectively manage client-side partial failures, avoid data loss and handle processing errors

Apache Kafka is the gold standard for building real-time data pipelines and streaming apps. Scalable, fault-tolerant and blazingly fast, Kafka is relied upon by countless organizations to provide a unified, high-throughput, low-latency platform for processing massive data feeds. The downside? While the platform solves many big data problems, it can be complex to install, set up and maintain. As such, it is important for developers to consider not just what the technology can do when the platform is running well, but how their code will behave during failures. In this post, we focus on handling client/consumer failures in Apache Kafka. These best practices will help engineers build code that maintains performance when consuming from a high-throughput pipeline and that handles failures without creating bottlenecks or losing data.

Achieving a Resilient and Fault-tolerant Processing Pipeline During a Kafka Consumer Implementation

At CrowdStrike, our platform must ingest trillions of events per week, in real time and without data loss. To increase the throughput of our Kafka processing pipelines, CrowdStrike leverages the power of Go. The language allows us to build performant, lightweight microservices that scale horizontally and use multi-core instances efficiently, and its built-in concurrency mechanism is particularly useful when event processing is I/O bound.

Each of our consumers has a pool of concurrent workers. To maximize efficiency and avoid "hot spotting," or overloading one partition with events, we typically use a "round-robin" assignment approach that distributes events across the pool. Kafka partitions allow us to run multiple instances of the consumer (one per microservice instance). When a batch of events is received from Kafka, the consumer "deals" events to the workers concurrently, and each worker receives only one event at a time. Using this approach, we achieve two basic advantages:

1. Code simplicity: The code in the event handler is scoped to a single message, so the logic is straightforward and easy to test.

2. Granular error handling: A worker fails only one event (which the consumer will automatically retry/redrive), and the system can continue processing.

At CrowdStrike, we use Kafka to manage more than 500 billion events a day that come from sensors all over the world. That equates to a processing time of just a few milliseconds per event. Our approach of combining concurrency with granular error handling helps us avoid large-scale failures and is incredibly important to the overall performance of our system and our customers' security.
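To make the fan-out concrete, here is a minimal sketch in Go of how a batch of events might be dealt to a pool of workers. The Event type, the handle function and the worker count are hypothetical stand-ins used only for illustration; CrowdStrike's actual wrapper is not shown here.

```go
package pipeline

import (
	"log"
	"sync"
)

// Event stands in for a single Kafka message.
type Event struct {
	Key   string
	Value []byte
}

// handle processes exactly one event; keeping it scoped to a single message
// keeps the logic straightforward and easy to test.
func handle(e Event) error {
	// ... business logic for one event ...
	return nil
}

// dispatch fans a batch of events out to a fixed pool of workers. Each worker
// pulls one event at a time from the shared channel, so the batch is spread
// evenly and a failure affects only the event that caused it.
func dispatch(batch []Event, workers int) {
	events := make(chan Event)
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for e := range events {
				if err := handle(e); err != nil {
					// Only this event fails; the rest keep flowing.
					// (Retries and redrive are covered later in the article.)
					log.Printf("event %s failed: %v", e.Key, err)
				}
			}
		}()
	}

	for _, e := range batch {
		events <- e
	}
	close(events)
	wg.Wait()
}
```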

A Deep Dive Into Failure Processing in Kafka

For the purposes of this article, we will consider a failure to be any unsuccessful attempt to process a specific event. The reasons may vary, from dependency failures, such as database outages or timeouts, to malformed payloads. The solution we choose for these issues will depend on the type of failure we are experiencing, as well as the goal of the system. It is important to note that not all solutions apply to all use cases. For example, a system may sacrifice some accuracy by allowing a small portion of events to be lost in order to achieve a higher throughput. Another may need to process events in a specific order, sacrificing throughput and also making it incompatible with a redrive (retry) system, which we will cover later in this article. When considering these best practices, it is important to take into account the goals of the code and ensure that the solution does not exacerbate the very problem the team is trying to solve. Generally speaking, our team experiences two basic failure types: partial failures and outages. What follows is an overview of our most common recovery techniques.

Partial failures

Partial failures occur when the system fails to process only a small portion of events. In writing the code for our internal systems that require high throughput, we created a set of rules that keep events moving even under heavy load. One of these rules is a timeout: we isolate individual events that are taking too long to process. When this happens, the system is designed to set the event aside and pick up the next message. The idea is that maintaining the performance of the overall system is more important than processing every single event. Timeouts can be applied at the batch level and/or to each event individually. The side effect of eliminating "rogue events" is predictable latency, which leads to a more deterministic throughput, an important feature of any distributed system.
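As an illustration, a per-event timeout might look like the following sketch, assuming a process function that honors context cancellation; the names here are hypothetical, not the wrapper's actual API.

```go
package pipeline

import (
	"context"
	"errors"
	"time"
)

// errTimeout marks an event that was set aside because it took too long.
var errTimeout = errors.New("event processing timed out")

// handleWithTimeout applies the per-event timeout rule: if process does not
// finish within limit, the event is set aside (reported as a timeout failure)
// so the worker can move on to the next message.
func handleWithTimeout(ctx context.Context, value []byte, limit time.Duration,
	process func(context.Context, []byte) error) error {

	ctx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- process(ctx, value) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// The goroutine may still be running; a well-behaved process
		// should observe ctx and stop on its own.
		return errTimeout
	}
}
```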
Side note: CrowdStrike's microservices leverage the Golang driver built by Confluent (confluent-kafka-go and librdkafka) through our in-house wrapper, which implements all of the techniques mentioned in this article.
Another feature of our Kafka wrapper is the ability to retry specific messages that failed for any reason, including timeouts. The dispatcher that assigns events received from Kafka to the worker pool keeps a record of each message's processing result and the number of retries attempted. In our system, there are three levels of retries: the first happens at "runtime," while the batch is still in progress; the second uses a "redrive system," which identifies specific messages to be reprocessed later; and the third is from cold storage. These retries can be combined. For example, we allow three runtime retries for each redrive and a maximum of five redrives, which means we allow 15 retries in total. In practice, the runtime retries cover 99% of the failures. In the unlikely scenario of an event failing all 15 retries, it is removed from the pipeline altogether and set aside in a "dead letter queue" (which in our case is cold, persistent storage), where it awaits further retries and manual investigation.
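The bookkeeping behind those retry levels can be sketched roughly as follows. The limits mirror the numbers above, but the accounting is simplified; in a real system the redrive count would typically travel with the message itself, for example in a Kafka header.

```go
package pipeline

// action is what the dispatcher does with a message that just failed.
type action int

const (
	actionRetryNow   action = iota // retry within the current batch ("runtime" retry)
	actionRedrive                  // requeue on the redrive topic and reprocess later
	actionDeadLetter               // park in the dead letter queue / cold storage
)

const (
	maxRuntimeRetries = 3 // runtime retries allowed per pass
	maxRedrives       = 5 // redrive passes allowed per message
)

// retryState is the per-message record the dispatcher keeps.
type retryState struct {
	runtimeRetries int // failed attempts during the current pass
	redrives       int // times the message has been requeued so far
}

// nextAction decides the fate of a message whose processing just failed.
func (s *retryState) nextAction() action {
	if s.runtimeRetries < maxRuntimeRetries {
		s.runtimeRetries++
		return actionRetryNow
	}
	if s.redrives < maxRedrives {
		s.redrives++
		s.runtimeRetries = 0 // the runtime budget resets for the next pass
		return actionRedrive
	}
	return actionDeadLetter
}
```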
Side note: We use the term "redrive system" for a message queue that stores the same messages as the topic it serves. In CrowdStrike's case, the redrive is usually a secondary Kafka topic. When a message fails, it is requeued in that other topic, and a resulting advantage is that the consumer can prioritize which events are more important (the new ones or the failed ones). To implement this kind of system, you can also use the same topic, or another messaging queue (like SQS) altogether. The redrive comes with two caveats: it adds code complexity (the consumer has to listen to two topics), and it is not compatible with consumers that need to consume events in order.
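A requeue onto a secondary topic might look like the following sketch, using the confluent-kafka-go producer API mentioned earlier. The topic name and the "redrive-attempt" header are assumptions made for illustration, not the wrapper's actual conventions.

```go
package pipeline

import (
	"strconv"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// redriveTopic is a hypothetical secondary topic that mirrors the main one.
const redriveTopic = "events-redrive"

// redriveMessage republishes a failed message on the redrive topic so the
// consumer group can move past it now and see it again later. Delivery
// reports arrive asynchronously on p.Events() and are not handled here.
func redriveMessage(p *kafka.Producer, msg *kafka.Message, attempt int) error {
	topic := redriveTopic
	return p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            msg.Key,
		Value:          msg.Value,
		Headers: append(msg.Headers, kafka.Header{
			Key:   "redrive-attempt", // lets the consumer enforce a redrive cap
			Value: []byte(strconv.Itoa(attempt)),
		}),
	}, nil)
}
```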
The first type of retry is meant to fix "glitches" in the system, such as failed requests or timeouts; these retries use exponential backoff and should take just a few milliseconds. The second type, which uses the redrive system, usually fixes temporary malfunctions of the consumer's dependencies. The best example would be a database rejecting requests for several minutes.
Side note: For readers who do not know how Kafka offsets work, here is a brief overview of the most common scenario: a consumer group cannot acknowledge or retry a specific message from a Kafka topic. It can only advance its offsets, communicating to Kafka that all messages up to that point were processed. This means that the consumer group is always stuck at the oldest unprocessed message. By "redriving" the few failed events, we can acknowledge the messages from the topic, because the failed ones will return later through the redrive.
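In other words, the consumer commits past a failed message only after that message has been safely requeued. Here is a minimal sketch of that ordering, again using confluent-kafka-go; the requeue callback is a hypothetical stand-in for the redrive publish.

```go
package pipeline

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// ackOrRedrive advances offsets without losing data: a failed message is
// first requeued (for example, on a redrive topic), and only then is its
// offset committed, so the consumer group keeps moving.
func ackOrRedrive(c *kafka.Consumer, msg *kafka.Message, procErr error,
	requeue func(*kafka.Message) error) error {

	if procErr != nil {
		if err := requeue(msg); err != nil {
			// Never commit past a message we could not safely requeue.
			return err
		}
	}
	_, err := c.CommitMessage(msg)
	return err
}
```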
Pro tip: Do not treat malformed events as failures, as this can lead to unwanted side effects, especially where throttling is involved, which leads to our next topic:

Outages

Because our systems need to process all of the messages in near real time, we have to match the consumer throughput to the rate at which the producers create events. When the system is running normally, we say that it is at "full throttle." When failures occur, we can enforce a temporary, artificial limitation at the consumer level. If the limit is smaller than the producers' throughput, then the consumers will "lag" behind. As such, these techniques are only used as temporary solutions when things are not fully functioning.

Manual intervention is very rare, but it can happen when a medium or large outage affects other dependent systems. A simple solution can be to limit the number of instances, and the number of workers per instance, while the database is partially unavailable.

Automatic throttling is where the "magic" code lies. Each system can implement its own logic to increase or decrease the throttle level based on its own metrics and flows. The default behavior typically tightens the throttle after a failed event and relaxes it after successful ones, with each result carrying a different weight on the throttle; failures count for more than successes. The effect of this technique is an automatic reaction to any dependency failure. For example, when a dependency (another microservice) fails to respond for a couple of seconds, our consumer will start failing to process events. The throttle will notice and reduce the throughput by waiting a few milliseconds between events. The number of requests to the failing service will then decrease, speeding up its recovery. In effect, we achieve an automatic "exponential back-off" pressure strategy.

The throttle works very well with the redrive system we mentioned earlier: the throttle slows down ingestion, but at the same time the failed events are retried, which prevents data loss.
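One way such a weighted throttle could be sketched is shown below: every failure widens the delay between events more aggressively than a success narrows it, approximating the exponential back-off behavior described above. The constants and structure are illustrative assumptions, not the production implementation.

```go
package pipeline

import (
	"sync"
	"time"
)

// throttle inserts a small, adaptive delay between events. At "full throttle"
// the delay is zero and events flow as fast as the workers allow.
type throttle struct {
	mu    sync.Mutex
	delay time.Duration
}

const (
	maxDelay      = 2 * time.Second       // never slow down beyond this
	failureGrowth = 2                     // each failure doubles the delay
	successDecay  = 10 * time.Millisecond // each success shaves a little off
	minStep       = 5 * time.Millisecond  // first step away from full throttle
)

// record updates the delay; failures weigh much more than successes.
func (t *throttle) record(failed bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if failed {
		if t.delay == 0 {
			t.delay = minStep
		} else {
			t.delay *= failureGrowth
		}
		if t.delay > maxDelay {
			t.delay = maxDelay
		}
	} else if t.delay > 0 {
		t.delay -= successDecay
		if t.delay < 0 {
			t.delay = 0
		}
	}
}

// wait is called before processing each event; at full throttle it returns
// immediately.
func (t *throttle) wait() {
	t.mu.Lock()
	d := t.delay
	t.mu.Unlock()
	if d > 0 {
		time.Sleep(d)
	}
}
```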

Full stop

So what happens when too many events fail or are set aside by timeouts? In most cases, the algorithm is fairly simple: if the system hits a certain threshold of failures within a limited amount of time, it presumes that it is losing too many events and simply shuts down. It is only natural to assume that if most of the events cannot be processed, there is no reason to proceed.

But why would anyone want to stop a whole system and create a large Kafka lag? This is a legitimate question, and you have to weigh the pros and cons for each individual consumer group and system. On one hand, the full stop can act as a safety mechanism, preventing the system from sending large amounts of data to the dead letter queue, which would require a large effort to reprocess. On the other hand, combined with an auto-restart system, which is typically the default in a container deployment, the full stop can act as an auto-repair mechanism. This can be particularly helpful when the consumer runs out of memory or when the instance experiences a hardware failure that causes data loss.

There can also be false positives in this category that lead to a full system shutdown, for example treating malformed events as failures, a bug in one of the producers, or a schema change. Needless to say, the system should monitor and alert in such scenarios. In most real-time pipelines, stopped consumers are almost as critical as a database outage.
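A minimal sketch of such a threshold, assuming a simple sliding window of recent failures (the window size and limit are illustrative):

```go
package pipeline

import (
	"sync"
	"time"
)

// fullStop tracks recent failures and signals when the consumer should shut
// itself down instead of flooding the dead letter queue.
type fullStop struct {
	mu       sync.Mutex
	window   time.Duration
	maxFails int
	fails    []time.Time // timestamps of recent failures
}

func newFullStop(window time.Duration, maxFails int) *fullStop {
	return &fullStop{window: window, maxFails: maxFails}
}

// recordFailure registers one failed event and returns true when the failure
// rate has crossed the threshold and the consumer should stop.
func (f *fullStop) recordFailure(now time.Time) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Drop failures that have aged out of the window.
	cutoff := now.Add(-f.window)
	kept := f.fails[:0]
	for _, t := range f.fails {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	f.fails = append(kept, now)

	return len(f.fails) > f.maxFails
}
```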


In writing our code to address each of these Kafka consumer failure scenarios, we have managed to maintain performance and avoid data loss. If the coding gods are on our side, we will soon be able to open source our Kafka wrapper package. It is written in Go, our main cloud language, and has all of these techniques and more built in. We believe it will be a useful tool for many in the coding community. In the meantime, we hope these tips help shed some light on how to handle consumer failures in Kafka.

Does this work sound interesting to you? Visit CrowdStrike's Engineering and Technology page to learn more about our engineering team, our culture and current open positions.
