PendingTasks leak in TimedScheduler #3642

Closed
AChekroun opened this issue Nov 17, 2023 · 4 comments
Labels: area/observability, good first issue (Ideal for a new contributor, we'll help), type/bug (A general bug)

@AChekroun

In TimedScheduler, Micrometer's pendingTasks counter is never stopped if the underlying scheduler rejects the task.

Expected Behavior

Micrometer's counter pendingTasks must be stopped if the underlying scheduler throws a RejectedExecutionException

Actual Behavior

Micrometer's pendingTasks counter is not stopped when the underlying scheduler throws a RejectedExecutionException, resulting in a never-ending long task in the LongTaskTimer.

Steps to Reproduce

    @Test
    void test() {
        CountDownLatch cdl = new CountDownLatch(1);
        SimpleMeterRegistry smr = new SimpleMeterRegistry();
        ExecutorService executorService =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>()
            );

        Scheduler originalScheduler = Schedulers.fromExecutorService(executorService); // 1 thread - SynchronousQueue
        Scheduler timedScheduler = Micrometer.timedScheduler(originalScheduler, smr, "timed_scheduler");
        var meterIdStr = "timed_scheduler.scheduler.tasks.pending";
        RequiredSearch requiredSearch = smr.get(meterIdStr);
        LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();

        Runnable supp = () -> {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };

        // 1 - Runnable is successfully submitted, but will be blocked by CDL
        Assertions.assertDoesNotThrow(() -> timedScheduler.schedule(supp));

        // 2nd is rejected because scheduler has only 1 thread and is currently busy with exec 1
        Assertions.assertThrows(RejectedExecutionException.class, () -> timedScheduler.schedule(supp));
        cdl.countDown(); // release 1st

        assertEquals(0, longTaskTimer.activeTasks(), "Expected no active task since 2nd submission has been rejected");
    }

Possible Solution

Catch any RejectedExecutionException to stop any pending tasks
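
The idea can be sketched without Reactor or Micrometer on the classpath. The following is a minimal, hypothetical illustration using only the JDK: `PendingTrackingExecutor`, its `pending` counter, and `PendingTrackingDemo` are invented names standing in for TimedScheduler and its pending-task sample, not the actual reactor-core-micrometer code. It increments the pending count before delegating, decrements it when the task starts running, and on RejectedExecutionException decrements it and rethrows so the caller still sees the rejection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for TimedScheduler: counts tasks that were submitted
// but have not started running yet.
final class PendingTrackingExecutor implements Executor {
    private final Executor delegate;
    private final AtomicInteger pending = new AtomicInteger();

    PendingTrackingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    int pendingTasks() {
        return pending.get();
    }

    @Override
    public void execute(Runnable task) {
        // Guard so the "pending sample" of this task is stopped exactly once.
        AtomicBoolean stopped = new AtomicBoolean();
        pending.incrementAndGet();
        try {
            delegate.execute(() -> {
                stopPending(stopped); // task is starting, so it is no longer pending
                task.run();
            });
        } catch (RejectedExecutionException e) {
            stopPending(stopped); // rejected task will never run: release it here
            throw e;              // the caller must still learn about the rejection
        }
    }

    private void stopPending(AtomicBoolean stopped) {
        if (stopped.compareAndSet(false, true)) {
            pending.decrementAndGet();
        }
    }
}

final class PendingTrackingDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same setup as the reproducer: one thread and no queue capacity.
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        PendingTrackingExecutor tracked = new PendingTrackingExecutor(pool);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        tracked.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        started.await(); // the first task is running and occupies the only thread

        boolean rejected = false;
        try {
            tracked.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        System.out.println("rejected=" + rejected + ", pending=" + tracked.pendingTasks());
    }
}
```

Running `PendingTrackingDemo` prints `rejected=true, pending=0`. Without the catch block, the rejected submission would leave the count at 1 indefinitely, which is the leak described above.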

Your Environment

  • Reactor version(s) used:
    • reactor-core 3.6.0
    • reactor-core-micrometer 1.1.0
  • JVM version (java -version): 17.0.6
  • Micrometer: 1.11.4

reactorbot added the ❓need-triage label Nov 17, 2023
OlegDokuka added the type/bug and good first issue labels and removed the ❓need-triage label Nov 22, 2023
OlegDokuka added this to the 3.5.13 milestone Nov 22, 2023

@Nicolas125841
Contributor

Hi @OlegDokuka, could I give this one a try?

I'm thinking of placing try-catch checks around the delegate.schedule calls in TimedScheduler to remove the pending status on rejection. This doesn't seem to affect the periodic version because it doesn't create a pending sample. Does that seem alright?

Thanks.

@OlegDokuka
Contributor

please do!

Nicolas125841 added a commit to Nicolas125841/reactor-core that referenced this issue Dec 11, 2023
Fixes an issue with TimedScheduler where tasks rejected by the underlying scheduler would not be removed from the pendingTasks count. The idea is to catch any RejectedExecutionException thrown when scheduling the task, stop its pending sample to remove it from the pending count, and rethrow the error so the caller knows that the task was rejected.

Fixes reactor#3642.
violetagg modified the milestones: 3.5.13, 3.5.14 Dec 11, 2023
@BarexaS

BarexaS commented Dec 13, 2023

Can the changes be applied to all kinds of exceptions? In my case, I have scheduler metrics in DataDog, and from some point on any type of scheduler can start producing an incomplete scheduled-tasks metric that grows from 0 tasks to millions. I guess the problem is not only in RejectedExecutionException, since I use only Project Reactor and the subscribeOn/publishOn methods.

@chemicL
Member

chemicL commented Dec 13, 2023

@BarexaS, the fix for this particular bug involves handling exceptions that we anticipate can happen in the API that is controlled by us, so we can stay focused on incremental improvements and get this fix incorporated in the current form to adhere to the original report. Please do open a new issue with a reproducible example which demonstrates the leak you observe, preferably with minimum dependencies.
