Skip to content

Commit

Permalink
feat(scheduler): add scheduler configuration and add support for clus…
Browse files Browse the repository at this point in the history
…tering

Fixes quarkusio#3520
  • Loading branch information
machi1990 committed Aug 30, 2019
1 parent 3993977 commit 11407ee
Show file tree
Hide file tree
Showing 14 changed files with 583 additions and 19 deletions.
69 changes: 69 additions & 0 deletions docs/src/main/asciidoc/scheduled-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,75 @@ public class CountResourceTest {
----
1. Ensure that the response contains `count`

== Scheduler configurations

`quarkus.scheduler.instance-name`:: The instance name of the scheduler.
+
Type: `java.lang.String` +
Defaults to: `DefaultQuartzScheduler` +


`quarkus.scheduler.store.misfire-threshold`:: The number of duration by which a trigger must have missed its next-fire-time, in order for it to be considered "misfired" and thus have its misfire instruction applied.
+
Type: `java.time.Duration` +
Defaults to: `1m` +


`quarkus.scheduler.store.cluster-checking-interval`:: The frequency at which this instance "checks-in" with the other instances of the cluster .This affects the rate of detecting failed instances.
+
Type: `java.time.Duration` +
Defaults to: `20s` +


`quarkus.scheduler.store.cluster-enabled`:: Enable cluster mode or not. This takes effect is the store type is `database-store`.
+
Type: `java.lang.Boolean` +


`quarkus.scheduler.store.driver-delegate-class`:: The JDBC driver delegate class.
+
Type: `java.lang.String` +
Defaults to: `org.quartz.impl.jdbcjobstore.StdJDBCDelegate` +


`quarkus.scheduler.store.type`:: The type of store to use. Possible values are: `ram-store`, and `database-store`.
- If set to `ram-store`, the scheduler will use the `org.quartz.simpl.RAMJobStore` job store class
- If set to `database-store`, the scheduler will use the `org.quartz.impl.jdbcjobstore.JobStoreTX` job store class.
When using this option make sure that you have the link:datasource-guide.html[agroal datasource configured] and that <<creating-scheduling-job, scheduling tables are>> exists.
+
Type: `io.quarkus.scheduler.runtime.SchedulerBuildTimeConfig.StoreType` +
Defaults to: `ram-store` +


`quarkus.scheduler.instance-id`:: The instance id of the scheduler. This is highly required when running clustered schedulers as each node in the cluster MUST have a unique instanceId.
Defaults to `AUTO` to automatically generate unique ids for each node in the cluster
+
Type: `java.lang.String` +
Defaults to: `AUTO` +


`quarkus.scheduler.thread-count`:: The size of scheduler thread pool. This will initialise the number of worker threads in the pool
+
Type: `int` +
Defaults to: `25` +


`quarkus.scheduler.thread-priority`:: Thread priority of worker threads in the pool.
+
Type: `int` +
Defaults to: `5` +

include::duration-format-note.adoc[]

[TIP]
[[creating-scheduling-job]]
.About creating database tables when using `database-store` store type
=====
The Quarkus scheduler does not create the necessary scheduling tables in database automatically. If these tables are missing, the schuduler will throw an excpetion during application startup.
Thus you'll need to create them. To do so, copy the content from https://github.com/quarkusio/quarkus/tree/master/integration-tests/main/src/main/resources/import.sql[scheduler's sql script file] and adapt the content of the file to suit your SQL driver.
Once you are done, you can use the link:flyway-guide.html[flyway extension] to execute tables creation using the resulting SQL queries.
=====

== Package and run the application

Run the application with: `./mvnw compile quarkus:dev`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.scheduler.deployment;

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;

import java.text.ParseException;
Expand All @@ -22,7 +23,6 @@
import org.jboss.logging.Logger;
import org.quartz.CronExpression;
import org.quartz.simpl.CascadingClassLoadHelper;
import org.quartz.simpl.RAMJobStore;
import org.quartz.simpl.SimpleThreadPool;

import io.quarkus.arc.Arc;
Expand Down Expand Up @@ -58,10 +58,14 @@
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.runtime.AgroalQuartzConnectionPoolingProvider;
import io.quarkus.scheduler.runtime.QuartzScheduler;
import io.quarkus.scheduler.runtime.ScheduledInvoker;
import io.quarkus.scheduler.runtime.SchedulerBuildTimeConfig;
import io.quarkus.scheduler.runtime.SchedulerConfiguration;
import io.quarkus.scheduler.runtime.SchedulerDeploymentRecorder;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.StoreType;

/**
* @author Martin Kouba
Expand Down Expand Up @@ -120,7 +124,7 @@ List<ReflectiveClassBuildItem> reflectiveClasses() {
List<ReflectiveClassBuildItem> reflectiveClasses = new ArrayList<>();
reflectiveClasses.add(new ReflectiveClassBuildItem(false, false, CascadingClassLoadHelper.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleThreadPool.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, RAMJobStore.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, AgroalQuartzConnectionPoolingProvider.class.getName()));
return reflectiveClasses;
}

Expand Down Expand Up @@ -225,12 +229,38 @@ public void write(String name, byte[] data) {
}

@BuildStep
public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter) {
@Record(RUNTIME_INIT)
public void registerConfiguration(SchedulerDeploymentRecorder recorder,
SchedulerRuntimeConfig schedulerRuntimeConfig) {
recorder.registerConfiguration(schedulerRuntimeConfig);
}

@BuildStep
@Record(STATIC_INIT)
public void registerConfiguration(SchedulerDeploymentRecorder recorder,
SchedulerBuildTimeConfig schedulerBuildTimeConfig,
BuildProducer<ReflectiveClassBuildItem> reflectiveClassBuildItemBuildProducer) {

StoreType storeType = schedulerBuildTimeConfig.store.type;
reflectiveClassBuildItemBuildProducer
.produce(new ReflectiveClassBuildItem(true, false, storeType.clazz));

if (storeType.equals(StoreType.DATABASE_STORE)) {
reflectiveClassBuildItemBuildProducer
.produce(new ReflectiveClassBuildItem(true, false, schedulerBuildTimeConfig.store.driverDelegateClass));
}

recorder.registerConfiguration(schedulerBuildTimeConfig);
}

@BuildStep
public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter,
SchedulerBuildTimeConfig schedulerBuildTimeConfig) {
logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.impl.StdSchedulerFactory",
"Quartz scheduler version:",
// no need to log if it's the default
"Using default implementation for",
"Quartz scheduler 'DefaultQuartzScheduler'"));
"Quartz scheduler '" + schedulerBuildTimeConfig.instanceName + "'"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.QuartzScheduler",
"Quartz Scheduler v",
Expand All @@ -239,8 +269,8 @@ public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter
// no need to log if it's the default
"Scheduler DefaultQuartzScheduler"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.simpl.RAMJobStore",
"RAMJobStore initialized."));
StoreType storeType = schedulerBuildTimeConfig.store.type;
logCleanupFilter.produce(new LogCleanupFilterBuildItem(storeType.clazz, storeType.name + " initialized.", "Handling"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.SchedulerSignalerImpl",
"Initialized Scheduler Signaller of type"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkus.scheduler.test;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;

public class NoDataSourceTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setExpectedException(IllegalStateException.class)
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SimpleJobs.class)
.addAsResource(new StringAsset("simpleJobs.cron=0/1 * * * * ?\nsimpleJobs.every=1s" +
"\nquarkus.scheduler.store.type=database-store\n"),
"application.properties"));

@Test
public void shouldFailMissingDataSource() throws InterruptedException {
/**
* Should not reach here
*/
Assertions.fail();
}

}
4 changes: 4 additions & 0 deletions extensions/scheduler/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.substratevm</groupId>
<artifactId>svm</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.scheduler.runtime;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import org.quartz.utils.PoolingConnectionProvider;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;

public class AgroalQuartzConnectionPoolingProvider implements PoolingConnectionProvider {
final private DataSource dataSource;

public AgroalQuartzConnectionPoolingProvider() {
final InstanceHandle<DataSource> dataSourceInstanceHandle = Arc.container().instance(DataSource.class);
if (dataSourceInstanceHandle.isAvailable()) {
this.dataSource = dataSourceInstanceHandle.get();
} else {
throw new IllegalStateException(
"JDBC Store configured but datasource is missing. You can configure your datasource by following the guide available at: https://quarkus.io/guides/datasource-guide");
}
}

@SuppressWarnings("unused")
public AgroalQuartzConnectionPoolingProvider(Properties properties) {
this();
}

@Override
public DataSource getDataSource() {
return dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void shutdown() {
}

@Override
public void initialize() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -113,19 +114,7 @@ void start(@Observes StartupEvent startupEvent) {
if (running.compareAndSet(false, true)) {

try {
// TODO: leverage quarkus config - these values are just copied from the default quartz.properties
Properties props = new Properties();
props.put("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
props.put("org.quartz.scheduler.rmi.export", false);
props.put("org.quartz.scheduler.rmi.proxy", false);
props.put("org.quartz.scheduler.wrapJobExecutionInUserTransaction", false);
props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
props.put("org.quartz.threadPool.threadCount", "10");
props.put("org.quartz.threadPool.threadPriority", "5");
props.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", true);
props.put("org.quartz.threadPool.threadPriority", "5");
props.put("org.quartz.jobStore.misfireThreshold", "60000");
props.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
Properties props = getSchedulerConfigurationProperties();

SchedulerFactory schedulerFactory = new StdSchedulerFactory(props);
scheduler = schedulerFactory.getScheduler();
Expand Down Expand Up @@ -214,6 +203,50 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler scheduler) thr
}
}

private Properties getSchedulerConfigurationProperties() {
Properties props = new Properties();
SchedulerRuntimeConfig schedulerRuntimeConfig = SchedulerConfigHolder.getSchedulerRuntimeConfig();
SchedulerBuildTimeConfig schedulerBuildTimeConfig = SchedulerConfigHolder.getSchedulerBuildTimeConfig();

props.put(StdSchedulerFactory.PROP_SCHED_WRAP_JOB_IN_USER_TX, false);
props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
props.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, schedulerRuntimeConfig.instanceId);
props.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, schedulerBuildTimeConfig.instanceName);
props.put("org.quartz.threadPool.threadCount", String.valueOf(schedulerRuntimeConfig.threadCount));
props.put("org.quartz.threadPool.threadPriority", String.valueOf(schedulerRuntimeConfig.threadPriority));
props.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", true);
props.put("org.quartz.jobStore.misfireThreshold",
String.valueOf(schedulerBuildTimeConfig.store.misfireThreshold.toMillis()));

switch (schedulerBuildTimeConfig.store.type) {
case RAM_STORE:
props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, StoreType.RAM_STORE.clazz);
break;
case DATABASE_STORE: {
props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, StoreType.DATABASE_STORE.clazz);
props.put("org.quartz.jobStore.useProperties", true);
props.put("org.quartz.jobStore.dataSource", "QUARKUS_SCHEDULER_DS");
props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
props.put("org.quartz.jobStore.driverDelegateClass", schedulerBuildTimeConfig.store.driverDelegateClass);

props.put("org.quartz.dataSource.QUARKUS_SCHEDULER_DS.connectionProvider.class",
AgroalQuartzConnectionPoolingProvider.class.getName());
Optional<Boolean> clusterEnabled = schedulerBuildTimeConfig.store.clusterEnabled;
if (clusterEnabled.isPresent()) {
String interval = "20000"; // 20s
Optional<Duration> clusterCheckingInterval = schedulerBuildTimeConfig.store.clusterCheckingInterval;
if (clusterCheckingInterval.isPresent()) {
interval = String.valueOf(clusterCheckingInterval.get().toMillis());
}
props.put("org.quartz.jobStore.isClustered", clusterEnabled.get().booleanValue());
props.put("org.quartz.jobStore.clusterCheckinInterval", interval);
}
}
}

return props;
}

@PreDestroy
void destroy() {
if (running.compareAndSet(true, false)) {
Expand Down
Loading

0 comments on commit 11407ee

Please sign in to comment.