
java.util.concurrent.TimeoutException: Connection lease request time out #2492

Closed
patpatpat123 opened this issue Mar 14, 2023 · 7 comments · Fixed by #2497

@patpatpat123

Hello team,

I wanted to reach out to report an issue that is 100% reproducible with reactive Spring Data Elasticsearch.

The setup is simple:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0-M1</version>
    <relativePath/>
</parent>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>4.0.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

@Configuration
@EnableReactiveElasticsearchRepositories(Constant.BASE_PACKAGE_ELASTIC)
public class ElasticConfiguration extends ReactiveElasticsearchConfiguration {

    @Override
    public final ClientConfiguration clientConfiguration() {
        return ClientConfiguration.builder().connectedTo("1xx.xx.xx.xx:9200").build();
    }

}

@Repository
public interface ElasticRepository extends ReactiveElasticsearchRepository<MyPojo, String> {
}

@org.springframework.data.elasticsearch.annotations.Document(indexName = Constant.ELASTIC_INDEX_NAME)
public record MyPojo(String name, String protocol, @Id String clientTs) {
}

The business logic is straightforward: using Reactor Kafka, Spring Cloud Stream Kafka (reactive), or Spring WebFlux, I receive a flux of MyPojo. My goal is just to save them into Elasticsearch.

The rate is quite fast; therefore, I am using the reactive repository.

When using the above setup, I get this 100% of the time:

java.lang.RuntimeException: Connection lease request time out
	at org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate.translateException(ReactiveElasticsearchTemplate.java:670)
	at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7099)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:117)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:64)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at co.elastic.clients.transport.rest_client.RestClientTransport$1.onFailure(RestClientTransport.java:179)
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:684)
	at org.elasticsearch.client.RestClient$1.failed(RestClient.java:422)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.executionFailed(DefaultClientExchangeHandlerImpl.java:101)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:432)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.connectionRequestFailed(AbstractClientExchangeHandler.java:352)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.access$100(AbstractClientExchangeHandler.java:64)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler$1.failed(AbstractClientExchangeHandler.java:396)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$1.failed(PoolingNHttpClientConnectionManager.java:318)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.nio.pool.AbstractNIOConnPool.fireCallbacks(AbstractNIOConnPool.java:501)
	at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:360)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:393)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:247)
	at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:173)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.TimeoutException: Connection lease request time out
	at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411)
	at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391)
	at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:355)
	... 17 common frames omitted

The thing is: I tried setting `.withConnectTimeout()` and `.withSocketTimeout()` to very large values and still got this error.
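For illustration, the attempted configuration looked roughly like this (a sketch; the actual timeout values varied, and Duration is java.time.Duration):

// Sketch of what I tried: even very generous timeouts did not help.
@Override
public final ClientConfiguration clientConfiguration() {
	return ClientConfiguration.builder()
		.connectedTo("1xx.xx.xx.xx:9200")
		.withConnectTimeout(Duration.ofMinutes(5))
		.withSocketTimeout(Duration.ofMinutes(5))
		.build();
}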

I tried switching to the non-reactive client, and it works.

Finally, I only see this stack trace after many messages have been processed. Yet I see only 1 hit in Elasticsearch.

May I ask what the root cause of this issue is, please?

Thank you

@sothawo (Collaborator) commented Mar 14, 2023

Can you provide a compilable/runnable project?

With what call are you saving the objects?

@patpatpat123 (Author)

Hello @sothawo,

Thank you for taking a look at this.
Attached is a zip with a minimal reproducible example.
I ran it on different machines; it is always reproducible.
I ran it against Elasticsearch 8.6.2 and 7.x; always reproducible.
Could you please try to reproduce it on your side?

timeout.zip

@sothawo (Collaborator) commented Mar 15, 2023

Ok, that's a nice and interesting problem.

First, let me explain how the ReactiveElasticsearchTemplate (the default implementation of ReactiveElasticsearchOperations) works in principle: it builds the request object and then uses the Transport class from the Elasticsearch library (the same one used by the imperative ElasticsearchClient provided by Elasticsearch), but our reactive client calls the ...Async() methods. In the case of storing an entity, this happens in this function:

public <T> Mono<IndexResponse> index(IndexRequest<T> request) {
	Assert.notNull(request, "request must not be null");
	return Mono.fromFuture(transport.performRequestAsync(request, IndexRequest._ENDPOINT, transportOptions));
}

Whatever happens inside this call is outside of Spring Data Elasticsearch. These async methods return a CompletableFuture that Spring Data Elasticsearch wraps in a Mono.

When looking at your code:

public Function<Flux<String>, Flux<MyPojo>> consume() {
    return messages -> messages.flatMapSequential(one -> {
        return myElasticRepository.save(new MyPojo(one.substring(0, 3), one.substring(0, 3), one.substring(0, 3)));
    });
}

you take every single message, convert it to an entity object, and send it off to Elasticsearch, returning the saved entities as a flux.

First, this is extremely inefficient, as you produce a separate network call for every single object.
Second, the network client used by Elasticsearch (judging from the stack trace, the Apache HTTP async client) is flooded, or rather drowned, by this many requests. Because the calls are asynchronous, each request must be sent off, and the connection to Elasticsearch must be parked somewhere until the answer comes in. A pool is used for this, and the exception you see is what you get when a new connection is requested but cannot be acquired in time. So this is not a timeout you can set on the connection or anywhere else; it is an internal timeout that fires when trying to get a connection from the pool.
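For completeness: the pool limits can be raised through the client configurer hook, but a bigger pool only postpones the lease timeout under sustained overload; it does not fix it. A minimal sketch, assuming the ElasticsearchHttpClientConfigurationCallback from org.springframework.data.elasticsearch.client.elc.ElasticsearchClients and made-up pool sizes:

@Override
public final ClientConfiguration clientConfiguration() {
	return ClientConfiguration.builder()
		.connectedTo("localhost:9200")
		.withClientConfigurer(ElasticsearchClients.ElasticsearchHttpClientConfigurationCallback.from(
			// enlarge the Apache async client's connection pool (assumed values)
			httpClientBuilder -> httpClientBuilder
				.setMaxConnTotal(100)
				.setMaxConnPerRoute(100)))
		.build();
}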

To reproduce this, I used a sample application with a local Elasticsearch provided by Docker; I did not bother to set up Kafka. My test entity is this:

@Document(indexName = "foo")
public class Foo {
	@Id
	@Nullable
	private String id;
	@Nullable
	private String text;

	public static Foo of(int id) {
		var foo = new Foo();
		foo.setId("" + id);
		foo.setText("text-" + id);
		return foo;
	}

	@Nullable
	public String getId() {
		return id;
	}

	public void setId(@Nullable String id) {
		this.id = id;
	}

	@Nullable
	public String getText() {
		return text;
	}

	public void setText(@Nullable String text) {
		this.text = text;
	}
}

The repository is the same as in your case.

A simple test method to reproduce this error:

public Flux<Foo> test1() {
	return Flux.range(1, 1_000_000)
		.map(Foo::of)
		.flatMapSequential(repository::save);
}

So, what to do? The first step is to group the incoming requests into batches of, let's say, 1000 entities, waiting at most one second:

public Flux<Foo> test1() {
	return Flux.range(1, 1_000_000)
		.map(Foo::of)
		.bufferTimeout(1000, Duration.ofSeconds(1))
		.flatMapSequential(repository::saveAll);
}

Nice try: same error, just later. Pushing in a million objects still leads to 1,000 calls, which is too many.

We need to add backpressure handling, and this requires us to write quite a bit more code (we ignore returning a flux of entities here):

public Flux<Foo> test2() {
	Flux.range(1, 1_000_000)
		.map(Foo::of)
		.bufferTimeout(1000, Duration.ofSeconds(1))
		.subscribe(new Subscriber<>() {
			private Subscription subscription;

			@Override
			public void onSubscribe(Subscription subscription) {
				this.subscription = subscription;
				subscription.request(1);
			}

			@Override
			public void onNext(List<Foo> foos) {
				repository.saveAll(foos)
					.doOnComplete(() -> {
						subscription.request(1);
					})
					.subscribe();
			}

			@Override
			public void onError(Throwable throwable) {
				throwable.printStackTrace();
				subscription.cancel();
				throw new RuntimeException("oops, something went wrong", throwable);
			}

			@Override
			public void onComplete() {
			}
		});
	return Flux.empty();
}

We add backpressure handling here: we still batch the incoming objects into groups of 1000, but request only the first batch. When we get it, we store it in Elasticsearch; when that is done, we request the next batch, and so on.
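If only the write path matters, the same one-batch-in-flight behaviour can also be expressed with a built-in operator: concatMap subscribes to one inner publisher at a time, so at most one saveAll() call is running, just like the manual request(1) loop. A sketch, assuming concatMap's default prefetch of a few buffered batches is acceptable:

// Compact sketch: concatMap runs the inner saveAll() calls strictly one
// after another, mirroring the manual Subscriber above, and it also
// emits the saved entities downstream.
public Flux<Foo> test2Compact() {
	return Flux.range(1, 1_000_000)
		.map(Foo::of)
		.bufferTimeout(1000, Duration.ofSeconds(1))
		.concatMap(repository::saveAll);
}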

As a final step, we provide the saved entities as a Flux to the caller; this requires slightly modified handling at the end of the upstream flux:

public Flux<Foo> test3() {
	Sinks.Many<Foo> sink = Sinks.many().unicast().onBackpressureBuffer();

	Flux.range(1, 1_000_000)
		.map(Foo::of)
		.bufferTimeout(1000, Duration.ofSeconds(1))
		.subscribe(new Subscriber<>() {
			private Subscription subscription;
			volatile boolean upstreamComplete = false;

			@Override
			public void onSubscribe(Subscription subscription) {
				this.subscription = subscription;
				subscription.request(1);
			}

			@Override
			public void onNext(List<Foo> foos) {
				repository.saveAll(foos)
					.map(sink::tryEmitNext)
					.doOnComplete(() -> {
						if (!upstreamComplete) {
							subscription.request(1);
						} else {
							sink.tryEmitComplete();
						}
					})
					.subscribe();
			}

			@Override
			public void onError(Throwable throwable) {
				throwable.printStackTrace();
				subscription.cancel();
				sink.tryEmitError(throwable);
			}

			@Override
			public void onComplete() {
				upstreamComplete = true;
			}
		});

	return sink.asFlux();
}

With that, I could insert 1 million docs into my local Dockerized Elasticsearch in 2.5 minutes (on a 10-year-old iMac) with no errors.

So this is no error in Spring Data Elasticsearch, but it does need special reactive code.

@patpatpat123 (Author)

Thank you @sothawo for the nice write-up.

This worries me a little bit, unfortunately. I was hoping I had done something wrong configuring the repository or something like that.

I did another test, same code:

public Function<Flux<String>, Flux<MyPojo>> consume() {
    return messages -> messages.flatMapSequential(one -> {
        return myElasticRepository.save(new MyPojo(one.substring(0, 3), one.substring(0, 3), one.substring(0, 3)));
    });
}

I just replaced the reactive Elasticsearch repository with 1) a reactive Cassandra repository and 2) a reactive MongoDB repository.

Exact same code, just interchanging the reactive repository. With both Cassandra and Mongo, I could insert the millions of entities in no time, with little to no CPU/memory footprint.

I believe saving a flux of "something" into Elasticsearch should be a common use case.

(I searched for tutorials online, and it seems a basic WebFlux app saving a flux uses the same approach. Does that mean many people are encountering the same issue?)

Is there a better way to do this without all the boilerplate code?

Can I change my configuration class (maybe the template, the operations, the rest client, or something else) in order to resolve this issue?

Our use case is quite simple: just saving a flux of "something" "somewhere", that somewhere being Cassandra, Mongo, or another reactive database. It seems we have to add this custom code on the save side just for Elasticsearch, so I was wondering if you could point out whether there are other alternatives.

@patpatpat123 (Author)

Update: I tried again, this time with myElasticRepository.saveAll(theFlux) (the flux being the one coming from Kafka).

And again, unfortunately, the same issue, while the Mongo and Cassandra repositories saved everything fine.

I can hardly believe I am the first to save a flux using reactive Spring Data Elasticsearch.

What am I doing wrong please?

Many thanks

@sothawo (Collaborator) commented Mar 18, 2023

Comparing with Mongo or Cassandra makes no sense: Mongo has a client that communicates with the server using a native protocol, whereas Spring Data Elasticsearch has to use the transport provided by Elasticsearch.

So, as explained before, sending every item as a separate request, like you did first, results in one HTTP request per entity, and the connection handling for those requests is done by the transport provided by Elasticsearch, not by Spring Data Elasticsearch.

As for the saveAll(Publisher<T>) method: this indeed needs a rewrite; I'll address that with #2496.

sothawo added a commit to sothawo/spring-data-elasticsearch that referenced this issue Mar 19, 2023
sothawo added a commit that referenced this issue Mar 19, 2023
Original Pull Request #2497
Closes #2496
Closes #2492
@patpatpat123 (Author)

Many thanks @sothawo
