Kafka Streams Dead Letter Queue in Spring Kafka 4.1
Introduction
Spring Kafka 4.1 raises the Apache Kafka baseline to 4.2.0. With this release, it introduces Spring-friendly integration for Kafka Streams KIP-1033 (processing exception handling) and KIP-1034 (dead letter queue in Kafka Streams), both originally proposed by Michelin and available respectively since Apache Kafka 3.9 and 4.2.
New Exception Handlers
Spring Kafka 4.1 now provides implementations for processing and production exception handlers:
RecoveringDeserializationExceptionHandlerRecoveringProcessingExceptionHandlerRecoveringProductionExceptionHandler
These exception handler implementations share two characteristics:
- They can leverage the Kafka Streams native dead letter queue mechanism introduced by KIP-1034.
- They follow the same recovery logic: resume the stream and forward the failed record to a dead letter queue topic if one of the available recovery strategies is enabled. Otherwise, fail the stream without forwarding any record to a dead letter queue topic.
The provided exception handler implementations can be activated using the Spring Kafka Streams properties:
spring:
kafka:
streams:
properties:
deserialization.exception.handler: "org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler"
processing.exception.handler: "org.springframework.kafka.streams.RecoveringProcessingExceptionHandler"
production.exception.handler: "org.springframework.kafka.streams.RecoveringProductionExceptionHandler"Or programmatically, using the KafkaStreamsConfiguration bean:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringDeserializationExceptionHandler.class);
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringProcessingExceptionHandler.class);
props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringProductionExceptionHandler.class);
return new KafkaStreamsConfiguration(props);
}Recovery Strategies
The recovery strategies are used by the exception handlers to resolve a dead letter queue topic name and resume the stream, while forwarding the failed record to that topic.
There are three recovery strategies, in order of priority:
- An implementation of the
KafkaStreamsDeadLetterDestinationResolvernew functional interface. - The
errors.dead.letter.queue.topic.nameKafka Streams property, introduced by KIP-1034. - An implementation of the
ConsumerRecordRecovererinterface, typically theDeadLetterPublishingRecovererimplementation provided.
If none of the above strategies are enabled, the exception handlers simply fail without forwarding anything to the dead letter queue.
Dead Letter Destination Resolver
The dead letter destination resolver for Kafka Streams is a new functional interface introduced in Spring Kafka 4.1. It allows developers to define a dead letter queue topic-partition logic in a Spring-friendly way:
@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
return (context, message, exception) -> {
if (message.value() instanceof DeliveryBooked deliveryBooked && deliveryBooked.numberOfTires() == null) {
return new TopicPartition("null-number-of-tires-dlq-topic", -1);
}
if (exception instanceof InvalidDeliveryException) {
return new TopicPartition("invalid-delivery-dlq-topic", -1);
}
if (context.processorNodeId().equals("select-key-processor")) {
return new TopicPartition("select-key-processor-dlq-topic", -1);
}
return new TopicPartition("default-dlq-topic", 0);
};
}By defining a KafkaStreamsDeadLetterDestinationResolver bean, it is possible to configure dead letter queue repartition logic based on three input parameters:
- The error handler context, which contains various information such as the topic, partition, and offset of the failed record, the ID of the processor node that triggered the exception, and the original key and value (as raw byte arrays) of the record as it was at the input of the sub-topology.
- The failed record, as it is at the input of the processor that triggered the exception.
- The exception itself.
The destination resolver can be set to one of the provided exception handlers programmatically using the KafkaStreamsConfiguration bean and the DLQ_DESTINATION_RESOLVER key:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties, KafkaStreamsDeadLetterDestinationResolver resolver) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
props.put(RecoveringDeserializationExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
props.put(RecoveringProductionExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
return new KafkaStreamsConfiguration(props);
}When configured for a recovering exception handler, it uses it as the primary option to route failed records to a dead letter topic using the native Kafka Streams DLQ.
Dead Letter Topic Name
The dead letter topic name is the second, more straightforward option for recovering exception handlers to route failed records to a dead letter topic.
There are two ways to define a dead letter topic name. First, by using the errors.dead.letter.queue.topic.name property (introduced by KIP-1034) directly in the Spring Kafka Streams properties:
spring:
kafka:
streams:
properties:
errors.dead.letter.queue.topic.name: "default-dlq-topic"Or in a more programmatic way, through the StreamsBuilderFactoryBean:
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setDeadLetterTopicName("default-dlq-topic");
}If either of these solutions is defined, all configured recovering exception handlers will automatically resume the stream by forwarding failed records to this topic.
Dead Letter Publishing Recoverer
Already available prior to Spring Kafka 4.1 and used by the deserialization exception handler, the dead letter publishing recoverer is now leveraged by all provided recovering exception handlers. If defined, it is used when neither a dead letter destination resolver nor a dead letter topic name is defined.
The dead letter publishing recoverer takes a KafkaTemplate that forwards failed records to the specified dead letter topic-partition:
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<byte[], byte[]> kafkaTemplate) {
return new DeadLetterPublishingRecoverer(kafkaTemplate, (record, ex) -> new TopicPartition("default-dlq-topic", 0));
}The KafkaTemplate must be of type <byte[], byte[]> and use ByteArraySerializer since the raw source record is sent to the dead letter topic. The recoverer can be set to the recovering exception handlers using the KafkaStreamsConfiguration bean and the RECOVERER key:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties, DeadLetterPublishingRecoverer recoverer) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
props.put(RecoveringDeserializationExceptionHandler.RECOVERER, recoverer);
props.put(RecoveringProcessingExceptionHandler.RECOVERER, recoverer);
props.put(RecoveringProductionExceptionHandler.RECOVERER, recoverer);
return new KafkaStreamsConfiguration(props);
}Dead Letter Record
All recovery options publish the source record to the dead letter topic. It corresponds to the record as it appears at the input of the sub-topology, with its original key, value and headers as they were before any transformations by the processors in the sub-topology.
The values are provided directly by the ErrorHandlerContext, given in input to the exception handlers, introduced by KIP-1033:
sourceRawKey: The unserialized raw key of the source recordsourceRawValue: The unserialized raw value of the source recordheaders: The headers of the source record
In addition, dead letter record headers are also populated with custom Spring Kafka headers:
kafka_dlt-exception-fqcn: The exception namekafka_dlt-exception-cause-fqcn: The exception cause namekafka_dlt-exception-message: The exception messagekafka_dlt-exception-stacktrace: The exception stack tracekafka_dlt-original-topic: The original source topickafka_dlt-original-partition: The original source partitionkafka_dlt-original-offset: The original source offsetkafka_dlt-original-timestamp: The original source timestampkafka_dlt-original-timestamp-type: The original timestamp type (for deserialization errors)
Code Sample
All the examples above are available in a GitHub repository:
The repository contains 4 modules, for each purpose.
- Dead Letter Destination Resolver
- Dead Letter Publishing Recoverer
- Dead Letter Topic Name
- Recovering Exception Handlers
Each module can be run using its associated Docker Compose, or directly from the IDE, with a separate Kafka server running alongside (for example, using Confluent's Docker Compose).
Kafka records can be produced in the input topic to test the dead letter queue routing using Michelin’s Kafkagen CLI and the sample records provided in the .kafkagen directory. Check the README.md file of each module for more details.
Links
Contributions:
- GH-4328: Expose native Kafka Streams DLQ configuration
- GH-4430: Wrong record content passed to
KafkaStreamsDeadLetterDestinationResolver
Spring Boot documentation: