Error handling in Kafka Consumers
A first article (more will probably follow) for those working with Spring and Kafka.
Handling errors in your consumers:
When you build a Spring Boot application that consumes from Kafka, Spring provides its own listener container for the asynchronous execution of POJO listeners. That listener container can handle an exception in three ways:
- It can ignore the exception and move on to the next record.
- It can retry processing the same record from the topics/partitions of that listener.
- It can send the record to a dead letter topic.
If you do not explicitly set an error handler, then by default a record whose processing throws an exception in your listener method ends up logged and skipped (immediately in Spring Kafka versions before 2.5, after a few quick retries in later ones). So if your business case requires every received record to be processed successfully, you should keep re-processing a record that fails with an unexpected exception until it succeeds. Otherwise you might lose transactions without even noticing!
How do we do it? We can override Spring Boot's auto-configured container factory with our own and set the error handler we need:
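@Bean
@Primary
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Retry a failed record every 60 seconds, without limit; applies per listener (topic/partition).
    // FixedBackOff takes (interval, maxAttempts), so unlimited retries must be stated explicitly.
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new FixedBackOff(60000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

// Assumed definition: the original snippet calls consumerFactory() without showing it.
ConsumerFactory<byte[], byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    LOGGER.info("bootstrapServers for consumer config: {}", bootstrapServers);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Keys and values arrive as raw bytes; deserialization happens inside the listener method.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-reporting-consumer");
    return props;
}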
Here Spring does the heavy lifting with its own error handlers, recoverers and so on. In the example above I used the SeekToCurrentErrorHandler, which seeks back to the current offset for each topic/partition in the remaining records. In other words, it rewinds the partitions after a failed message so that the message can be replayed.
As its parameter we passed a FixedBackOff instance (FixedBackOff provides a fixed interval between two attempts and a maximum number of retries), set up so that we retry every minute without limit. Of course you can adjust this to your business logic, and even provide a recoverer.
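As a rough sketch of the "provide even a recoverer" option, the configuration below bounds the retries and hands the failed record to a DeadLetterPublishingRecoverer once they are exhausted. It assumes a Spring Kafka version in which SeekToCurrentErrorHandler accepts a BackOff (roughly 2.3 to 2.7; from 2.8 on it is superseded by DefaultErrorHandler) and a KafkaTemplate<byte[], byte[]> bean in the context. The class name, bean name and back-off values are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class, for illustration only.
@Configuration
class RetryThenDeadLetterConfig {

    @Bean
    SeekToCurrentErrorHandler retryThenDeadLetterErrorHandler(KafkaTemplate<byte[], byte[]> template) {
        // Publishes the failed record to a dead letter topic once retries are exhausted.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // Assumed values: wait 1 second between attempts, retry at most 3 times
        // after the initial delivery, then call the recoverer.
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}
```

The returned handler can be passed to factory.setErrorHandler(...) in place of the one used above. By default the recoverer publishes to a topic named after the original one with a .DLT suffix, so that topic has to exist.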
Please note that if you set a maximum number of retries but do not specify a recoverer, then once the retries are exhausted the record is logged and skipped, and the listener continues with the next records.
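To make that behaviour concrete, here is a small plain-Java model of "retry with a fixed interval, then recover or skip". It is not Spring code and does not touch Kafka; the class and method names are made up for illustration, and it only mimics the semantics described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of fixed back-off retries; a sketch, not the Spring implementation. */
class FixedBackOffRetrySketch {

    /**
     * Delivers the record to the listener, retrying up to maxRetries times with a fixed pause.
     * When retries are exhausted the recoverer is called, or the record is logged and skipped
     * if no recoverer was given. Returns the number of delivery attempts made.
     */
    static int process(String record, Consumer<String> listener, long intervalMs,
                       int maxRetries, Consumer<String> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    if (recoverer != null) {
                        recoverer.accept(record); // e.g. send to a dead letter topic
                    } else {
                        System.err.println("Retries exhausted, skipping " + record);
                    }
                    return attempts;
                }
                try {
                    Thread.sleep(intervalMs); // fixed interval between two attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        Consumer<String> alwaysFails = r -> { throw new IllegalStateException("boom"); };
        int attempts = process("order-1", alwaysFails, 10L, 2, deadLetters::add);
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

With two retries the failing record is delivered three times in total before it reaches the recoverer. Passing null as the recoverer gives the "logged and skipped" outcome instead.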
Also note that in the example above I used a byte array deserializer for both key and value, because I do the deserialization inside the listener method. If you want to receive already deserialized records in your listener, you may need to configure an ErrorHandlingDeserializer to cover the case where a deserialization exception occurs before Spring gets the record.
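A minimal sketch of what such a consumer configuration might look like is shown below. To keep it free of dependencies it uses the plain property names and class names as strings, which Kafka accepts in place of the ConsumerConfig constants and Class objects. The StringDeserializer delegates and the class name are assumptions for illustration; in Spring Kafka versions before 2.5 the wrapper class is called ErrorHandlingDeserializer2.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds consumer properties using ErrorHandlingDeserializer. */
class ErrorHandlingDeserializerProps {

    static Map<String, Object> consumerConfig(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "my-reporting-consumer");
        // The wrapper catches deserialization exceptions instead of failing inside the Kafka poll.
        props.put("key.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The real deserializers the wrapper delegates to (assumed: plain strings).
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("spring.deserializer.value.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerConfig("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With this in place a record that cannot be deserialized reaches the container's error handler like any other failure, rather than blocking the consumer on the same offset.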