From d1042594657ffc868d6e50b9eb73a554abd9f8f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 21 Apr 2022 11:32:40 +0200 Subject: [PATCH] Fix a Many sink / EmitterProcessor subscriber disposal leak The EmitterProcessor#remove method causes retaining of subscribers if the removal is done in parallel, as the CAS failure doesn't cause a new loop iteration. This applies to direct instantiations of EmitterProcessor as well as Sinks.many().onBackpressureBuffer sinks. This commit fixes the method to loop back when the CAS fails. Fixes #3028. --- .../core/publisher/EmitterProcessor.java | 4 +- .../core/publisher/EmitterProcessorTest.java | 44 ++++++++++++++----- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java index 08589e3500..cbe565e96a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -616,8 +616,8 @@ final void remove(FluxPublish.PubSubInner inner) { q.clear(); } } + return; } - return; } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java index 3ff6468911..b3286e1241 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.reactivestreams.Processor; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -38,24 +40,18 @@ import reactor.core.Scannable; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.test.AutoDisposingExtension; import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import reactor.util.context.Context; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.*; import static reactor.core.Scannable.Attr; -import static reactor.core.Scannable.Attr.BUFFERED; -import static reactor.core.Scannable.Attr.CANCELLED; -import static reactor.core.Scannable.Attr.CAPACITY; -import static reactor.core.Scannable.Attr.PREFETCH; -import static reactor.core.Scannable.Attr.TERMINATED; -import static reactor.core.scheduler.Schedulers.DEFAULT_POOL_SIZE; +import static reactor.core.Scannable.Attr.*; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; +import static reactor.core.scheduler.Schedulers.DEFAULT_POOL_SIZE; /** * @author Stephane Maldini @@ -64,6 +60,9 @@ @SuppressWarnings("deprecation") public class EmitterProcessorTest { + @RegisterExtension + AutoDisposingExtension afterTest = new AutoDisposingExtension(); + @Test public void currentSubscriberCount() { Sinks.Many sink = EmitterProcessor.create(); @@ -79,6 +78,29 @@ public void currentSubscriberCount() { assertThat(sink.currentSubscriberCount()).isEqualTo(2); } + // see https://github.com/reactor/reactor-core/issues/3028 + @Test + void concurrentSubscriberDisposalDoesntLeak() { + for (int repetition = 0; repetition < 20; repetition++) { + Scheduler disposeScheduler = afterTest.autoDispose(Schedulers.newParallel("concurrentSubscriberDisposalDoesntLeak", 5)); + + List toDisposeInMultipleThreads = new ArrayList<>(); + Sinks.Many sink = EmitterProcessor.create(); + + for (int i = 0; i < 10; i++) { + toDisposeInMultipleThreads.add( + sink.asFlux().subscribe(null, null, null, Context.of("inner", "#" + i)) + ); + } + + toDisposeInMultipleThreads.forEach(disposable -> disposeScheduler.schedule(disposable::dispose)); + + Awaitility.await().atMost(Duration.ofSeconds(2)) + .alias("no more subscribers") + .untilAsserted(() -> assertThat(sink.currentSubscriberCount()).as("subscriberCount").isZero()); + } + } + //see https://github.com/reactor/reactor-core/issues/1364 @Test public void subscribeWithSyncFusionUpstreamFirst() {