Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The sequence for acquiring the connection was not cancelled after the request ended. #3531

Open
qnnn opened this issue Dec 4, 2024 · 1 comment
Assignees
Labels
type/bug A general bug

Comments

@qnnn
Copy link

qnnn commented Dec 4, 2024

Expected Behavior

The sequence for acquiring the connection will be cancelled after the request ends.

Actual Behavior

The sequence for acquiring the connection was not cancelled after the request ended.

Steps to Reproduce

I recently encountered a small issue while using Spring Cloud Gateway. After the request reaches the responseTimeout value and the response is sent back (due to a connection timeout), the program continues to retry connecting to other DNS-resolved addresses. (For example, responseTimeout = 8s, connectionTimeout = 5s, and there are three addresses in the DNS resolution result.)

It seems that after the outer sequence is cancelled, the inner sequence responsible for acquiring the connection is not able to detect this cancellation. Does this behavior align with the phenomenon simulated in the following flow? (I’m just trying to learn.)

public static void main(String[] args) throws InterruptedException {
	execute();
	Thread.sleep(20000);
}

public static void execute() {
	// Similar to PooledConnectionAllocator#connectChannel
	Mono.<String>create(sink -> acquireConnection(sink))
			// Simulate request
			.flatMap(connection -> Mono.just("send request and receive response"))
			// Set response timeout to 8 seconds
			.timeout(Duration.ofSeconds(8), Mono.just("read timeout!"))
			.subscribe(System.out::println);
}

public static void acquireConnection(MonoSink<String> sink) {
	// Similar to TransportConnector#connect
	Mono.just("acquire connection")
			// Set connection timeout to 5 seconds
			.delayElement(Duration.ofSeconds(5))
			.doOnNext(connection -> {
				System.out.println("connection timeout!");
				throw new RuntimeException();
			})
			.onErrorResume(Exception.class, e ->
					Mono.defer(() -> Mono.just("acquire connection")
									.delayElement(Duration.ofSeconds(5))
									.doOnNext(connection -> {
										System.out.println("connection timeout!");
										throw new RuntimeException();
									})
							)
							// Retry connecting other DNS-resolved addresses.
							.retryWhen(RetrySpec.max(2))
			)
			.subscribe(connection -> sink.success(connection));
}

Possible Solution

Your Environment

  • Reactor version(s) used: 1.1.20
  • JVM version: 17
@qnnn qnnn added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Dec 4, 2024
@violetagg violetagg removed the status/need-triage A new issue that still need to be evaluated as a whole label Dec 4, 2024
@violetagg violetagg self-assigned this Dec 4, 2024
@qnnn
Copy link
Author

qnnn commented Dec 5, 2024

Perhaps we could register a cancel callback using sink.onCancel(Subscription::cancel) during onSubscribe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
Development

No branches or pull requests

2 participants