Skip to content

Commit

Permalink
Fix a Many sink / EmitterProcessor subscriber disposal leak (#3029)
Browse files Browse the repository at this point in the history
The EmitterProcessor#remove method causes retaining of subscribers if
the removal is done in parallel, as the CAS failure doesn't cause a
new loop iteration.

This applies to direct instantiations of EmitterProcessor as well as
Sinks.many().onBackpressureBuffer sinks.

This commit fixes the method to loop back when the CAS fails.

Fixes #3028.
  • Loading branch information
simonbasle authored Apr 21, 2022
1 parent 1be0d59 commit adaec72
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -616,8 +616,8 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
q.clear();
}
}
return;
}
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,8 +26,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -38,24 +40,18 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.AutoDisposingExtension;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.*;
import static reactor.core.Scannable.Attr;
import static reactor.core.Scannable.Attr.BUFFERED;
import static reactor.core.Scannable.Attr.CANCELLED;
import static reactor.core.Scannable.Attr.CAPACITY;
import static reactor.core.Scannable.Attr.PREFETCH;
import static reactor.core.Scannable.Attr.TERMINATED;
import static reactor.core.scheduler.Schedulers.DEFAULT_POOL_SIZE;
import static reactor.core.Scannable.Attr.*;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
import static reactor.core.scheduler.Schedulers.DEFAULT_POOL_SIZE;

/**
* @author Stephane Maldini
Expand All @@ -64,6 +60,9 @@
@SuppressWarnings("deprecation")
public class EmitterProcessorTest {

@RegisterExtension
AutoDisposingExtension afterTest = new AutoDisposingExtension();

@Test
public void currentSubscriberCount() {
Sinks.Many<Integer> sink = EmitterProcessor.create();
Expand All @@ -79,6 +78,29 @@ public void currentSubscriberCount() {
assertThat(sink.currentSubscriberCount()).isEqualTo(2);
}

// see https://github.com/reactor/reactor-core/issues/3028
@Test
void concurrentSubscriberDisposalDoesntLeak() {
for (int repetition = 0; repetition < 20; repetition++) {
Scheduler disposeScheduler = afterTest.autoDispose(Schedulers.newParallel("concurrentSubscriberDisposalDoesntLeak", 5));

List<Disposable> toDisposeInMultipleThreads = new ArrayList<>();
Sinks.Many<Integer> sink = EmitterProcessor.create();

for (int i = 0; i < 10; i++) {
toDisposeInMultipleThreads.add(
sink.asFlux().subscribe(null, null, null, Context.of("inner", "#" + i))
);
}

toDisposeInMultipleThreads.forEach(disposable -> disposeScheduler.schedule(disposable::dispose));

Awaitility.await().atMost(Duration.ofSeconds(2))
.alias("no more subscribers")
.untilAsserted(() -> assertThat(sink.currentSubscriberCount()).as("subscriberCount").isZero());
}
}

//see https://github.com/reactor/reactor-core/issues/1364
@Test
public void subscribeWithSyncFusionUpstreamFirst() {
Expand Down

0 comments on commit adaec72

Please sign in to comment.