Skip to content

Commit

Permalink
feat(quartz): add clustered jobs support
Browse files Browse the repository at this point in the history
  • Loading branch information
machi1990 committed Nov 18, 2019
1 parent a62522d commit 8981a0b
Show file tree
Hide file tree
Showing 21 changed files with 840 additions and 58 deletions.
2 changes: 1 addition & 1 deletion ci-templates/jvm-build-steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ steps:
goals: 'install'
mavenOptions: $(MAVEN_OPTS)
jdkVersionOption: ${{ parameters.jdk }}
options: '-B --settings azure-mvn-settings.xml -Dnative-image.docker-build -Dtest-postgresql -Dtest-elasticsearch -Dtest-mysql -Dtest-dynamodb -Dtest-vault -Dno-format ${{ parameters.extraf }}'
options: '-B --settings azure-mvn-settings.xml -Dnative-image.docker-build -Dtest-postgresql -Dtest-elasticsearch -Dtest-mysql -Dtest-dynamodb -Dtest-vault -Dtest-quartz -Dno-format ${{ parameters.extraf }}'

2 changes: 1 addition & 1 deletion ci-templates/native-build-steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ jobs:
inputs:
goals: 'install'
mavenOptions: $(MAVEN_OPTS)
options: '-pl integration-tests/${{ join('',integration-tests/'', parameters.modules) }} -B --settings azure-mvn-settings.xml -Dquarkus.native.container-build=true -Dtest-postgresql -Dtest-elasticsearch -Dtest-keycloak -Ddocker-keycloak -Dtest-dynamodb -Dtest-mysql -Dtest-vault -Dnative-image.xmx=6g -Dnative -Dno-format'
options: '-pl integration-tests/${{ join('',integration-tests/'', parameters.modules) }} -B --settings azure-mvn-settings.xml -Dquarkus.native.container-build=true -Dtest-postgresql -Dtest-elasticsearch -Dtest-keycloak -Ddocker-keycloak -Dtest-dynamodb -Dtest-mysql -Dtest-vault -Dtest-quartz -Dnative-image.xmx=6g -Dnative -Dno-format'
4 changes: 3 additions & 1 deletion ci-templates/stages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ stages:
parameters:
poolSettings: ${{parameters.poolSettings}}
expectUseVMs: ${{parameters.expectUseVMs}}
modules: main
modules:
- main
- quartz
name: main
postgres: true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

import org.quartz.core.QuartzSchedulerThread;
import org.quartz.core.SchedulerSignalerImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.AttributeRestoringConnectionInvocationHandler;
import org.quartz.impl.jdbcjobstore.JobStoreSupport;
import org.quartz.impl.triggers.AbstractTrigger;
import org.quartz.impl.triggers.SimpleTriggerImpl;
import org.quartz.simpl.CascadingClassLoadHelper;
import org.quartz.simpl.RAMJobStore;
import org.quartz.simpl.SimpleInstanceIdGenerator;
import org.quartz.simpl.SimpleThreadPool;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
Expand All @@ -17,18 +25,21 @@
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CapabilityBuildItem;
import io.quarkus.deployment.builditem.ServiceStartBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider;
import io.quarkus.quartz.runtime.QuartzBuildTimeConfig;
import io.quarkus.quartz.runtime.QuartzRecorder;
import io.quarkus.quartz.runtime.QuartzRuntimeConfig;
import io.quarkus.quartz.runtime.QuartzScheduler;
import io.quarkus.quartz.runtime.QuartzSupport;
import io.quarkus.quartz.runtime.StoreType;

/**
* @author Martin Kouba
*/
public class QuartzProcessor {

@BuildStep
CapabilityBuildItem capability() {
return new CapabilityBuildItem(Capabilities.QUARTZ);
Expand All @@ -40,39 +51,74 @@ AdditionalBeanBuildItem beans() {
}

@BuildStep
List<ReflectiveClassBuildItem> reflectiveClasses() {
NativeImageProxyDefinitionBuildItem connectionProxy(QuartzBuildTimeConfig config) {
if (config.storeType.equals(StoreType.DB)) {
return new NativeImageProxyDefinitionBuildItem(Connection.class.getName());
}
return null;
}

@BuildStep
List<ReflectiveClassBuildItem> reflectiveClasses(QuartzBuildTimeConfig config) {
List<ReflectiveClassBuildItem> reflectiveClasses = new ArrayList<>();
reflectiveClasses.add(new ReflectiveClassBuildItem(false, false, CascadingClassLoadHelper.class.getName()));
StoreType storeType = config.storeType;

reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleThreadPool.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, RAMJobStore.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleInstanceIdGenerator.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(false, false, CascadingClassLoadHelper.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, true, storeType.clazz));

if (storeType.equals(StoreType.DB)) {
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, JobStoreSupport.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, true, Connection.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, AbstractTrigger.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, SimpleTriggerImpl.class.getName()));
reflectiveClasses.add(new ReflectiveClassBuildItem(true, false, config.dbStore.driverDialect.get()));
reflectiveClasses
.add(new ReflectiveClassBuildItem(true, true, "io.quarkus.quartz.runtime.QuartzScheduler$InvokerJob"));
reflectiveClasses
.add(new ReflectiveClassBuildItem(true, false, QuarkusQuartzConnectionPoolProvider.class.getName()));
}

return reflectiveClasses;
}

@BuildStep
public void logCleanup(BuildProducer<LogCleanupFilterBuildItem> logCleanupFilter) {
logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.impl.StdSchedulerFactory",
public List<LogCleanupFilterBuildItem> logCleanup(QuartzBuildTimeConfig config) {
StoreType storeType = config.storeType;
List<LogCleanupFilterBuildItem> logCleanUps = new ArrayList<>();
logCleanUps.add(new LogCleanupFilterBuildItem(StdSchedulerFactory.class.getName(),
"Quartz scheduler version:",
"Using default implementation for",
"Quartz scheduler 'QuarkusQuartzScheduler'"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.QuartzScheduler",
logCleanUps.add(new LogCleanupFilterBuildItem(org.quartz.core.QuartzScheduler.class.getName(),
"Quartz Scheduler v",
"JobFactory set to:",
"Scheduler meta-data:",
"Scheduler QuarkusQuartzScheduler"));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.simpl.RAMJobStore",
"RAMJobStore initialized."));

logCleanupFilter.produce(new LogCleanupFilterBuildItem("org.quartz.core.SchedulerSignalerImpl",
logCleanUps.add(new LogCleanupFilterBuildItem(storeType.clazz, storeType.name + " initialized.", "Handling",
"Using db table-based data access locking", "JDBCJobStore threads will inherit ContextClassLoader of thread",
"Couldn't rollback jdbc connection", "Database connection shutdown unsuccessful"));
logCleanUps.add(new LogCleanupFilterBuildItem(SchedulerSignalerImpl.class.getName(),
"Initialized Scheduler Signaller of type"));
logCleanUps.add(new LogCleanupFilterBuildItem(QuartzSchedulerThread.class.getName(),
"QuartzSchedulerThread Inheriting ContextClassLoader"));
logCleanUps.add(new LogCleanupFilterBuildItem(SimpleThreadPool.class.getName(),
"Job execution threads will use class loader of thread"));

logCleanUps.add(new LogCleanupFilterBuildItem(AttributeRestoringConnectionInvocationHandler.class.getName(),
"Failed restore connection's original"));
return logCleanUps;
}

@BuildStep
@Record(RUNTIME_INIT)
public void build(QuartzRuntimeConfig runtimeConfig, QuartzRecorder recorder, BeanContainerBuildItem beanContainer,
public void build(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, QuartzRecorder recorder,
BeanContainerBuildItem beanContainer,
BuildProducer<ServiceStartBuildItem> serviceStart) {
recorder.initialize(runtimeConfig, beanContainer.getValue());
recorder.initialize(runtimeConfig, buildTimeConfig, beanContainer.getValue());
// Make sure that StartupEvent is fired after the init
serviceStart.produce(new ServiceStartBuildItem("quartz"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.quarkus.quartz.runtime;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import org.quartz.utils.PoolingConnectionProvider;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.arc.InstanceHandle;

public class QuarkusQuartzConnectionPoolProvider implements PoolingConnectionProvider {
private DataSource dataSource;
private static String dataSourceName;

public QuarkusQuartzConnectionPoolProvider() {
InstanceHandle<DataSource> instanceHandle;
ArcContainer container = Arc.container();
boolean useDefaultDataSource = "QUARKUS_QUARTZ_DEFAULT_DATASOURCE".equals(dataSourceName);
if (useDefaultDataSource) {
instanceHandle = container.instance(DataSource.class);
} else {
instanceHandle = container.instance(dataSourceName);
}
if (instanceHandle.isAvailable()) {
this.dataSource = instanceHandle.get();
} else {
String message = String.format(
"JDBC Store configured but '%s' datasource is missing. You can configure your datasource by following the guide available at: https://quarkus.io/guides/datasource-guide",
useDefaultDataSource ? "default" : dataSourceName);
throw new IllegalStateException(message);
}
}

@SuppressWarnings("unused")
public QuarkusQuartzConnectionPoolProvider(Properties properties) {
this();
}

@Override
public DataSource getDataSource() {
return dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void shutdown() {
// Do nothing as the connection will be closed inside the Agroal extension
}

@Override
public void initialize() {

}

static void setDataSourceName(String dataSourceName) {
QuarkusQuartzConnectionPoolProvider.dataSourceName = dataSourceName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.quarkus.quartz.runtime;

import java.util.Optional;

import org.quartz.impl.jdbcjobstore.DriverDelegate;

import io.quarkus.runtime.annotations.ConfigDocSection;
import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "quartz", phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
public class QuartzBuildTimeConfig {
/**
* Enable cluster mode or not.
* <p>
* If enabled make sure to set the appropriate cluster properties.
*/
@ConfigItem
public Optional<Boolean> clustered;

/**
* The type of store to use. Possible values are: `ram`, `db`.
* <p>
* When using the `db` store type configuration value make sure that you have the agroal datasource configured. See
* <a href="https://quarkus.io/guides/datasource-guide"> Configuring your datasource</a> for more information.
* <p>
* The Quarkus scheduler does not create the necessary scheduling tables in database automatically.
* To create Quartz tables, visit <a href=
* "https://github.com/quartz-scheduler/quartz/blob/master/quartz-core/src/main/resources/org/quartz/impl/jdbcjobstore">Quartz
* table creation scripts </a> and pick a script file corresponding to your database in use.
*
*/
@ConfigItem(defaultValue = "ram")
public StoreType storeType;

/**
* Quartz JDBC store database configuration
*/
@ConfigDocSection
@ConfigItem(name = "db")
public QuartzJDBCStoreConfig dbStore;

@ConfigGroup
public static class QuartzJDBCStoreConfig {
/**
* The Quartz {@link DriverDelegate} driver dialect class to use.
* <p>
* Optionally needed when using the `db` store type {@link QuartzBuildTimeConfig#storeType}.
* <p>
* The delegate class must correspond to the type of driver in use.
* <br>
* <a href=
* "http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html#configuration-of-database-clustering-achieve-fail-over-and-load-balancing-with-jdbc-jobstore">Configuration
* of JDBC-Store</>
*/
@ConfigItem(defaultValue = "org.quartz.impl.jdbcjobstore.StdJDBCDelegate")
public Optional<String> driverDialect;

/**
* The name of the datasource to use.
* <p>
* Optionally needed when using the `db` store type {@link QuartzJDBCStoreConfig#storeType}.
* If not specified default to using the default datasource.
*/
@ConfigItem(name = "name")
public Optional<String> dataSourceName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
@Recorder
public class QuartzRecorder {

public void initialize(QuartzRuntimeConfig runtimeConfig, BeanContainer container) {
public void initialize(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, BeanContainer container) {
QuartzSupport support = container.instance(QuartzSupport.class);
support.initialize(runtimeConfig);
support.initialize(runtimeConfig, buildTimeConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(phase = ConfigPhase.RUN_TIME)
@ConfigRoot(name = "quartz", phase = ConfigPhase.RUN_TIME)
public class QuartzRuntimeConfig {

/**
Expand Down
Loading

0 comments on commit 8981a0b

Please sign in to comment.