Skip to content

Commit

Permalink
Merge pull request #31729 from zhfeng/issue_31066_narayana_jdbc_store
Browse files Browse the repository at this point in the history
Support JDBC ObjectStore in narayana-jta extension
  • Loading branch information
yrodiere authored May 30, 2023
2 parents 55a7a31 + 6fc8350 commit c2546e7
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 55 deletions.
11 changes: 11 additions & 0 deletions docs/src/main/asciidoc/transaction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,17 @@ NOTE: The `event` object represents the transaction ID, and defines `toString()`
TIP: In listener methods, you can access more information about the transaction in progress by accessing the `TransactionManager`,
which is a CDI bean and can be ``@Inject``ed.

== Configuring transaction log to be stored in a DataSource

The Narayana project has the capability to store the transaction logs into a JDBC Datasource; this should be our recommendation for users needing transaction recovery capabilities, especially when running in volatile containers.

To enable this capability, you need to set `quarkus.transaction-manager.object-store.type` to `jdbc` explicitly. Also, you can specify a datasource name to be used for the transaction log storage by setting `quarkus.transaction-manager.object-store.datasource`. It will use the default datasource configuration if not specified.

If you enable `quarkus.transaction-manager.object-store.create-table`, the transaction log table will be created automatically if it does not exist.

NOTE: When enabling this capability, the transaction node identifier must be set through `quarkus.transaction-manager.node-name`.


== Why always having a transaction manager?

Does it work everywhere I want to?::
Expand Down
4 changes: 4 additions & 0 deletions extensions/narayana-jta/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-narayana-jta</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal-spi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import jakarta.annotation.Priority;
Expand All @@ -13,6 +16,8 @@
import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
import com.arjuna.ats.internal.arjuna.coordinator.CheckedActionFactoryImple;
import com.arjuna.ats.internal.arjuna.objectstore.ShadowNoFileLockStore;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCImple_driver;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionExpiryScanner;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule;
import com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner;
Expand All @@ -30,19 +35,23 @@
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.common.util.propertyservice.PropertiesFactory;

import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem.ContextConfiguratorBuildItem;
import io.quarkus.arc.deployment.CustomScopeBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsTest;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.Produce;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageSystemPropertyBuildItem;
Expand Down Expand Up @@ -76,6 +85,7 @@ public NativeImageSystemPropertyBuildItem nativeImageSystemPropertyBuildItem() {
@Record(RUNTIME_INIT)
@Produce(NarayanaInitBuildItem.class)
public void build(NarayanaJtaRecorder recorder,
CombinedIndexBuildItem indexBuildItem,
BuildProducer<AdditionalBeanBuildItem> additionalBeans,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<RuntimeInitializedClassBuildItem> runtimeInit,
Expand All @@ -95,21 +105,25 @@ public void build(NarayanaJtaRecorder recorder,
runtimeInit.produce(new RuntimeInitializedClassBuildItem(JTAActionStatusServiceXAResourceOrphanFilter.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(AtomicActionExpiryScanner.class.getName()));

reflectiveClass.produce(ReflectiveClassBuildItem.builder(JTAEnvironmentBean.class.getName(),
UserTransactionImple.class.getName(),
CheckedActionFactoryImple.class.getName(),
TransactionManagerImple.class.getName(),
TransactionSynchronizationRegistryImple.class.getName(),
ObjectStoreEnvironmentBean.class.getName(),
ShadowNoFileLockStore.class.getName(),
SocketProcessId.class.getName(),
AtomicActionRecoveryModule.class.getName(),
XARecoveryModule.class.getName(),
XAResourceRecord.class.getName(),
JTATransactionLogXAResourceOrphanFilter.class.getName(),
JTANodeNameXAResourceOrphanFilter.class.getName(),
JTAActionStatusServiceXAResourceOrphanFilter.class.getName(),
ExpiredTransactionStatusManagerScanner.class.getName()).build());
indexBuildItem.getIndex().getAllKnownSubclasses(JDBCImple_driver.class).stream()
.map(impl -> ReflectiveClassBuildItem.builder(impl.name().toString()).build())
.forEach(reflectiveClass::produce);
reflectiveClass.produce(ReflectiveClassBuildItem.builder(JTAEnvironmentBean.class,
UserTransactionImple.class,
CheckedActionFactoryImple.class,
TransactionManagerImple.class,
TransactionSynchronizationRegistryImple.class,
ObjectStoreEnvironmentBean.class,
ShadowNoFileLockStore.class,
JDBCStore.class,
SocketProcessId.class,
AtomicActionRecoveryModule.class,
XARecoveryModule.class,
XAResourceRecord.class,
JTATransactionLogXAResourceOrphanFilter.class,
JTANodeNameXAResourceOrphanFilter.class,
JTAActionStatusServiceXAResourceOrphanFilter.class,
ExpiredTransactionStatusManagerScanner.class).build());

AdditionalBeanBuildItem.Builder builder = AdditionalBeanBuildItem.builder();
builder.addBeanClass(TransactionalInterceptorSupports.class);
Expand All @@ -135,6 +149,18 @@ public void build(NarayanaJtaRecorder recorder,
recorder.setConfig(transactions);
}

@BuildStep
@Record(RUNTIME_INIT)
@Consume(NarayanaInitBuildItem.class)
@Consume(SyntheticBeansRuntimeInitBuildItem.class)
public void startRecoveryService(NarayanaJtaRecorder recorder,
List<JdbcDataSourceBuildItem> jdbcDataSourceBuildItems, TransactionManagerConfiguration transactions) {
Map<Boolean, String> namedDataSources = new HashMap<>();

jdbcDataSourceBuildItems.forEach(i -> namedDataSources.put(i.isDefault(), i.getName()));
recorder.startRecoveryService(transactions, namedDataSources);
}

@BuildStep(onlyIf = IsTest.class)
void testTx(BuildProducer<GeneratedBeanBuildItem> generatedBeanBuildItemBuildProducer,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
import jakarta.transaction.TransactionSynchronizationRegistry;
import jakarta.transaction.UserTransaction;

import org.jboss.logging.Logger;
import org.jboss.tm.JBossXATerminator;
import org.jboss.tm.XAResourceRecoveryRegistry;
import org.jboss.tm.usertx.UserTransactionRegistry;

import com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;

import io.quarkus.arc.Unremovable;

@Dependent
public class NarayanaJtaProducers {
private static final Logger log = Logger.getLogger(NarayanaJtaProducers.class);

@Produces
@ApplicationScoped
Expand All @@ -41,13 +42,8 @@ public jakarta.transaction.TransactionManager transactionManager() {

@Produces
@Singleton
public XAResourceRecoveryRegistry xaResourceRecoveryRegistry(TransactionManagerConfiguration config) {
RecoveryManagerService recoveryManagerService = new RecoveryManagerService();
if (config.enableRecovery) {
recoveryManagerService.create();
recoveryManagerService.start();
}
return recoveryManagerService;
public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() {
return QuarkusRecoveryService.getInstance();
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.narayana.jta.runtime;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jboss.logging.Logger;
Expand All @@ -13,13 +16,15 @@
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore;
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.ats.jta.common.jtaPropertyManager;
import com.arjuna.common.internal.util.propertyservice.BeanPopulator;
import com.arjuna.common.util.propertyservice.PropertiesFactory;

import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ConfigurationException;

@Recorder
public class NarayanaJtaRecorder {
Expand Down Expand Up @@ -72,12 +77,12 @@ public void disableTransactionStatusManager() {
}

public void setConfig(final TransactionManagerConfiguration transactions) {
BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class)
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "communicationStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "stateStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
List<String> objectStores = Arrays.asList(null, "communicationStore", "stateStore");
if (transactions.objectStore.type.equals(ObjectStoreType.File_System)) {
objectStores.forEach(name -> setObjectStoreDir(name, transactions));
} else if (transactions.objectStore.type.equals(ObjectStoreType.JDBC)) {
objectStores.forEach(name -> setJDBCObjectStore(name, transactions));
}
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
.setRecoveryModuleClassNames(transactions.recoveryModules);
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
Expand All @@ -86,15 +91,56 @@ public void setConfig(final TransactionManagerConfiguration transactions) {
.setXaResourceOrphanFilterClassNames(transactions.xaResourceOrphanFilters);
}

private void setObjectStoreDir(String name, TransactionManagerConfiguration config) {
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, name).setObjectStoreDir(config.objectStore.directory);
}

private void setJDBCObjectStore(String name, TransactionManagerConfiguration config) {
final ObjectStoreEnvironmentBean instance = BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, name);
instance.setObjectStoreType(JDBCStore.class.getName());
instance.setJdbcDataSource(new QuarkusDataSource(config.objectStore.datasource));
instance.setCreateTable(config.objectStore.createTable);
instance.setDropTable(config.objectStore.dropTable);
instance.setTablePrefix(config.objectStore.tablePrefix);
}

public void startRecoveryService(final TransactionManagerConfiguration transactions, Map<Boolean, String> dataSources) {
if (transactions.objectStore.type.equals(ObjectStoreType.JDBC)) {
if (transactions.objectStore.datasource.isEmpty()) {
dataSources.keySet().stream().filter(i -> i).findFirst().orElseThrow(
() -> new ConfigurationException(
"The Narayana JTA extension does not have a datasource configured,"
+ " so it defaults to the default datasource,"
+ " but that datasource is not configured."
+ " To solve this, either configure the default datasource,"
+ " referring to https://quarkus.io/guides/datasource for guidance,"
+ " or configure the datasource to use in the Narayana JTA extension "
+ " by setting property 'quarkus.transaction-manager.object-store.datasource' to the name of a configured datasource."));
} else {
String dsName = transactions.objectStore.datasource.get();
dataSources.values().stream().filter(i -> i.equals(dsName)).findFirst()
.orElseThrow(() -> new ConfigurationException(
"The Narayana JTA extension is configured to use the datasource '"
+ dsName
+ "' but that datasource is not configured."
+ " To solve this, either configure datasource " + dsName
+ " referring to https://quarkus.io/guides/datasource for guidance,"
+ " or configure another datasource to use in the Narayana JTA extension "
+ " by setting property 'quarkus.transaction-manager.object-store.datasource' to the name of a configured datasource."));
}
}
if (transactions.enableRecovery) {
QuarkusRecoveryService.getInstance().create();
QuarkusRecoveryService.getInstance().start();
}
}

public void handleShutdown(ShutdownContext context, TransactionManagerConfiguration transactions) {
context.addLastShutdownTask(new Runnable() {
@Override
public void run() {
if (transactions.enableRecovery) {
RecoveryManager.manager().terminate(true);
}
TransactionReaper.terminate(false);
context.addLastShutdownTask(() -> {
if (transactions.enableRecovery) {
RecoveryManager.manager().terminate(true);
}
TransactionReaper.terminate(false);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.quarkus.narayana.jta.runtime;

public enum ObjectStoreType {
File_System,
JDBC
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.narayana.jta.runtime;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Optional;
import java.util.logging.Logger;

import javax.sql.DataSource;

import jakarta.enterprise.inject.literal.NamedLiteral;

import io.quarkus.arc.Arc;

public class QuarkusDataSource implements DataSource {
private final Optional<String> dsName;
private volatile DataSource datasource;

public QuarkusDataSource(Optional<String> dsName) {
this.dsName = dsName;
}

private DataSource getDataSource() {
if (datasource == null) {
if (dsName.isEmpty()) {
datasource = Arc.container().instance(DataSource.class).get();
} else {
datasource = Arc.container().instance(DataSource.class, NamedLiteral.of(dsName.get())).get();
}
}

return datasource;
}

@Override
public Connection getConnection() throws SQLException {
return getDataSource().getConnection();
}

@Override
public Connection getConnection(final String user, final String passwd) throws SQLException {
return getDataSource().getConnection(user, passwd);
}

@Override
public PrintWriter getLogWriter() throws SQLException {
return getDataSource().getLogWriter();
}

@Override
public void setLogWriter(final PrintWriter writer) throws SQLException {
getDataSource().setLogWriter(writer);
}

@Override
public void setLoginTimeout(final int timeout) throws SQLException {
getDataSource().setLoginTimeout(timeout);
}

@Override
public int getLoginTimeout() throws SQLException {
return getDataSource().getLoginTimeout();
}

@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return getDataSource().getParentLogger();
}

@Override
public <T> T unwrap(final Class<T> aClass) throws SQLException {
return getDataSource().unwrap(aClass);
}

@Override
public boolean isWrapperFor(final Class<?> aClass) throws SQLException {
return getDataSource().isWrapperFor(aClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.narayana.jta.runtime;

import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;

public class QuarkusRecoveryService {
private static RecoveryManagerService recoveryManagerService;

public static RecoveryManagerService getInstance() {
if (recoveryManagerService == null) {
recoveryManagerService = new RecoveryManagerService();
}
return recoveryManagerService;
}

private QuarkusRecoveryService() {
}
}
Loading

0 comments on commit c2546e7

Please sign in to comment.