Skip to content

Commit

Permalink
GH-9259: Fix Reactor context propagation on reactive reply (#9284)
Browse files Browse the repository at this point in the history
Fixes: #9259

The `Mono.toFuture()` does not propagate context to thread locals of the `CompletableFuture` consumer.
See `MonoToCompletableFuture`

* Fix `AbstractMessageProducingHandler` to convert reply `Mono` to `CompletableFuture` manually.
Use `doOnEach()` and set thread locals from the Reactor context manually around `replyFuture.complete()/completeExceptionally()`
* Add respective unit test into `WebFluxObservationPropagationTests` to ensure that same trace is used in downstream endpoints after WebFlux client reply

(cherry picked from commit 4a77dbc)
  • Loading branch information
artembilan authored and spring-builds committed Jun 28, 2024
1 parent 0b07c12 commit 4ead954
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import java.util.function.BiConsumer;

import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand All @@ -46,6 +47,7 @@
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -365,6 +367,7 @@ private static Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAda
}
}

@SuppressWarnings("try")
private static CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
if (reactiveAdapter != null) {
Mono<?> reactiveReply;
Expand All @@ -376,7 +379,31 @@ private static CompletableFuture<?> toFutureReply(Object reply, @Nullable Reacti
reactiveReply = Mono.from(publisher);
}

return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
CompletableFuture<Object> replyFuture = new CompletableFuture<>();

reactiveReply
.publishOn(Schedulers.boundedElastic())
// TODO until Reactor supports context propagation from the MonoToCompletableFuture
.doOnEach((signal) -> {
try (AutoCloseable scope = IntegrationReactiveUtils
.setThreadLocalsFromReactorContext(signal.getContextView())) {

if (signal.isOnError()) {
replyFuture.completeExceptionally(signal.getThrowable());
}
else {
replyFuture.complete(signal.get());
}

}
catch (Exception ex) {
throw Exceptions.bubble(ex);
}
})
.contextCapture()
.subscribe();

return replyFuture;
}
else {
return toCompletableFuture(reply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@
import io.micrometer.tracing.test.simple.SpansAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -81,6 +88,10 @@ public class WebFluxObservationPropagationTests {
@Autowired
private PollableChannel testChannel;

@Autowired
@Qualifier("webFluxRequestReplyClientFlow.input")
private MessageChannel webFluxRequestReplyClientFlowInput;

@BeforeEach
void setup() {
SPANS.clear();
Expand Down Expand Up @@ -122,6 +133,20 @@ void observationIsPropagatedWebFluxRequestReply() {
.haveSameTraceId();
}

@Test
void observationIsPropagatedWebFluxClientRequestReply() {
String result =
new MessagingTemplate()
.convertSendAndReceive(this.webFluxRequestReplyClientFlowInput, "test", String.class);

assertThat(result).isEqualTo("SOME REPLY");

// There is a race condition when we already have a reply, but the span in the last channel is not closed yet.
await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5));
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
.haveSameTraceId();
}

@Configuration
@EnableWebFlux
@EnableIntegration
Expand Down Expand Up @@ -189,6 +214,29 @@ FluxMessageChannel webFluxRequestChannel() {
return new FluxMessageChannel();
}

@Bean
IntegrationFlow webFluxRequestReplyClientFlow(ObservationRegistry registry) {
ClientHttpConnector httpConnector =
new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);

Mono<DataBuffer> replyData = Mono.just(response.bufferFactory().wrap("some reply".getBytes()));

return response.writeWith(replyData)
.then(Mono.defer(response::setComplete));
});
WebClient webClient =
WebClient.builder()
.clientConnector(httpConnector)
.observationRegistry(registry)
.build();

return f -> f
.handle(WebFlux.outboundGateway(message -> "/someRequest", webClient)
.expectedResponseType(String.class))
.<String, String>transform(String::toUpperCase);
}

@Bean
IntegrationFlow webFluxRequestReplyFlow(
@Qualifier("webFluxRequestChannel") FluxMessageChannel webFluxRequestChannel) {
Expand Down

0 comments on commit 4ead954

Please sign in to comment.