Multiple Kafka Stream Configurations in a Spring Boot Application

Spring for Apache Kafka provides a default configuration for Kafka Streams, together with an annotation named “@EnableKafkaStreams” that enables the default Kafka Streams components.
In detail, using the “@EnableKafkaStreams” annotation automatically imports the default config class “KafkaStreamsDefaultConfiguration” (see the EnableKafkaStreams Javadoc for complete usage). This config class registers a StreamsBuilderFactoryBean if a streams configuration bean with the name KafkaStreamsDefaultConfiguration#DEFAULT_STREAMS_CONFIG_BEAN_NAME is present in the application context (an org.apache.kafka.streams.StreamsConfig in older releases, a KafkaStreamsConfiguration since Spring Kafka 2.2). Otherwise an UnsatisfiedDependencyException is thrown.
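To make the default setup concrete, here is a minimal sketch of a single-configuration application. The class name, the application id and the broker address are placeholder values chosen for illustration, and the sketch assumes a Spring Kafka version that uses KafkaStreamsConfiguration for the default config bean.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

// Hypothetical class name; shows the single, default configuration only.
@Configuration
@EnableKafkaStreams
public class DefaultKafkaStreamsConfig {

    // The bean name must match the one KafkaStreamsDefaultConfiguration looks up.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder values, not taken from the article.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "single-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }
}
```

With this in place, the single StreamsBuilder produced by the default factory bean can be injected by type, which is exactly what stops working once a second configuration is needed.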

This is really useful when the application uses only one KStream, or several that share the same configuration. When you need multiple KStreams with different configurations in the same application, however, you have to supply your own configuration instead of relying on the default one.
For example, a very important setting when defining a KStream (as for any Kafka consumer) is the consumer group. For Kafka Streams, the “application.id” also acts as the consumer group name of the stream, and you need it whenever you want to point the stream application at an earlier or later offset (for more details, see manage-consumer-groups).
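The idea can be sketched in plain Java, without any Kafka or Spring classes, using the literal property keys "application.id" and "bootstrap.servers". The class name, method names, group names and broker address are hypothetical and only serve the illustration. Each stream gets its own property map, built from a shared base, so each one ends up with its own consumer group.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class StreamConfigSketch {

    // Settings shared by every stream; a fresh map is returned on each call
    // so that one stream's overrides never leak into another's.
    static Map<String, Object> commonProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        return props;
    }

    // Adds the per-stream application.id, which Kafka Streams also uses
    // as the consumer group name.
    static Map<String, Object> propsFor(String applicationId) {
        Map<String, Object> props = commonProps();
        props.put("application.id", applicationId);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> app1 = propsFor("app1-group");
        Map<String, Object> app2 = propsFor("app2-group");
        System.out.println(app1.get("application.id"));
        System.out.println(app2.get("application.id"));
    }
}
```

Because the two maps are separate instances, the offsets of one group can be reset without touching the other stream.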

Assuming you would like to have two KStreams with two different configurations:
1) Do not use the “@EnableKafkaStreams” annotation, which creates only one factory bean (from the default configuration bean).
2) Define your own stream builder beans:

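import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

/**
 * Configuration for Kafka Streams.
 */
@Configuration
public class KafkaStreamsConfig {

    public static final String APP1_STREAMS_BUILDER_BEAN_NAME = "app1KafkaStreamsBuilder";
    public static final String APP2_STREAMS_BUILDER_BEAN_NAME = "app2KafkaStreamsBuilder";

    // The property keys below are placeholders; use the keys your application defines.
    @Value("${app1.consumer-group-name}")
    private String app1ConsumerGroupName;
    @Value("${app2.consumer-group-name}")
    private String app2ConsumerGroupName;
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${kafka.streams.threads}")
    private int threads;

    @Bean(name = APP1_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean app1KafkaStreamsBuilder() {
        Map<String, Object> app1StreamsConfigProperties = commonStreamsConfigProperties();
        app1StreamsConfigProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, app1ConsumerGroupName);
        return new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(app1StreamsConfigProperties));
    }

    @Bean(name = APP2_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean app2KafkaStreamsBuilder() {
        Map<String, Object> app2StreamsConfigProperties = commonStreamsConfigProperties();
        app2StreamsConfigProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, app2ConsumerGroupName);
        return new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(app2StreamsConfigProperties));
    }

    // Returns a fresh map on every call, so each factory bean gets its own copy.
    private Map<String, Object> commonStreamsConfigProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class.getName());
        return props;
    }
}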

3) Inject the two stream builder beans above (“app1KafkaStreamsBuilder” and “app2KafkaStreamsBuilder”) by name into the components that create the respective Kafka Streams topologies.
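One possible way to do step 3 is sketched below. It assumes the KafkaStreamsConfig class from step 2 lives in the same package. The class name KafkaStreamsTopologies, the bean method names and the four topic names are hypothetical. Since two StreamsBuilder beans now exist, injection by type alone would be ambiguous, so each parameter is qualified with the name of the factory bean that should produce it.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class; topic names are placeholders, not from the article.
@Configuration
public class KafkaStreamsTopologies {

    // Built on the StreamsBuilder produced by the first factory bean,
    // so it runs under the first application.id (consumer group).
    @Bean
    public KStream<byte[], byte[]> app1Stream(
            @Qualifier(KafkaStreamsConfig.APP1_STREAMS_BUILDER_BEAN_NAME) StreamsBuilder builder) {
        KStream<byte[], byte[]> stream = builder.stream("app1-input-topic");
        stream.to("app1-output-topic");
        return stream;
    }

    // Built on the second StreamsBuilder, so it runs under the second application.id.
    @Bean
    public KStream<byte[], byte[]> app2Stream(
            @Qualifier(KafkaStreamsConfig.APP2_STREAMS_BUILDER_BEAN_NAME) StreamsBuilder builder) {
        KStream<byte[], byte[]> stream = builder.stream("app2-input-topic");
        stream.to("app2-output-topic");
        return stream;
    }
}
```

The topologies are declared as beans so that they are defined on their builders before the factory beans start the underlying KafkaStreams instances. The byte[] key and value types match the ByteArray default serdes configured in step 2.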

Nick Drakopoulos

Founder | Senior Software Engineer at NCODIFY : An innovative Tech Professional Services company specializing in Software Development.