Processing Error Handling in Kafka Streams

Introduction
A major concern when developing Kafka Streams applications is handling processing errors. Processing errors occur when the implemented logic in a Kafka Streams application fails to process a record correctly.
Let’s illustrate what processing errors are with a concrete example from our Supply Chain domain: a DeliveryBooked event containing missing data.
A DeliveryBooked event represents a scheduled truck delivery. Each event contains important details, such as the number of tires in the truck.
A valid DeliveryBooked record might look like this:
{
"deliveryId": "DEL12345",
"truckId": "TRK56789",
"numberOfTires": 18,
"destination": "Bordeaux"
}
However, in real-world scenarios, missing data is a common issue. Consider the following malformed event:
{
"deliveryId": "DEL12347",
"truckId": "TRK56791",
"numberOfTires": null,
"destination": "Paris"
}
If the application attempts to process this record without checking for null, it will trigger a NullPointerException.
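For context, here is a minimal sketch of a DeliveryBooked POJO matching the JSON above (the actual class is not shown in this article). Note that numberOfTires is a boxed Integer, so comparing it with an int while it is null throws a NullPointerException on unboxing:

public class DeliveryBooked {
    private String deliveryId;
    private String truckId;
    private Integer numberOfTires; // Boxed, so a missing JSON field maps to null
    private String destination;

    public String getDeliveryId() {
        return deliveryId;
    }

    // Unboxing the result in an expression like getNumberOfTires() < 0 throws a NullPointerException when null
    public Integer getNumberOfTires() {
        return numberOfTires;
    }
}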
The question we will answer in this blog article is: How should we properly handle processing exceptions?
Uncaught Exception Handler
The first option that comes to mind when discussing exceptions in Kafka Streams is the Uncaught Exception Handler. It allows you to define how the application should behave when such errors occur. Using an enumeration, you can choose to:
- Replace the StreamThread
- Shut down the Kafka Streams instance
- Shut down all Kafka Streams instances sharing the same application.id
KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
// Shut down this Kafka Streams instance whenever a StreamThread throws an uncaught exception
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
kafkaStreams.start();
A simple Kafka Streams topology that does not explicitly handle exceptions, allowing all errors to be managed by the UncaughtExceptionHandler, would look like this:
public static void buildTopology(StreamsBuilder streamsBuilder) {
streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> parseFromJson(value)) // JsonSyntaxException
.filter((key, value) -> {
if (value.getNumberOfTires() < 0) {
throw new InvalidDeliveryException("Number of tires cannot be negative");
}
return value.getNumberOfTires() >= 10;
}) // InvalidDeliveryException or NullPointerException
.mapValues(value -> parseToJson(value))
.to("filtered_delivery_booked_dsl_topic", Produced.with(Serdes.String(), Serdes.String()));
}
In this simple example, multiple exceptions can be thrown during operations on record values:
- A JsonSyntaxException may occur when converting a String to a DeliveryBooked Java object.
- An InvalidDeliveryException may be raised on purpose if the number of tires is negative.
- A NullPointerException may be thrown when verifying the number of tires if the value is missing.
Since this topology does not handle these exceptions, they are automatically passed to the UncaughtExceptionHandler, which then interrupts processing.
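For reference, the parseFromJson and parseToJson helpers used in this topology are not shown in the article; a minimal sketch, assuming Gson (the library JsonSyntaxException comes from), could look like this:

private static final Gson GSON = new Gson();

// Throws JsonSyntaxException when the input is not valid JSON
private static DeliveryBooked parseFromJson(String value) {
    return GSON.fromJson(value, DeliveryBooked.class);
}

private static String parseToJson(DeliveryBooked value) {
    return GSON.toJson(value);
}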
The Try-Catch Mechanism
What if the pipeline should not be interrupted by processing errors? What if the decision to fail or continue depends on the type of exception or the specific operation where it occurs?
To implement this logic, DSL operations must be wrapped in a custom try-catch mechanism. This provides control over whether to propagate the exception, causing the stream to fail, or suppress it to allow processing to continue.
public static void buildTopology(StreamsBuilder streamsBuilder) {
streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> {
try {
return parseFromJson(value);
} catch (Exception e) {
log.error("Error parsing value {}", value, e);
return null;
}
})
.filter((key, value) -> {
try {
if (value.getNumberOfTires() < 0) {
throw new InvalidDeliveryException("Number of tires cannot be negative");
}
return value.getNumberOfTires() >= 10;
} catch (Exception e) {
log.error("Error filtering value {}", value, e);
return false;
}
})
.mapValues(value -> {
try {
return parseToJson(value);
} catch (Exception e) {
log.error("Error parsing value {}", value, e);
return null;
}
})
.to("filtered_delivery_booked_dsl_topic", Produced.with(Serdes.String(), Serdes.String()));
}
The same approach is required for a custom Processor, wrapping the logic in both the process method and the schedule callback (if present).
public class CustomProcessor extends ContextualProcessor<String, String, String, String> {
@Override
public void init(ProcessorContext<String, String> context) {
super.init(context);
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
try {
// Any business logic raising an exception
} catch (Exception e) {
log.error("Error scheduling punctuation", e);
}
});
}
@Override
public void process(Record<String, String> record) {
try {
DeliveryBooked value = parseFromJson(record.value());
if (value.getNumberOfTires() < 0) {
throw new InvalidDeliveryException("Number of tires cannot be negative");
}
if (value.getNumberOfTires() >= 10) {
context().forward(record.withValue(parseToJson(value)));
}
} catch (Exception e) {
log.error("Error processing record: {}", record, e);
}
}
}
Not very elegant, right? It requires a lot of try-catch code that must be manually written for each operation. This is where KIP-1033 comes to the rescue!
Processing Exception Handler
KIP-1033 introduces an enhanced processing error-handling mechanism, inspired by the Kafka Streams API's approach to deserialization and production error handling. It adds the ProcessingExceptionHandler, which manages processing errors by integrating the try-catch mechanism directly into the Kafka Streams API.
This ensures that exceptions thrown from any processor are automatically caught and forwarded to the handler, allowing it to execute custom error-handling logic.
This feature was introduced in Apache Kafka 3.9.0.
New Interface
The Processing Exception Handler can be configured using the processing.exception.handler property, which specifies an implementation of the following interface:
public interface ProcessingExceptionHandler extends Configurable {
    ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);

    public static enum ProcessingHandlerResponse {
        CONTINUE(1, "CONTINUE"),
        FAIL(2, "FAIL");

        public final String name;
        public final int id;

        ProcessingHandlerResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}
This handler can manage processing exceptions originating from:
- A DSL operation
- A custom processor (e.g., from the Processor#process implementation or the ProcessingContext#schedule callback)
It allows you to define custom error-handling logic and decide whether to fail or continue processing the stream. The handler provides an ErrorHandlerContext containing metadata such as:
- The topic, partition, and offset of the record that triggered the exception
- The ID of the processor where the exception occurred
- The processor's input record that caused the exception
- The exception itself
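For example, a handler could use this metadata to enrich its logs before deciding how to respond. A minimal sketch (the class name is ours, using the ErrorHandlerContext accessors listed above):

public class LoggingProcessingExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LoggingProcessingExceptionHandler.class);

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
        // Log where the failure happened, then let the stream move on
        log.warn("Failed to process record from topic {} (partition {}, offset {}) in processor {}",
                context.topic(), context.partition(), context.offset(), context.processorNodeId(), exception);
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Do nothing
    }
}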
Default Implementations
Two implementations of the Processing Exception Handler have been introduced:
- LogAndFailProcessingExceptionHandler (default): Logs the exception and fails the stream.
- LogAndContinueProcessingExceptionHandler: Logs the exception and continues processing the next records.
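To use one of them (or a custom handler), set the processing.exception.handler property. A minimal sketch, assuming the StreamsConfig constant introduced by KIP-1033:

Properties properties = new Properties();
// "processing.exception.handler": defaults to LogAndFailProcessingExceptionHandler
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueProcessingExceptionHandler.class.getName());

KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);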
Custom Implementation
You can implement a custom Processing Exception Handler to define specific error-handling behavior. For example, you can decide whether to fail or continue processing based on the type of exception:
public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandler {
@Override
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
if (exception instanceof JsonSyntaxException)
return ProcessingHandlerResponse.FAIL;
if (exception instanceof InvalidDeliveryException)
return ProcessingHandlerResponse.CONTINUE;
if (exception instanceof NetworkException)
return ProcessingHandlerResponse.FAIL;
return ProcessingHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> map) {
// Do nothing
}
}
In this example, we consider a business error (e.g., InvalidDeliveryException) to be continuable: the record has no chance of being processed successfully, even with retries, so we simply skip it and continue processing. In contrast, a NetworkException is transient and worth retrying. Here, we may want to fail the pipeline and let our container orchestrator restart the application, so it retries processing the record that triggered the exception.
Alternatively, we can decide what to do depending on the processor where the exception occurs:
public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler {
@Override
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
if (context.processorNodeId().equals("MAP_PROCESSOR"))
return ProcessingHandlerResponse.FAIL;
if (context.processorNodeId().equals("FILTER_PROCESSOR"))
return ProcessingHandlerResponse.CONTINUE;
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR"))
return ProcessingHandlerResponse.FAIL;
return ProcessingHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> map) {
// Do nothing
}
}
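Note that processor node IDs such as MAP_PROCESSOR only exist if the topology names its operations explicitly; otherwise, Kafka Streams generates IDs like KSTREAM-MAPVALUES-0000000001. A sketch of how the names used above could be assigned with Named.as:

streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(value -> parseFromJson(value), Named.as("MAP_PROCESSOR"))
    .filter((key, value) -> value.getNumberOfTires() >= 10, Named.as("FILTER_PROCESSOR"))
    .selectKey((key, value) -> value.getDeliveryId(), Named.as("SELECT_KEY_PROCESSOR"))
    .mapValues(value -> parseToJson(value))
    .to("filtered_delivery_booked_dsl_topic", Produced.with(Serdes.String(), Serdes.String()));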
Or we could decide what to do based on the input record itself:
public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
@Override
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
}
return ProcessingHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> map) {
// Do nothing
}
}
Dealing With Business Errors
Handling technical exceptions like NullPointerException
is important, but what about business errors? Sometimes, a missing or incorrect field does not trigger a Java exception but still represents an invalid state in your domain.
For example, imagine a business rule stating that a truck must have at least 6 tires to be considered valid. Consider the following event:
{
"deliveryId": "DEL12348",
"truckId": "TRK56792",
"numberOfTires": -4,
"destination": "Lyon"
}
This record would not cause a Java exception, but it would violate the business rule.
Instead of manually handling business validation, we can throw a custom exception and let Kafka Streams manage it using the Processing Exception Handler.
public static void buildTopology(StreamsBuilder streamsBuilder) {
streamsBuilder.<String, DeliveryBooked>stream("delivery_booked_topic")
.mapValues(value -> {
if (value.getNumberOfTires() == null || value.getNumberOfTires() < 0) {
throw new InvalidDeliveryException(
"Invalid number of tires for deliveryId " + value.getDeliveryId());
}
return value;
});
}
Just like technical exceptions, business exceptions can also be handled using the Processing Exception Handler. Since the handler catches any exception, it will also catch exceptions like InvalidDeliveryException, allowing you to define custom error-handling logic.
This provides full flexibility to decide whether to tolerate business errors or strictly enforce validation rules in your Kafka Streams application.
Metrics
A legitimate question to ask is: How can you track records that raised an exception but were dropped to allow processing to continue?
When a record is dropped and processing continues, two JMX metrics are updated:
- dropped-records-total: The total number of dropped records.
- dropped-records-rate: The average number of dropped records per second.
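Besides JMX, these metrics can also be read programmatically through KafkaStreams#metrics(). A minimal sketch:

kafkaStreams.metrics().forEach((metricName, metric) -> {
    if (metricName.name().equals("dropped-records-total")) {
        // One metric per stream task; metricValue() returns the current count
        log.info("{} {} = {}", metricName.group(), metricName.tags(), metric.metricValue());
    }
});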
Code Sample
A small code sample is available on GitHub.
It demonstrates:
- The use of the Processing Exception Handler in DSL operations.
- The use of the Processing Exception Handler in the Processor API.
Contributions
Here is the list of implemented pull requests:
- KAFKA-16448 Add timestamp to error handler context
- KAFKA-16448 Update documentation
- KAFKA-16448: Unify error-callback exception handling
- KAFKA-16448: Unify class cast exception handling for both key and value
- KAFKA-16448: Handle fatal user exception during processing error
- KAFKA-16448: Fix processing exception handler
- KAFKA-17034: Tweak some descriptions in FeatureUpdate
- KAFKA-16448: Add ErrorHandlerContext in production exception handler
- KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler
- KAFKA-16448: Handle processing exceptions in punctuate
- KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration
- KAFKA-16448: Add ProcessingExceptionHandler interface and implementations
- KAFKA-16448: Catch and handle processing exceptions