Cannot invoke "MultiSubscriber.onCompletion()" because "this.downstream" is null #705

Closed
indalaterre opened this issue Oct 6, 2021 · 6 comments · Fixed by #712

@indalaterre
Contributor

indalaterre commented Oct 6, 2021

Hello, I upgraded Quarkus from 1.12.2.Final to 2.3.0.Final (Mutiny 1.0.0), and now I get an NPE when executing this Mutiny stream.

The same thing happens whether I use the transformToUniAndConcatenate operator or the call operator.

Multi<Item> searchResults =
     super.search(paginationFilter, filters)
         .onItem()
         .call(item -> dataRepository.getPrice(item, filters));

Implementation of search from Elasticsearch

this.getClient().searchAsync(searchRequest, RequestOptions.DEFAULT)
	.onFailure().invoke(exception -> log.error("unable to query", exception))
	.map(SearchResponse::getHits)
	.map(SearchHits::getHits)
	.onItem().transformToMulti(hits -> Multi.createFrom().items(hits))
	.map(hit -> {
		Map<String, Object> map = hit.getSourceAsMap();
		return getJsonUtils().fromMap(map, getEntityClass());
	});

Implementation of getPrice to MongoDB

// Item externalItem: the item being priced (declared outside this snippet)
return super.find(collection, query, null, null)
	.group().intoLists().of(2)
	.map(item -> addPrice(item, externalItem))
	.onCompletion().ifEmpty().continueWith(externalItem)
	.toUni();
@jponge
Member

jponge commented Oct 6, 2021

From the Zulip conversation, the stack trace was:

2021-10-04 21:53:13,615 ERROR [io.qua.ver.cor.run.VertxCoreRecorder] (vert.x-eventloop-thread-19) Uncaught exception received by Vert.x: java.lang.NullPointerException: Cannot invoke "io.smallrye.mutiny.subscription.MultiSubscriber.onCompletion()" because "this.downstream" is null
        at io.smallrye.mutiny.operators.multi.MultiBufferOp$BufferExactProcessor.onCompletion(MultiBufferOp.java:119)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.isDoneOrCancelled(MultiEmitOnOp.java:248)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:188)
        at io.quarkus.mongodb.impl.Wrappers.lambda$toMulti$2(Wrappers.java:30)
        at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
        at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63)
        at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:293)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

@jponge
Member

jponge commented Oct 6, 2021

I need to reproduce this in a pure Mutiny test, but it's likely due to cancellation being tracked through both the upstream subscription and the cancellation request, depending on where we are in the code.
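
To make the suspected hazard concrete, here is a minimal single-threaded sketch in plain Java. It is not Mutiny's code: `LeakyProcessor`, `Downstream`, and `upstreamCancelled` are hypothetical names. The sketch assumes an operator that records cancellation in two places (a flag and a nulled `downstream` reference) and checks only the flag before emitting. The two halves of cancellation are exposed as separate methods so that the unlucky interleaving can be replayed deterministically.

```java
class CancellationRaceSketch {

    /** Hypothetical stand-in for a downstream subscriber. */
    interface Downstream {
        void onCompletion();
    }

    /** Hypothetical operator that tracks cancellation in two separate places. */
    static class LeakyProcessor {
        private Downstream downstream;
        private boolean upstreamCancelled;

        LeakyProcessor(Downstream downstream) {
            this.downstream = downstream;
        }

        // First half of cancellation: drop the subscriber reference.
        void releaseDownstream() {
            downstream = null;
        }

        // Second half of cancellation: record that the upstream was cancelled.
        void markUpstreamCancelled() {
            upstreamCancelled = true;
        }

        // Guards on the flag only, so it can still dereference a null downstream.
        void onCompletion() {
            if (!upstreamCancelled) {
                downstream.onCompletion();
            }
        }
    }

    /** Replays the interleaving and reports whether an NPE was thrown. */
    static boolean completionBetweenCancelStepsThrows() {
        LeakyProcessor processor = new LeakyProcessor(() -> { });
        processor.releaseDownstream();         // cancelling thread, step 1
        try {
            processor.onCompletion();          // emitting thread runs in between
            return false;
        } catch (NullPointerException expected) {
            return true;
        } finally {
            processor.markUpstreamCancelled(); // cancelling thread, step 2
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE thrown: " + completionBetweenCancelStepsThrows());
    }
}
```

In a real pipeline the two steps would run on different threads (for example an executor used by emitOn and the thread calling cancel), which is why the failure would only show up intermittently.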

@jponge jponge added the bug Something isn't working label Oct 6, 2021
@jponge jponge self-assigned this Oct 6, 2021
@jponge
Member

jponge commented Oct 6, 2021

It's indeed what I thought, given this reproducer:

        AssertSubscriber<List<Integer>> sub = AssertSubscriber.create();
        AtomicInteger counter = new AtomicInteger();

        Multi.createFrom().range(1, 1000)
                .emitOn(Infrastructure.getDefaultExecutor())
                .group().intoLists().of(100)
                .onItem().invoke(() -> {
                    if (counter.incrementAndGet() == 9) {
                        sub.cancel();
                    }
                })
                .runSubscriptionOn(Infrastructure.getDefaultExecutor())
                .subscribe().withSubscriber(sub);

        sub.request(Long.MAX_VALUE);
        await().atMost(5, TimeUnit.SECONDS).untilAtomic(counter, equalTo(9));

I get a similar stack trace that confirms my intuition:

Exception in thread "pool-1-thread-2" java.lang.NullPointerException: Cannot invoke "io.smallrye.mutiny.subscription.MultiSubscriber.onItem(Object)" because "this.downstream" is null
	at io.smallrye.mutiny.operators.multi.MultiSubscribeOnOp$SubscribeOnProcessor.onItem(MultiSubscribeOnOp.java:88)
	at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:41)
	at io.smallrye.mutiny.operators.multi.MultiBufferOp$BufferExactProcessor.onItem(MultiBufferOp.java:107)
	at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:199)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

@jponge jponge added this to the 1.1.1 milestone Oct 6, 2021
jponge added a commit that referenced this issue Oct 7, 2021
We used to interpret the Reactive Streams TCK as requiring the downstream reference to be set to null on cancellation in order to release the subscriber.
That is not actually necessary (and it is NPE-prone): operators are instantiated per subscription, so the *publisher* does not keep references to cancelled subscribers.

Fixes #705
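
The idea in this commit message can be illustrated with a small plain-Java sketch. It is an assumption-laden model, not the code from #712: `SafeProcessor` and `Downstream` are hypothetical names. The operator keeps its `downstream` reference for its whole lifetime and tracks cancellation through a single flag, so a signal that arrives late can be dropped but can never dereference null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class KeepDownstreamSketch {

    /** Hypothetical stand-in for a downstream subscriber. */
    interface Downstream<T> {
        void onItem(T item);
        void onCompletion();
    }

    /** Hypothetical per-subscription operator: downstream is final and never nulled. */
    static class SafeProcessor<T> {
        private final Downstream<T> downstream;
        private final AtomicBoolean cancelled = new AtomicBoolean();

        SafeProcessor(Downstream<T> downstream) {
            this.downstream = downstream;
        }

        // Cancellation only flips the flag; the reference stays in place.
        void cancel() {
            cancelled.set(true);
        }

        void onItem(T item) {
            if (!cancelled.get()) {
                downstream.onItem(item);
            }
        }

        void onCompletion() {
            if (!cancelled.get()) {
                downstream.onCompletion();
            }
        }
    }

    /** Emits one item, cancels, then sends two more signals that should be dropped. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SafeProcessor<Integer> processor = new SafeProcessor<>(new Downstream<Integer>() {
            @Override
            public void onItem(Integer item) {
                events.add("item:" + item);
            }

            @Override
            public void onCompletion() {
                events.add("completed");
            }
        });
        processor.onItem(1);
        processor.cancel();
        processor.onItem(2);      // dropped: arrives after cancellation
        processor.onCompletion(); // dropped as well, and no NPE
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item:1]
    }
}
```

Because each subscription gets its own operator instance, the whole operator (and the subscriber it points to) becomes unreachable once the subscription is discarded, so nulling the field is not needed to release it.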
@jponge
Member

jponge commented Oct 7, 2021

@indalaterre May I ask you to build the code in #712 locally?

It should look something like:

git clone https://github.com/smallrye/smallrye-mutiny.git
cd smallrye-mutiny
git checkout -b fix/705 origin/fix/705
mvn install -DskipTests

Then in your Maven project you should manually add Mutiny as:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny</artifactId>
    <version>999-SNAPSHOT</version>
</dependency>

🙏

@indalaterre
Contributor Author

Yeah!! Perfect, it works 🥳

@jponge
Member

jponge commented Oct 7, 2021

Great!
