The sequence for acquiring the connection was not cancelled after the request ended. #3531

Closed
qnnn opened this issue Dec 4, 2024 · 6 comments

qnnn commented Dec 4, 2024

Expected Behavior

The sequence for acquiring the connection will be cancelled after the request ends.

Actual Behavior

The sequence for acquiring the connection was not cancelled after the request ended.

Steps to Reproduce

I recently ran into a small issue while using Spring Cloud Gateway. After a request hits responseTimeout and the timeout response has been sent back (because the connection attempts were themselves timing out), the client keeps retrying the connection against the other DNS-resolved addresses. (For example, responseTimeout = 8s, connectionTimeout = 5s, and there are three addresses in the DNS resolution result.)

It seems that after the outer sequence is cancelled, the inner sequence responsible for acquiring the connection does not observe the cancellation. Does the following simulation match that behavior? (I’m just trying to learn.)

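import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.retry.RetrySpec;

public static void main(String[] args) throws InterruptedException {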
	execute();
	Thread.sleep(20000);
}

public static void execute() {
	// Similar to PooledConnectionAllocator#connectChannel
	Mono.<String>create(sink -> acquireConnection(sink))
			// Simulate request
			.flatMap(connection -> Mono.just("send request and receive response"))
			// Set response timeout to 8 seconds
			.timeout(Duration.ofSeconds(8), Mono.just("read timeout!"))
			.subscribe(System.out::println);
}

public static void acquireConnection(MonoSink<String> sink) {
	// Similar to TransportConnector#connect
	Mono.just("acquire connection")
			// Set connection timeout to 5 seconds
			.delayElement(Duration.ofSeconds(5))
			.doOnNext(connection -> {
				System.out.println("connection timeout!");
				throw new RuntimeException();
			})
			.onErrorResume(Exception.class, e ->
					Mono.defer(() -> Mono.just("acquire connection")
									.delayElement(Duration.ofSeconds(5))
									.doOnNext(connection -> {
										System.out.println("connection timeout!");
										throw new RuntimeException();
									})
							)
							// Retry connecting other DNS-resolved addresses.
							.retryWhen(RetrySpec.max(2))
			)
			.subscribe(connection -> sink.success(connection));
}

Possible Solution

Your Environment

  • Reactor version(s) used: 1.1.20
  • JVM version: 17
qnnn added the status/need-triage and type/bug labels Dec 4, 2024
violetagg removed the status/need-triage label Dec 4, 2024
violetagg self-assigned this Dec 4, 2024

qnnn commented Dec 5, 2024

Perhaps we could register a cancel callback using sink.onCancel(Subscription::cancel) during onSubscribe?
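
To make the idea concrete, here is a minimal JDK-only sketch of what propagating the cancellation would change. It is not Reactor Netty or reactor-pool code: the class `CancellableAcquire`, its `run` method, the addresses, and the scaled-down timings (100 ms per attempt instead of 5 s, 150 ms caller timeout instead of 8 s) are all assumptions made for illustration. A shared flag plays the role of the cancel callback, and the retry loop checks it before trying the next address.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the connect-and-retry chain; not Reactor Netty code.
class CancellableAcquire {

    // Runs the simulation and returns the addresses that were actually attempted.
    static List<String> run(List<String> addresses, long attemptMillis,
            long callerTimeoutMillis, boolean propagateCancel) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> attempted = new CopyOnWriteArrayList<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);

        // The caller gives up after callerTimeoutMillis (the "responseTimeout").
        // Only when propagateCancel is set does the retry loop learn about it.
        scheduler.schedule(() -> {
            if (propagateCancel) {
                cancelled.set(true);
            }
        }, callerTimeoutMillis, TimeUnit.MILLISECONDS);

        attempt(scheduler, addresses, 0, attemptMillis, cancelled, attempted, finished);
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return List.copyOf(attempted);
    }

    // One connect attempt; every attempt "times out" after attemptMillis,
    // then the next address is tried unless the cancel flag has been set.
    private static void attempt(ScheduledExecutorService scheduler, List<String> addresses,
            int index, long attemptMillis, AtomicBoolean cancelled,
            List<String> attempted, CountDownLatch finished) {
        if (index >= addresses.size() || cancelled.get()) {
            finished.countDown();
            return;
        }
        attempted.add(addresses.get(index));
        scheduler.schedule(
                () -> attempt(scheduler, addresses, index + 1, attemptMillis,
                        cancelled, attempted, finished),
                attemptMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("10.0.0.1", "10.0.0.2", "10.0.0.3");
        // Scaled-down timings: 100 ms per attempt, caller gives up after 150 ms.
        System.out.println("no propagation:   " + run(addresses, 100, 150, false));
        System.out.println("with propagation: " + run(addresses, 100, 150, true));
    }
}
```

With these timings the run without propagation should attempt all three addresses, while the run with propagation should stop after the second one, since the flag is set while the second attempt is still in flight. An attempt that has already started is not interrupted in this sketch; only further retries are skipped.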

@violetagg
Member

@qnnn I'll be working on this. It may need fixes in reactor pool.

@violetagg
Member

> Perhaps we could register a cancel callback using sink.onCancel(Subscription::cancel) during onSubscribe?

We do register cancel callback, see

public void onSubscribe(Subscription s) {
	if (Operators.validate(subscription, s)) {
		this.subscription = s;
		cancellations.add(this);
		if (!retried) {
			sink.onCancel(cancellations);
		}
		s.request(Long.MAX_VALUE);
	}
}


qnnn commented Jan 7, 2025

> > Perhaps we could register a cancel callback using sink.onCancel(Subscription::cancel) during onSubscribe?
>
> We do register cancel callback, see
>
> public void onSubscribe(Subscription s) {
>     if (Operators.validate(subscription, s)) {
>         this.subscription = s;
>         cancellations.add(this);
>         if (!retried) {
>             sink.onCancel(cancellations);
>         }
>         s.request(Long.MAX_VALUE);
>     }
> }

@violetagg Thank you for your reply. I have also looked at the reactor-pool project and submitted PR #256. Apart from the callback mentioned in your reply, do we also need to register callbacks for PooledConnectionInitializer and ClientTransportSubscriber? In the version I modified, acquiring a connection still cannot be cancelled unless callbacks are registered for them as well.


violetagg commented Jan 7, 2025

@violetagg
Member

@qnnn I'm closing this issue. Once an acquisition has started we do not want to stop the connection establishment, so that the connection (if established) stays in the pool and can be reused.
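
The trade-off described above can be illustrated with a small JDK-only sketch. It is not reactor-pool or Reactor Netty code: `KeepConnectingPool`, its `acquire` method, the `idle` queue, and the timings are hypothetical names and values chosen for illustration. The caller waits for a limited time; if it gives up, the connection attempt is left running and the resulting connection is parked for the next caller instead of being thrown away.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy pool for illustration; not reactor-pool or Reactor Netty code.
class KeepConnectingPool {
    final Queue<String> idle = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns a connection, or null if the caller's wait budget ran out first.
    String acquire(long connectMillis, long waitMillis) {
        String pooled = idle.poll();
        if (pooled != null) {
            return pooled; // reuse a connection parked by an earlier acquisition
        }
        CompletableFuture<String> connecting = CompletableFuture.supplyAsync(() -> {
            sleep(connectMillis); // simulated connection establishment
            return "connection-" + created.incrementAndGet();
        });
        try {
            return connecting.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller gives up, but the attempt is deliberately not cancelled:
            // once established, the connection is parked for the next caller.
            connecting.thenAccept(idle::add);
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            connecting.thenAccept(idle::add);
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("connect failed", e.getCause());
        }
    }

    // Sleep helper that restores the interrupt flag instead of throwing.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        KeepConnectingPool pool = new KeepConnectingPool();
        System.out.println("first acquire:  " + pool.acquire(200, 50));
        sleep(400); // let the abandoned attempt finish in the background
        System.out.println("second acquire: " + pool.acquire(200, 50));
    }
}
```

In this sketch the first call is expected to return null because the caller only waits 50 ms for a 200 ms connect, and the second call should get the connection that the abandoned attempt established in the meantime. Cancelling the attempt on timeout would instead force the second caller to pay the full connect cost again.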

violetagg closed this as not planned Jan 13, 2025
violetagg added the status/invalid label and removed the type/bug label Jan 13, 2025