From b0da8321a304cff1c43926870e60eec663eac350 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 23 Dec 2024 14:15:07 -0500 Subject: [PATCH] GH-3690: Fix observation leak in the `KafkaMessageListenerContainer` Fixes: https://github.com/spring-projects/spring-kafka/issues/3690 When `this.listener` is an instance of `RecordMessagingMessageListenerAdapter`, we rely on its logic to call `invoke()` from super class to handle observation lifecycle this or other way. However, Spring Integration's `KafkaMessageDrivenChannelAdapter` use its own `IntegrationRecordMessageListener` extension of the `RecordMessagingMessageListenerAdapter` without calling super `invoke()`. The problem apparent from Spring Cloud Stream Kafka Binder, where an observation is enabled. * Fix `KafkaMessageListenerContainer` to check for exact type of `this.listener` before making decision to close an observation here, or propagate it down to the `RecordMessagingMessageListenerAdapter` --- .../listener/KafkaMessageListenerContainer.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 09947227b..fe18836a6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -966,8 +966,8 @@ else if (listener instanceof MessageListener) { this.wasIdlePartition = new HashMap<>(); this.kafkaAdmin = obtainAdmin(); - if (this.listener instanceof RecordMessagingMessageListenerAdapter rmmla) { - rmmla.setObservationRegistry(observationRegistry); + if (isListenerAdapterObservationAware()) { + ((RecordMessagingMessageListenerAdapter) this.listener).setObservationRegistry(observationRegistry); } } @@ -1228,6 +1228,10 @@ else if (timeout instanceof String str) { } } + private boolean isListenerAdapterObservationAware() { + return this.listener != null && RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass()); + } + private void subscribeOrAssignTopics(final Consumer subscribingConsumer) { if (KafkaMessageListenerContainer.this.topicPartitions == null) { ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener(); @@ -2772,7 +2776,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco catch (RuntimeException e) { failureTimer(sample, cRecord); recordInterceptAfter(cRecord, e); - if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { + if (!isListenerAdapterObservationAware()) { observation.error(e); } if (this.commonErrorHandler == null) { @@ -2800,7 +2804,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco } } finally { - if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { + if (!isListenerAdapterObservationAware()) { observation.stop(); } observationScope.close();