From 45c86cc178a943937a71b0f34692ea4711d738cc Mon Sep 17 00:00:00 2001 From: Matej Novotny Date: Mon, 19 Feb 2024 14:53:32 +0100 Subject: [PATCH] Quartz - prevent memory leak when Job instance is a @Dependent bean (cherry picked from commit 6dcfaca7b62b6119ed454e8fafc58fe389cbefcc) --- .../quartz/test/DependentBeanJobTest.java | 126 ++++++++++++++++++ .../quarkus/quartz/runtime/CdiAwareJob.java | 36 +++++ .../quartz/runtime/QuartzSchedulerImpl.java | 4 +- 3 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DependentBeanJobTest.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DependentBeanJobTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DependentBeanJobTest.java new file mode 100644 index 0000000000000..cd6d828a1a1c9 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DependentBeanJobTest.java @@ -0,0 +1,126 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; + +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import io.quarkus.test.QuarkusUnitTest; + +public class DependentBeanJobTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(Service.class, MyJob.class) + .addAsResource(new StringAsset("quarkus.quartz.start-mode=forced"), + "application.properties")); + + @Inject + Scheduler quartz; + + @Inject + Service service; + + @Test + public void testDependentBeanJobDestroyed() throws SchedulerException, InterruptedException { + assertEquals(0, MyJob.timesConstructed); + assertEquals(0, MyJob.timesDestroyed); + // prepare latch, schedule 10 one-off jobs, assert + CountDownLatch latch = service.initializeLatch(10); + for (int i = 0; i < 10; i++) { + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity("myTrigger" + i, "myGroup") + .startNow() + .build(); + JobDetail job = JobBuilder.newJob(MyJob.class) + .withIdentity("myJob" + i, "myGroup") + .build(); + quartz.scheduleJob(job, trigger); + } + assertTrue(latch.await(5, TimeUnit.SECONDS), "Latch count: " + latch.getCount()); + assertEquals(10, MyJob.timesConstructed); + assertEquals(10, MyJob.timesDestroyed); + + // now try the same with repeating job triggering three times + latch = service.initializeLatch(3); + JobDetail job = JobBuilder.newJob(MyJob.class) + .withIdentity("myRepeatingJob", "myGroup") + .build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity("myRepeatingTrigger", "myGroup") + .startNow() + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInMilliseconds(333) + .withRepeatCount(3)) + .build(); + quartz.scheduleJob(job, trigger); + + assertTrue(latch.await(2, TimeUnit.SECONDS), "Latch count: " + latch.getCount()); + assertEquals(13, MyJob.timesConstructed); + assertEquals(13, MyJob.timesDestroyed); + } + + @ApplicationScoped + public static class Service { + + volatile CountDownLatch latch; + + public CountDownLatch initializeLatch(int latchCountdown) { + this.latch = new CountDownLatch(latchCountdown); + return latch; + } + + public void execute() { + latch.countDown(); + } + + } + + @Dependent + static class MyJob implements Job { + + public static volatile int timesConstructed = 0; + public static volatile int timesDestroyed = 0; + + @Inject + Service service; + + @PostConstruct + void postConstruct() { + timesConstructed++; + } + + @PreDestroy + void preDestroy() { + timesDestroyed++; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + service.execute(); + } + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java new file mode 100644 index 0000000000000..2618bb4379878 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java @@ -0,0 +1,36 @@ +package io.quarkus.quartz.runtime; + +import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.inject.Instance; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Scheduler; +import org.quartz.spi.TriggerFiredBundle; + +/** + * An abstraction allowing proper destruction of Job instances in case they are dependent beans. + * According to {@link org.quartz.spi.JobFactory#newJob(TriggerFiredBundle, Scheduler)}, a new job instance is created for every + * trigger. + * We will therefore create a new dependent bean for every trigger and destroy it afterwards. + */ +class CdiAwareJob implements Job { + + private final Instance.Handle jobInstanceHandle; + + public CdiAwareJob(Instance.Handle jobInstanceHandle) { + this.jobInstanceHandle = jobInstanceHandle; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + try { + jobInstanceHandle.get().execute(context); + } finally { + if (jobInstanceHandle.getBean().getScope().equals(Dependent.class)) { + jobInstanceHandle.destroy(); + } + } + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index 60e76ea042e8e..e2b9638cd387a 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -1243,10 +1243,10 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr // Get the original class from an intercepted bean class jobClass = (Class) jobClass.getSuperclass(); } - Instance instance = jobs.select(jobClass); + Instance instance = jobs.select(jobClass); if (instance.isResolvable()) { // This is a job backed by a CDI bean - return jobWithSpanWrapper((Job) instance.get()); + return jobWithSpanWrapper(new CdiAwareJob(instance.getHandle())); } // Instantiate a plain job class return jobWithSpanWrapper(super.newJob(bundle, Scheduler));