Kafka Streams Dead Letter Queue in Spring Kafka 4.1


Introduction

Spring Kafka 4.1 raises the Apache Kafka baseline to 4.2.0. This release also introduces Spring-friendly integration for two Kafka Streams KIPs: KIP-1033 (processing exception handling) and KIP-1034 (dead letter queue in Kafka Streams). Both were originally proposed by Michelin and have been available since Apache Kafka 3.9 and 4.2, respectively.

New Exception Handlers

Spring Kafka 4.1 provides implementations for the deserialization, processing, and production exception handlers:

  • RecoveringDeserializationExceptionHandler
  • RecoveringProcessingExceptionHandler
  • RecoveringProductionExceptionHandler

These exception handler implementations share two characteristics:

  • They can leverage the Kafka Streams native dead letter queue mechanism introduced by KIP-1034.
  • They follow the same recovery logic: resume the stream and forward the failed record to a dead letter queue topic if one of the available recovery strategies is enabled. Otherwise, fail the stream without forwarding any record to a dead letter queue topic.

The provided exception handler implementations can be activated using the Spring Kafka Streams properties:

spring:
  kafka:
    streams:
      properties:
        deserialization.exception.handler: "org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler"
        processing.exception.handler: "org.springframework.kafka.streams.RecoveringProcessingExceptionHandler"
        production.exception.handler: "org.springframework.kafka.streams.RecoveringProductionExceptionHandler"

Or programmatically, using the KafkaStreamsConfiguration bean:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties) {
    Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringDeserializationExceptionHandler.class);
    props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringProcessingExceptionHandler.class);
    props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringProductionExceptionHandler.class);
    return new KafkaStreamsConfiguration(props);
}

Recovery Strategies

The recovery strategies are used by the exception handlers to resolve a dead letter queue topic name and resume the stream, while forwarding the failed record to that topic.

There are three recovery strategies, in order of priority:

  • An implementation of the new KafkaStreamsDeadLetterDestinationResolver functional interface.
  • The errors.dead.letter.queue.topic.name Kafka Streams property, introduced by KIP-1034.
  • An implementation of the ConsumerRecordRecoverer interface, typically the DeadLetterPublishingRecoverer implementation provided.

If none of the above strategies is enabled, the exception handlers fail the stream without forwarding anything to a dead letter queue topic.
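The priority order above can be sketched in plain Java. This is a minimal illustration and not Spring Kafka's implementation: the RecoveryStrategySketch class, the Strategy enum, and the select method are hypothetical names, and the parameters stand in for whatever checks the handlers perform internally.

```java
public class RecoveryStrategySketch {

    /** Hypothetical outcome of the selection; not a Spring Kafka type. */
    public enum Strategy { DESTINATION_RESOLVER, TOPIC_NAME, RECOVERER, FAIL }

    /**
     * Picks the first configured strategy, following the priority order
     * described in the article: resolver, then topic name, then recoverer.
     */
    public static Strategy select(boolean hasResolver, String dlqTopicName, boolean hasRecoverer) {
        if (hasResolver) {
            return Strategy.DESTINATION_RESOLVER;
        }
        if (dlqTopicName != null && !dlqTopicName.isEmpty()) {
            return Strategy.TOPIC_NAME;
        }
        if (hasRecoverer) {
            return Strategy.RECOVERER;
        }
        // No strategy enabled: the stream fails and nothing is forwarded.
        return Strategy.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(select(true, "default-dlq-topic", true));   // DESTINATION_RESOLVER
        System.out.println(select(false, "default-dlq-topic", true));  // TOPIC_NAME
        System.out.println(select(false, null, true));                 // RECOVERER
        System.out.println(select(false, null, false));                // FAIL
    }
}
```

The sketch only models which option wins when several are configured at once; resuming the stream and publishing the record are left out.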

Dead Letter Destination Resolver

The dead letter destination resolver for Kafka Streams is a new functional interface introduced in Spring Kafka 4.1. It lets developers define dead letter queue topic-partition routing logic in a Spring-friendly way:

@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, message, exception) -> {
        if (message.value() instanceof DeliveryBooked deliveryBooked && deliveryBooked.numberOfTires() == null) {
            return new TopicPartition("null-number-of-tires-dlq-topic", -1);
        }
    
        if (exception instanceof InvalidDeliveryException) {
            return new TopicPartition("invalid-delivery-dlq-topic", -1);
        }
    
        if (context.processorNodeId().equals("select-key-processor")) {
            return new TopicPartition("select-key-processor-dlq-topic", -1);
        }
    
        return new TopicPartition("default-dlq-topic", 0);
    };
}

Defining a KafkaStreamsDeadLetterDestinationResolver bean makes it possible to route failed records to a dead letter queue topic-partition based on three input parameters:

  • The error handler context, which contains various information such as the topic, partition, and offset of the failed record, the ID of the processor node that triggered the exception, and the original key and value (as raw byte arrays) of the record as it was at the input of the sub-topology.
  • The failed record, as it is at the input of the processor that triggered the exception.
  • The exception itself.

The destination resolver can be passed to any of the provided exception handlers programmatically, using the KafkaStreamsConfiguration bean and the DLQ_DESTINATION_RESOLVER key:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties, KafkaStreamsDeadLetterDestinationResolver resolver) {
    Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
    props.put(RecoveringDeserializationExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProductionExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    return new KafkaStreamsConfiguration(props);
}

When a destination resolver is configured for a recovering exception handler, the handler uses it as its primary option to route failed records to a dead letter topic through the native Kafka Streams DLQ mechanism.

Dead Letter Topic Name

The dead letter topic name is the second, more straightforward option for recovering exception handlers to route failed records to a dead letter topic.

There are two ways to define a dead letter topic name. First, by using the errors.dead.letter.queue.topic.name property (introduced by KIP-1034) directly in the Spring Kafka Streams properties:

spring:
  kafka:
    streams:
      properties:
        errors.dead.letter.queue.topic.name: "default-dlq-topic"

Or in a more programmatic way, through the StreamsBuilderFactoryBean:

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setDeadLetterTopicName("default-dlq-topic");
}

If either of these solutions is defined, all configured recovering exception handlers will automatically resume the stream by forwarding failed records to this topic.

Dead Letter Publishing Recoverer

Already available prior to Spring Kafka 4.1 and used by the deserialization exception handler, the dead letter publishing recoverer is now leveraged by all provided recovering exception handlers. If defined, it is used when neither a dead letter destination resolver nor a dead letter topic name is defined.

The dead letter publishing recoverer takes a KafkaTemplate that forwards failed records to the specified dead letter topic-partition:

@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<byte[], byte[]> kafkaTemplate) {
    return new DeadLetterPublishingRecoverer(kafkaTemplate, (record, ex) -> new TopicPartition("default-dlq-topic", 0));
}

The KafkaTemplate must be of type <byte[], byte[]> and use ByteArraySerializer, since the raw source record is sent to the dead letter topic. The recoverer can be passed to the recovering exception handlers using the KafkaStreamsConfiguration bean and the RECOVERER key:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(KafkaProperties kafkaProperties, DeadLetterPublishingRecoverer recoverer) {
    Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
    props.put(RecoveringDeserializationExceptionHandler.RECOVERER, recoverer);
    props.put(RecoveringProcessingExceptionHandler.RECOVERER, recoverer);
    props.put(RecoveringProductionExceptionHandler.RECOVERER, recoverer);
    return new KafkaStreamsConfiguration(props);
}

Dead Letter Record

All recovery options publish the source record to the dead letter topic. It corresponds to the record as it appears at the input of the sub-topology, with its original key, value and headers as they were before any transformations by the processors in the sub-topology.

These values come directly from the ErrorHandlerContext, introduced by KIP-1033, which is passed as input to the exception handlers.
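To illustrate the point about the source record, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: SourceRecordSketch, SourceContext, and deadLetterValue are hypothetical names standing in for the error handler context and the publishing step, and the sample key and value are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class SourceRecordSketch {

    /** Hypothetical stand-in for the raw source data carried by the error handler context. */
    public static final class SourceContext {
        final byte[] rawKey;
        final byte[] rawValue;

        public SourceContext(byte[] rawKey, byte[] rawValue) {
            this.rawKey = rawKey;
            this.rawValue = rawValue;
        }
    }

    /** Returns the value that would go to the dead letter topic: the untouched source bytes. */
    public static byte[] deadLetterValue(SourceContext context, String transformedValue) {
        // The transformed value is deliberately ignored: only the source bytes are forwarded.
        return context.rawValue;
    }

    public static void main(String[] args) {
        byte[] raw = "delivery-42".getBytes(StandardCharsets.UTF_8);
        SourceContext context = new SourceContext("k1".getBytes(StandardCharsets.UTF_8), raw);

        // Simulate an upstream processor of the same sub-topology changing the value.
        String transformed = new String(raw, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);

        byte[] dlqValue = deadLetterValue(context, transformed);
        System.out.println(new String(dlqValue, StandardCharsets.UTF_8)); // prints "delivery-42"
    }
}
```

Whatever the processors did to the record before the failure, the dead letter topic receives the bytes as they entered the sub-topology, which makes replaying the record possible.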

In addition, dead letter record headers are also populated with custom Spring Kafka headers:

  • kafka_dlt-exception-fqcn: The exception name
  • kafka_dlt-exception-cause-fqcn: The exception cause name
  • kafka_dlt-exception-message: The exception message
  • kafka_dlt-exception-stacktrace: The exception stack trace
  • kafka_dlt-original-topic: The original source topic
  • kafka_dlt-original-partition: The original source partition
  • kafka_dlt-original-offset: The original source offset
  • kafka_dlt-original-timestamp: The original source timestamp
  • kafka_dlt-original-timestamp-type: The original timestamp type (for deserialization errors)
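As a rough sketch of how these headers relate to a failure, the plain-Java example below derives the logical header values from an exception and the source record coordinates. It is an illustration under assumptions, not Spring Kafka's code: DeadLetterHeadersSketch and headersFor are hypothetical names, the values are kept as strings for readability (real Kafka headers are byte arrays, and their exact encoding is not shown here), the cause header is skipped when there is no cause, and the timestamp-type header is left out.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

public class DeadLetterHeadersSketch {

    /** Builds the logical header values for a failed record, keyed by the header names listed above. */
    public static Map<String, String> headersFor(Exception exception, String topic,
                                                 int partition, long offset, long timestamp) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("kafka_dlt-exception-fqcn", exception.getClass().getName());
        if (exception.getCause() != null) {
            // Assumption of this sketch: no cause header when the exception has no cause.
            headers.put("kafka_dlt-exception-cause-fqcn", exception.getCause().getClass().getName());
        }
        headers.put("kafka_dlt-exception-message", String.valueOf(exception.getMessage()));

        // Render the stack trace into a string.
        StringWriter stackTrace = new StringWriter();
        exception.printStackTrace(new PrintWriter(stackTrace, true));
        headers.put("kafka_dlt-exception-stacktrace", stackTrace.toString());

        // Coordinates of the source record that led to the failure.
        headers.put("kafka_dlt-original-topic", topic);
        headers.put("kafka_dlt-original-partition", Integer.toString(partition));
        headers.put("kafka_dlt-original-offset", Long.toString(offset));
        headers.put("kafka_dlt-original-timestamp", Long.toString(timestamp));
        return headers;
    }

    public static void main(String[] args) {
        Exception failure = new IllegalStateException("boom", new IllegalArgumentException("bad input"));
        // "delivery-booked-topic" is a made-up topic name for this example.
        headersFor(failure, "delivery-booked-topic", 2, 42L, 1700000000000L)
                .forEach((name, value) -> System.out.println(name + " = " + value.lines().findFirst().orElse("")));
    }
}
```

A consumer of the dead letter topic can use these headers to find out which record failed and why, without having to deserialize the payload.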

Code Sample

All the examples above are available in a GitHub repository:

GitHub - michelin/spring-kafka-streams-dead-letter-queue: Code sample for Spring-friendly Kafka Streams Dead Letter Queue (KIP-1034) in Spring Kafka.

The repository contains four modules, one for each topic covered above:

  • Dead Letter Destination Resolver
  • Dead Letter Publishing Recoverer
  • Dead Letter Topic Name
  • Recovering Exception Handlers

Each module can be run using its associated Docker Compose, or directly from the IDE, with a separate Kafka server running alongside (for example, using Confluent's Docker Compose).

Kafka records can be produced in the input topic to test the dead letter queue routing using Michelin’s Kafkagen CLI and the sample records provided in the .kafkagen directory. Check the README.md file of each module for more details.
