Skip to content

Commit

Permalink
feat(scheduler): add scheduler configuration and add support for clus…
Browse files Browse the repository at this point in the history
…tered jobs

Fixes quarkusio#3520
  • Loading branch information
machi1990 committed Nov 15, 2019
1 parent f35c986 commit 5c5fa3f
Show file tree
Hide file tree
Showing 15 changed files with 564 additions and 23 deletions.
55 changes: 55 additions & 0 deletions docs/src/main/asciidoc/scheduler.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,61 @@ public class CountResourceTest {
----
1. Ensure that the response contains `count`

== Scheduler configurations

`quarkus.scheduler.instance-name`:: The instance name of the scheduler.
+
Type: `java.lang.String` +
Defaults to: `DefaultQuartzScheduler` +


`quarkus.scheduler.store.misfire-threshold`:: The number of duration by which a trigger must have missed its next-fire-time, in order for it to be considered "misfired" and thus have its misfire instruction applied.
+
Type: `java.time.Duration` +
Defaults to: `1m` +


`quarkus.scheduler.store.cluster-checking-interval`:: The frequency at which this instance "checks-in" with the other instances of the cluster .This affects the rate of detecting failed instances.
+
Type: `java.time.Duration` +
Defaults to: `20s` +


`quarkus.scheduler.store.cluster-enabled`:: Enable cluster mode or not. This takes effect is the store type is `database-job-store`.
+
Type: `java.lang.Boolean` +


`quarkus.scheduler.store.type`:: The type of store to use. Possible values are: `ram-job-store`, c. Defaults to `ram-job-store`.
When using this option make sure that you have the link:datasource-guide.html[agroal datasource configured] and that <<creating-scheduling-job, scheduling tables are>>.
+
Type: `io.quarkus.scheduler.runtime.SchedulerBuildTimeConfig.StoreType` +
Defaults to: `ram-job-store` +


`quarkus.scheduler.thread-count`:: The size of scheduler thread pool. This will initialise the number of worker threads in the pool
+
Type: `int` +
Defaults to: `25` +


`quarkus.scheduler.thread-priority`:: Thread priority of worker threads in the pool.
+
Type: `int` +
Defaults to: `5` +

[WARNING]
[[creating-scheduling-job]]
.About using `database-job-store` store type properly
=====
The Quarkus scheduler does not create the necessary scheduling tables in database automatically. If these tables are missing, the scheduler will throw an exception during application startup.
Thus you'll need to create them. To do so, you can visit https://community.exoplatform.com/portal/fr/intranet/wiki/How_to_configure_quartz_with_persistent_mode[configuring persistent mode].
The table creation sql queries will need to be adapted to suit the SQL driver you are using. You can use the link:flyway-guide.html[flyway extension] to execute the queries for the creation of the schemas.
=====


include::duration-format-note.adoc[]

== Package and run the application

Run the application with: `./mvnw compile quarkus:dev`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.scheduler.deployment;

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;

import java.text.ParseException;
Expand All @@ -21,8 +22,11 @@
import org.jboss.jandex.Type.Kind;
import org.jboss.logging.Logger;
import org.quartz.CronExpression;
import org.quartz.impl.jdbcjobstore.JobStoreSupport;
import org.quartz.impl.triggers.AbstractTrigger;
import org.quartz.impl.triggers.SimpleTriggerImpl;
import org.quartz.simpl.CascadingClassLoadHelper;
import org.quartz.simpl.RAMJobStore;
import org.quartz.simpl.SimpleInstanceIdGenerator;
import org.quartz.simpl.SimpleThreadPool;

import io.quarkus.arc.Arc;
Expand Down Expand Up @@ -59,10 +63,7 @@
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.runtime.QuartzScheduler;
import io.quarkus.scheduler.runtime.ScheduledInvoker;
import io.quarkus.scheduler.runtime.SchedulerConfiguration;
import io.quarkus.scheduler.runtime.SchedulerDeploymentRecorder;
import io.quarkus.scheduler.runtime.*;

/**
* @author Martin Kouba
Expand Down Expand Up @@ -119,9 +120,9 @@ public void transform(TransformationContext context) {
@BuildStep
List<ReflectiveClassBuildItem> reflectiveClasses() {
List<ReflectiveClassBuildItem> reflectiveClasses = new ArrayList<>();
reflectiveClasses.add(new ReflectiveClassBuildItem(false, false, CascadingClassLoadHelper.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleThreadPool.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, RAMJobStore.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(false, false, CascadingClassLoadHelper.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleInstanceIdGenerator.class.getName()));
return reflectiveClasses;
}

Expand Down Expand Up @@ -221,12 +222,42 @@ public void build(SchedulerDeploymentRecorder recorder, BeanContainerBuildItem b
}

@BuildStep
public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter) {
@Record(RUNTIME_INIT)
public void registerConfiguration(SchedulerDeploymentRecorder recorder, SchedulerRuntimeConfig config) {
recorder.registerConfiguration(config);
}

@BuildStep
@Record(STATIC_INIT)
public void registerConfiguration(SchedulerDeploymentRecorder recorder, SchedulerBuildTimeConfig config,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
StoreType storeType = config.store.type;
reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, storeType.clazz));

if (storeType.equals(StoreType.DATABASE_JOB_STORE)) {
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, JobStoreSupport.class.getName()));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, AbstractTrigger.class.getName()));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, SimpleTriggerImpl.class.getName()));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, QuarkusQuartzJDBCDriver.class.getName()));
reflectiveClass.produce(
new ReflectiveClassBuildItem(true, true, "io.quarkus.scheduler.runtime.QuartzScheduler$InvokerJob"));
reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, "io.quarkus.scheduler.runtime.QuartzScheduler$TimeJob"));
reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false, AgroalQuartzConnectionPoolingProvider.class.getName()));
}

recorder.registerConfiguration(config);
}

@BuildStep
public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter,
SchedulerBuildTimeConfig schedulerBuildTimeConfig) {
logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.impl.StdSchedulerFactory",
"Quartz scheduler version:",
// no need to log if it's the default
"Using default implementation for",
"Quartz scheduler 'DefaultQuartzScheduler'"));
"Quartz scheduler '" + schedulerBuildTimeConfig.instanceName + "'"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.QuartzScheduler",
"Quartz Scheduler v",
Expand All @@ -235,8 +266,8 @@ public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter
// no need to log if it's the default
"Scheduler DefaultQuartzScheduler"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.simpl.RAMJobStore",
"RAMJobStore initialized."));
StoreType storeType = schedulerBuildTimeConfig.store.type;
logCleanupFilter.produce(new LogCleanupFilterBuildItem(storeType.clazz, storeType.name + " initialized.", "Handling"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.SchedulerSignalerImpl",
"Initialized Scheduler Signaller of type"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkus.scheduler.test;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;

public class NoDataSourceTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setExpectedException(IllegalStateException.class)
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SimpleJobs.class)
.addAsResource(new StringAsset("simpleJobs.cron=0/1 * * * * ?\nsimpleJobs.every=1s" +
"\nquarkus.scheduler.store.type=database-job-store\n"),
"application.properties"));

@Test
public void shouldFailMissingDataSource() throws InterruptedException {
/**
* Should not reach here
*/
Assertions.fail();
}

}
4 changes: 4 additions & 0 deletions extensions/scheduler/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.substratevm</groupId>
<artifactId>svm</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.scheduler.runtime;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import org.quartz.utils.PoolingConnectionProvider;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;

public class AgroalQuartzConnectionPoolingProvider implements PoolingConnectionProvider {
private DataSource dataSource;

public AgroalQuartzConnectionPoolingProvider() {
InstanceHandle<DataSource> dataSourceInstanceHandle = Arc.container().instance(DataSource.class);
if (dataSourceInstanceHandle.isAvailable()) {
this.dataSource = dataSourceInstanceHandle.get();
} else {
throw new IllegalStateException(
"JDBC Store configured but datasource is missing. You can configure your datasource by following the guide available at: https://quarkus.io/guides/datasource-guide");
}
}

@SuppressWarnings("unused")
public AgroalQuartzConnectionPoolingProvider(Properties properties) {
this();
}

@Override
public DataSource getDataSource() {
return dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void shutdown() {
// Do nothing as it will be closed inside Agroal extension ?
}

@Override
public void initialize() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.scheduler.runtime;

import java.io.ByteArrayOutputStream;
import java.sql.ResultSet;

import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;

public class QuarkusQuartzJDBCDriver extends StdJDBCDelegate {

/**
* Activate the usage of {@link java.util.Properties} to avoid Object serialization
*
* @return true
*/
@Override
protected boolean canUseProperties() {
return true;
}

@Override
protected ByteArrayOutputStream serializeObject(Object obj) {
throw new IllegalStateException("Object serialization not supported."); // should not reach here
}

@Override
protected Object getObjectFromBlob(ResultSet rs, String colName) {
throw new IllegalStateException("Object serialization not supported."); // should not reach here
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -117,19 +118,8 @@ void start(@Observes StartupEvent startupEvent) {
if (running.compareAndSet(false, true)) {

try {
// TODO: leverage quarkus config - these values are just copied from the default quartz.properties
Properties props = new Properties();
props.put("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
props.put("org.quartz.scheduler.rmi.export", false);
props.put("org.quartz.scheduler.rmi.proxy", false);
props.put("org.quartz.scheduler.wrapJobExecutionInUserTransaction", false);
props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
props.put("org.quartz.threadPool.threadCount", "10");
props.put("org.quartz.threadPool.threadPriority", "5");
props.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", true);
props.put("org.quartz.jobStore.misfireThreshold", "60000");
props.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");

Properties props = getSchedulerConfigurationProperties();
SchedulerFactory schedulerFactory = new StdSchedulerFactory(props);
scheduler = schedulerFactory.getScheduler();

Expand Down Expand Up @@ -217,6 +207,52 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler scheduler) thr
}
}

private Properties getSchedulerConfigurationProperties() {
Properties props = new Properties();
SchedulerRuntimeConfig schedulerRuntimeConfig = SchedulerConfigHolder.getSchedulerRuntimeConfig();
SchedulerBuildTimeConfig schedulerBuildTimeConfig = SchedulerConfigHolder.getSchedulerBuildTimeConfig();

props.put(StdSchedulerFactory.PROP_SCHED_RMI_EXPORT, false);
props.put(StdSchedulerFactory.PROP_SCHED_RMI_PROXY, false);
props.put(StdSchedulerFactory.PROP_SCHED_WRAP_JOB_IN_USER_TX, true);
props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
props.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, "AUTO");
props.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, schedulerBuildTimeConfig.instanceName);
props.put("org.quartz.threadPool.threadCount", String.valueOf(schedulerRuntimeConfig.threadCount));
props.put("org.quartz.threadPool.threadPriority", String.valueOf(schedulerRuntimeConfig.threadPriority));
props.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", true);
props.put("org.quartz.jobStore.misfireThreshold",
String.valueOf(schedulerBuildTimeConfig.store.misfireThreshold.getSeconds()));

switch (schedulerBuildTimeConfig.store.type) {
case RAM_JOB_STORE:
props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, StoreType.RAM_JOB_STORE.clazz);
break;
case DATABASE_JOB_STORE: {
props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, StoreType.DATABASE_JOB_STORE.clazz);
props.put("org.quartz.jobStore.useProperties", false);
props.put("org.quartz.jobStore.dataSource", "QUARKUS_SCHEDULER_DS");
props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
props.put("org.quartz.jobStore.driverDelegateClass", QuarkusQuartzJDBCDriver.class.getName());

props.put("org.quartz.dataSource.QUARKUS_SCHEDULER_DS.connectionProvider.class",
AgroalQuartzConnectionPoolingProvider.class.getName());
Optional<Boolean> clusterEnabled = schedulerBuildTimeConfig.store.clusterEnabled;
if (clusterEnabled.isPresent()) {
String interval = "20000"; // 20s
Optional<Duration> clusterCheckingInterval = schedulerBuildTimeConfig.store.clusterCheckingInterval;
if (clusterCheckingInterval.isPresent()) {
interval = String.valueOf(clusterCheckingInterval.get().getSeconds());
}
props.put("org.quartz.jobStore.isClustered", clusterEnabled.get().booleanValue());
props.put("org.quartz.jobStore.clusterCheckinInterval", interval);
}
}
}

return props;
}

@PreDestroy
void destroy() {
if (running.compareAndSet(true, false)) {
Expand Down
Loading

0 comments on commit 5c5fa3f

Please sign in to comment.