diff --git a/extensions/hibernate-orm/deployment/pom.xml b/extensions/hibernate-orm/deployment/pom.xml index 84d6dd9a6d89d..e8714c990a185 100644 --- a/extensions/hibernate-orm/deployment/pom.xml +++ b/extensions/hibernate-orm/deployment/pom.xml @@ -69,6 +69,11 @@ quarkus-resteasy-deployment test + + io.quarkus + quarkus-resteasy-jackson-deployment + test + io.quarkus quarkus-hibernate-validator-deployment diff --git a/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/Fruit.java b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/Fruit.java new file mode 100644 index 0000000000000..eac44d3c7e4c3 --- /dev/null +++ b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/Fruit.java @@ -0,0 +1,49 @@ +package io.quarkus.narayana.quarkus; + +import jakarta.persistence.Cacheable; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.NamedQuery; +import jakarta.persistence.QueryHint; +import jakarta.persistence.SequenceGenerator; +import jakarta.persistence.Table; + +@Entity +@Table(name = "known_fruits") +@NamedQuery(name = "Fruits.findAll", query = "SELECT f FROM Fruit f ORDER BY f.name", hints = @QueryHint(name = "org.hibernate.cacheable", value = "true")) +@Cacheable +public class Fruit { + @Id + @SequenceGenerator(name = "fruitsSequence", sequenceName = "known_fruits_id_seq", allocationSize = 1, initialValue = 10) + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "fruitsSequence") + private Integer id; + + @Column(length = 40, unique = true) + private String name; + + public Fruit() { + } + + public Fruit(String name) { + this.name = name; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/FruitResource.java b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/FruitResource.java new file mode 100644 index 0000000000000..97ee669fb678a --- /dev/null +++ b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/FruitResource.java @@ -0,0 +1,129 @@ +package io.quarkus.narayana.quarkus; + +import java.util.List; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Event; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.transaction.Transactional; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.ext.Provider; + +import org.jboss.logging.Logger; +import org.jboss.resteasy.annotations.jaxrs.PathParam; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@Path("fruits") +@ApplicationScoped +@Produces("application/json") +@Consumes("application/json") +public class FruitResource { + + private static final Logger LOGGER = Logger.getLogger(FruitResource.class.getName()); + + @Inject + EntityManager entityManager; + + @Inject + Event eventBus; + + @GET + public List get() { + return entityManager.createNamedQuery("Fruits.findAll", Fruit.class) + .getResultList(); + } + + @GET + @Path("{id}") + public Fruit getSingle(@PathParam Integer id) { + Fruit entity = entityManager.find(Fruit.class, id); + if (entity == null) { + throw new WebApplicationException("Fruit with id of " + id + " does not exist.", 404); + } + return entity; + } + + @POST + @Transactional + public Response create(Fruit fruit) { + if (fruit.getId() != null) { + throw new WebApplicationException("Id was invalidly set on request.", 422); + } + + entityManager.persist(fruit); + eventBus.fire(fruit); + return Response.ok(fruit).status(201).build(); + } + + @PUT + @Path("{id}") + @Transactional + public Fruit update(@PathParam Integer id, Fruit fruit) { + if (fruit.getName() == null) { + throw new WebApplicationException("Fruit Name was not set on request.", 422); + } + + Fruit entity = entityManager.find(Fruit.class, id); + + if (entity == null) { + throw new WebApplicationException("Fruit with id of " + id + " does not exist.", 404); + } + + entity.setName(fruit.getName()); + return entity; + } + + @DELETE + @Path("{id}") + @Transactional + public Response delete(@PathParam Integer id) { + Fruit entity = entityManager.getReference(Fruit.class, id); + if (entity == null) { + throw new WebApplicationException("Fruit with id of " + id + " does not exist.", 404); + } + entityManager.remove(entity); + return Response.status(204).build(); + } + + @Provider + public static class ErrorMapper implements ExceptionMapper { + + @Inject + ObjectMapper objectMapper; + + @Override + public Response toResponse(Exception exception) { + LOGGER.error("Failed to handle request", exception); + + int code = 500; + if (exception instanceof WebApplicationException) { + code = ((WebApplicationException) exception).getResponse().getStatus(); + } + + ObjectNode exceptionJson = objectMapper.createObjectNode(); + exceptionJson.put("exceptionType", exception.getClass().getName()); + exceptionJson.put("code", code); + + if (exception.getMessage() != null) { + exceptionJson.put("error", exception.getMessage()); + } + + return Response.status(code) + .entity(exceptionJson) + .build(); + } + + } +} diff --git a/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/TSRTest.java b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/TSRTest.java new file mode 100644 index 0000000000000..eb5a2d7b078cc --- /dev/null +++ b/extensions/hibernate-orm/deployment/src/test/java/io/quarkus/narayana/quarkus/TSRTest.java @@ -0,0 +1,213 @@ +package io.quarkus.narayana.quarkus; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Event; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.event.TransactionPhase; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.transaction.HeuristicMixedException; +import jakarta.transaction.HeuristicRollbackException; +import jakarta.transaction.NotSupportedException; +import jakarta.transaction.RollbackException; +import jakarta.transaction.Synchronization; +import jakarta.transaction.SystemException; +import jakarta.transaction.TransactionManager; +import jakarta.transaction.TransactionSynchronizationRegistry; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Test that interposed synchronizations are called in the correct order + * See {@code AgroalOrderedLastSynchronizationList} for the implementation + */ +public class TSRTest { + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(Fruit.class, FruitResource.class) + .addAsResource("application-tsr.properties")); + + @Inject + TransactionSynchronizationRegistry tsr; + + @Inject + TransactionManager tm; + + @Inject + Event event; + + private enum SYNCH_TYPES { + AGROAL, + HIBERNATE, + OTHER + }; + + private static final List synchronizationCallbacks = new ArrayList<>(); + + @BeforeEach + public void before() { + synchronizationCallbacks.clear(); + } + + @Test + public void test() throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, + RollbackException { + tm.begin(); + + RestAssured.given() + .when() + .body("{\"name\" : \"Pear\"}") + .contentType("application/json") + .post("/fruits") + .then() + .statusCode(201); + + // register a synchronization that registers more synchronizations during the beforeCompletion callback + tsr.registerInterposedSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + synchronizationCallbacks.add(SYNCH_TYPES.OTHER.name()); + + // Add another synchronization belonging to the same "category". + // This registration should succeed since it belongs to the same group that's currently being processed. + // But note that adding one for a group that has already finished should fail (but we cannot test that + // here since the other groups belong to different packages, ie hibernate and agroal). + tsr.registerInterposedSynchronization(new NormalSynchronization()); + } + + @Override + public void afterCompletion(int status) { + } + }); + + // cause ARC to register a callback for transaction lifecycle events (see ObservingBean), but since ARC + // uses a session synchronization this should *not* result in an interposed synchronization being registered + event.fire("commit"); + + tm.commit(); + + /* + * Check that the two normal synchronizations added by this test were invoked. + * The actual list is maintained by {@code AgroalOrderedLastSynchronizationList} + * and it will also include interposed synchronizations added by hibernate + * and Agroal as a result of calling the above hibernate query. + * If you want to verify that the order is correct then run the test under + * the control of a debugger and look at the order of the list maintained + * by the AgroalOrderedLastSynchronizationList class. + */ + Assertions.assertEquals(2, synchronizationCallbacks.size()); + Assertions.assertEquals(SYNCH_TYPES.OTHER.name(), synchronizationCallbacks.get(0)); + Assertions.assertEquals(SYNCH_TYPES.OTHER.name(), synchronizationCallbacks.get(1)); + } + + @Test + public void testException() + throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, + RollbackException { + final String MESSAGE = "testException from synchronization"; + final NormalSynchronization normalSynchronization = new NormalSynchronization(); + + tm.begin(); + + RestAssured.given() + .when() + .body("{\"name\" : \"Orange\"}") // use a different fruit from the other tests in this suite + .contentType("application/json") + .post("/fruits") + .then() + .statusCode(201); + + // register a synchronization that registers more synchronizations during the beforeCompletion callback + tsr.registerInterposedSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + synchronizationCallbacks.add(SYNCH_TYPES.OTHER.name()); + + // Add another synchronization belonging to the same "category". + // This registration should succeed since it belongs to the same group that's currently being processed. + // But note that adding one for a group that has already finished should fail (but we cannot test that + // here since the other groups belong to different packages, ie hibernate and agroal). + tsr.registerInterposedSynchronization(normalSynchronization); + + // throw an exception to verify that the other beforeCompletion synchronizations still execute + throw new RuntimeException(MESSAGE); + } + + @Override + public void afterCompletion(int status) { + } + }); + + try { + tm.commit(); + + Assertions.fail("Expected commit to throw an exception"); + } catch (RollbackException | HeuristicMixedException | HeuristicRollbackException | SecurityException + | IllegalStateException | SystemException e) { + Assertions.assertNotNull(e.getCause(), "expected exception cause to be present"); + Assertions.assertTrue(e.getCause().getMessage().endsWith(MESSAGE), + "expected a different exception message"); + + // the synchronization registered a synchronization (the variable normalSynchronization) + // just before it threw the exception so now check that it was still called: + Assertions.assertTrue(normalSynchronization.wasInvoked(), + "the synchronization registered before the exception should have ran"); + + /* + * Check that the two normal synchronizations added by this test were invoked. + * The actual list is maintained by {@code AgroalOrderedLastSynchronizationList} + * and it will also include interposed synchronizations added by hibernate + * and Agroal as a result of calling the above hibernate query. + * If you want to verify that the order is correct then run the test under + * the control of a debugger and look at the order of the list maintained + * by the AgroalOrderedLastSynchronizationList class. + */ + Assertions.assertEquals(2, synchronizationCallbacks.size()); + Assertions.assertEquals(SYNCH_TYPES.OTHER.name(), synchronizationCallbacks.get(0)); + Assertions.assertEquals(SYNCH_TYPES.OTHER.name(), synchronizationCallbacks.get(1)); + } + } + + @ApplicationScoped + static class ObservingBean { + @Inject + EntityManager entityManager; + + // observing beforeCompletion is what triggered the issue about the need to order Agroal synchronizations last + public void observeBeforeCompletion(@Observes(during = TransactionPhase.BEFORE_COMPLETION) String payload) { + final var list = entityManager.createNamedQuery("Fruits.findAll", Fruit.class).getResultList(); + Assertions.assertFalse(list.isEmpty()); // a Pear should have been added + } + } + + // define another synchronization to test various things such as verifying that synchronizations can be + // registered by other synchronizations and that later synchronizations still run even though earlier ones + // may have thrown exceptions + private static class NormalSynchronization implements Synchronization { + private boolean invoked; + + @Override + public void beforeCompletion() { + synchronizationCallbacks.add(SYNCH_TYPES.OTHER.name()); + invoked = true; + } + + @Override + public void afterCompletion(int status) { + } + + public boolean wasInvoked() { + return invoked; + } + } +} diff --git a/extensions/hibernate-orm/deployment/src/test/resources/application-tsr.properties b/extensions/hibernate-orm/deployment/src/test/resources/application-tsr.properties new file mode 100644 index 0000000000000..4254ad3a251f5 --- /dev/null +++ b/extensions/hibernate-orm/deployment/src/test/resources/application-tsr.properties @@ -0,0 +1,5 @@ +quarkus.datasource.db-kind=h2 +quarkus.datasource.jdbc.url=jdbc:h2:mem:test + +quarkus.hibernate-orm.database.generation=drop-and-create +quarkus.hibernate-orm.conn.packages=io.quarkus.narayana.quarkus diff --git a/extensions/hibernate-orm/runtime/src/main/java/io/quarkus/hibernate/orm/runtime/customized/QuarkusJtaPlatform.java b/extensions/hibernate-orm/runtime/src/main/java/io/quarkus/hibernate/orm/runtime/customized/QuarkusJtaPlatform.java index f44cde9b90597..9828bcf9c2028 100644 --- a/extensions/hibernate-orm/runtime/src/main/java/io/quarkus/hibernate/orm/runtime/customized/QuarkusJtaPlatform.java +++ b/extensions/hibernate-orm/runtime/src/main/java/io/quarkus/hibernate/orm/runtime/customized/QuarkusJtaPlatform.java @@ -1,20 +1,25 @@ package io.quarkus.hibernate.orm.runtime.customized; +import static jakarta.transaction.Status.STATUS_ACTIVE; + import jakarta.transaction.Synchronization; import jakarta.transaction.SystemException; import jakarta.transaction.Transaction; import jakarta.transaction.TransactionManager; +import jakarta.transaction.TransactionSynchronizationRegistry; import jakarta.transaction.UserTransaction; -import org.hibernate.engine.transaction.internal.jta.JtaStatusHelper; import org.hibernate.engine.transaction.jta.platform.internal.TransactionManagerAccess; import org.hibernate.engine.transaction.jta.platform.spi.JtaPlatform; import org.hibernate.engine.transaction.jta.platform.spi.JtaPlatformException; +import io.quarkus.arc.Arc; + public final class QuarkusJtaPlatform implements JtaPlatform, TransactionManagerAccess { public static final QuarkusJtaPlatform INSTANCE = new QuarkusJtaPlatform(); + private volatile TransactionSynchronizationRegistry transactionSynchronizationRegistry; private volatile TransactionManager transactionManager; private volatile UserTransaction userTransaction; @@ -22,6 +27,16 @@ private QuarkusJtaPlatform() { //nothing } + public TransactionSynchronizationRegistry retrieveTransactionSynchronizationRegistry() { + TransactionSynchronizationRegistry transactionSynchronizationRegistry = this.transactionSynchronizationRegistry; + if (transactionSynchronizationRegistry == null) { + transactionSynchronizationRegistry = Arc.container().instance(TransactionSynchronizationRegistry.class).get(); + + this.transactionSynchronizationRegistry = transactionSynchronizationRegistry; + } + return transactionSynchronizationRegistry; + } + @Override public TransactionManager retrieveTransactionManager() { TransactionManager transactionManager = this.transactionManager; @@ -63,7 +78,8 @@ public void registerSynchronization(Synchronization synchronization) { @Override public boolean canRegisterSynchronization() { - return JtaStatusHelper.isActive(getTransactionManager()); + // no need to check STATUS_MARKED_ROLLBACK since synchronizations can't be registered in that state + return retrieveTransactionSynchronizationRegistry().getTransactionStatus() == STATUS_ACTIVE; } @Override diff --git a/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java index f895d26c31fe3..4f7f8c0cfdb9a 100644 --- a/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java +++ b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java @@ -3,22 +3,34 @@ import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions; import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import jakarta.enterprise.context.ContextNotActiveException; import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import jakarta.transaction.HeuristicMixedException; +import jakarta.transaction.HeuristicRollbackException; +import jakarta.transaction.NotSupportedException; import jakarta.transaction.RollbackException; import jakarta.transaction.Status; import jakarta.transaction.Synchronization; import jakarta.transaction.SystemException; +import jakarta.transaction.Transaction; import jakarta.transaction.TransactionManager; import jakarta.transaction.TransactionScoped; +import jakarta.transaction.TransactionSynchronizationRegistry; import org.eclipse.microprofile.context.ThreadContext; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -35,7 +47,10 @@ public class QuarkusTransactionTest { - private static AtomicInteger counter = new AtomicInteger(); + private static final AtomicInteger counter = new AtomicInteger(); + + @Inject + TransactionSynchronizationRegistry tsr; @Inject TransactionManager transactionManager; @@ -241,6 +256,8 @@ public void testCallJoinExisting() throws SystemException { @Test public void testConcurrentTransactionScopedBeanCreation() { + counter.set(0); + // 1. A Transaction is activated in a parent thread. QuarkusTransaction.run(() -> { ExecutorService executor = Executors.newCachedThreadPool(); @@ -264,6 +281,86 @@ public void testConcurrentTransactionScopedBeanCreation() { Assertions.assertEquals(1, counter.get()); } + @Test + public void testConcurrentTransactionScopedBeanCreationWithSynchronization() { + // test that propagating a transaction to other threads and use of Synchronizations do not interfere + counter.set(0); + + // 1. A Transaction is activated in a parent thread. + QuarkusTransaction.run(() -> { + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + Transaction txn = testBean.doWorkWithSynchronization(tsr, transactionManager); + + // 2. The parent thread starts 2 child threads, and propagates the transaction. + // 3. The child threads access a @TransactionScoped bean concurrently, + Future f1 = executor + .submit(threadContext + .contextualCallable(() -> testBean.doWorkWithSynchronization(tsr, transactionManager))); + Future f2 = executor + .submit(threadContext + .contextualCallable(() -> testBean.doWorkWithSynchronization(tsr, transactionManager))); + + Transaction t1 = f1.get(); + Transaction t2 = f2.get(); + + // the Synchronization callbacks for the parent thread and the two child threads should + // all have run with the same transaction context + Assertions.assertEquals(t1, txn); + Assertions.assertEquals(t2, txn); + } catch (Throwable e) { + throw new AssertionError("Should not have thrown", e); + } finally { + executor.shutdownNow(); + } + }); + } + + @Test + public void testConcurrentWithSynchronization() { + // test that Synchronizations registered with concurrent transactions do not interfere + Collection> callables = new ArrayList<>(); + IntStream.rangeClosed(1, 8) + .forEach(i -> callables.add(() -> { + try { + // start a txn + // then register an interposed Synchronization + // then commit the txn + // and then verify the Synchronization ran with the same transaction + TestInterposedSync t = new TestInterposedSync(tsr, transactionManager); + transactionManager.begin(); + Transaction txn = transactionManager.getTransaction(); + tsr.registerInterposedSynchronization(t); + transactionManager.commit(); + // check that the transaction seen by the Synchronization is same as the one we just started + Assertions.assertEquals(txn, t.getContext(), "Synchronization ran with the wrong context"); + } catch (NotSupportedException | SystemException | RollbackException | HeuristicMixedException + | HeuristicRollbackException | SecurityException | IllegalStateException e) { + throw new RuntimeException(e); + } + return null; + })); + + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + List> futures = executor.invokeAll(callables); + futures.forEach(f -> { + try { + // verify that the task did not throw an exception + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("Should not have thrown", e); + } + }); + } catch (InterruptedException e) { + throw new AssertionError("Should not have thrown", e); + } finally { + executor.shutdownNow(); + } + } + TestSync register() { TestSync t = new TestSync(); try { @@ -289,10 +386,51 @@ public void afterCompletion(int status) { } } + static class TestInterposedSync implements Synchronization { + private final TransactionManager tm; + private Transaction context; + int completionStatus = -1; + + public TestInterposedSync(TransactionSynchronizationRegistry tsr, TransactionManager tm) { + this.tm = tm; + } + + @Override + public void beforeCompletion() { + try { + // remember the transaction context used to run the Synchronization + context = tm.getTransaction(); + } catch (SystemException e) { + throw new RuntimeException(e); + } + } + + @Override + public void afterCompletion(int status) { + this.completionStatus = status; + } + + public Transaction getContext() { + // report the transaction context used to run the Synchronization + return context; + } + } + static class TransactionScopedTestBean { public void doWork() { } + + public Transaction doWorkWithSynchronization(TransactionSynchronizationRegistry tsr, TransactionManager tm) { + TestInterposedSync t = new TestInterposedSync(tsr, tm); + + try { + tsr.registerInterposedSynchronization(t); + return tm.getTransaction(); + } catch (Exception e) { + throw new AssertionError("Should not have thrown", e); + } + } } @Singleton diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java index b784d5b92e6df..327acd72af42c 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java @@ -16,6 +16,7 @@ import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple; import io.quarkus.arc.Unremovable; +import io.quarkus.narayana.jta.runtime.internal.tsr.TransactionSynchronizationRegistryWrapper; @Dependent public class NarayanaJtaProducers { @@ -50,7 +51,7 @@ public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() { @ApplicationScoped @Unremovable public TransactionSynchronizationRegistry transactionSynchronizationRegistry() { - return new TransactionSynchronizationRegistryImple(); + return new TransactionSynchronizationRegistryWrapper(new TransactionSynchronizationRegistryImple()); } @Produces diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/AgroalOrderedLastSynchronizationList.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/AgroalOrderedLastSynchronizationList.java new file mode 100644 index 0000000000000..51ee738db44b6 --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/AgroalOrderedLastSynchronizationList.java @@ -0,0 +1,190 @@ +package io.quarkus.narayana.jta.runtime.internal.tsr; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.transaction.Status; +import jakarta.transaction.Synchronization; +import jakarta.transaction.TransactionSynchronizationRegistry; + +import org.jboss.logging.Logger; + +public class AgroalOrderedLastSynchronizationList implements Synchronization { + private static final Logger LOGGER = Logger.getLogger(AgroalOrderedLastSynchronizationList.class); + private static final String ADD_SYNC_ERROR = "Syncs are not allowed because the group of synchronizations to which this sync belongs has already ran"; + private static final String REGISTER_SYNC_ERROR = "Syncs are not allowed to be registered when the transaction is in state "; + + // order the groups of synchronization as follows (Agroal is last since it needs to validate that + // connection wrappers are closed at the right time): + private static final String[] PKG_PREFIXES = { "", "org.hibernate", "io.agroal.narayana" }; + + private final List synchGroups = new ArrayList<>(); + private SynchronizationGroup otherSynchs; + private final TransactionSynchronizationRegistry tsr; + private volatile Throwable deferredThrowable; // remember the first beforeCompletion exception + + /* + * Keep track of whether a synchronization group has been processed. + * If a group of synchs has already been processed then do not allow further synchs to be registered in that group. + * If a group of synchs is currently being processed then allow it to be registered. + * But note that no synchronizations can be registered after the transaction has finished preparing. + */ + private enum ExecutionStatus { + PENDING, // the synchronization has not started executing + RUNNING, // the synchronization is executing + FINISHED // the synchronization has executed + } + + /* + * Synchronizations are grouped by package prefix and these groups are ordered such that the + * synchronizations in the first group execute first, then the second group is processed, etc. + * In particular, the Agroal synchronization group runs last. + * + * The beforeCompletion methods within a group are called in the order they were added, + * and the afterCompletion methods are ran in the reverse order + */ + private class SynchronizationGroup implements Synchronization { + String packagePrefix; // Synchronizations with this package prefix belong to this group + final List synchs; // the Synchronizations in the group + volatile ExecutionStatus status; // track the status to decide when it's too late to allow more registrations + + public SynchronizationGroup(String packagePrefix) { + this.packagePrefix = packagePrefix; + this.synchs = new ArrayList<>(); + this.status = ExecutionStatus.PENDING; + } + + public void add(Synchronization synchronization) { + if (status == ExecutionStatus.FINISHED) { + // this group of syncs have already ran + throw new IllegalStateException(ADD_SYNC_ERROR); + } + synchs.add(synchronization); + } + + @Override + public void beforeCompletion() { + status = ExecutionStatus.RUNNING; + + // Note that because synchronizations can register other synchronizations + // we cannot use enhanced for loops as that could cause a concurrency exception + for (int i = 0; i < synchs.size(); i++) { + Synchronization sync = synchs.get(i); + + try { + sync.beforeCompletion(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf( + "The synchronization %s associated with tx key %s failed during beforeCompletion: %s", + sync, tsr.getTransactionKey(), e.getMessage()); + } + + if (deferredThrowable == null) { + // only save the first failure + deferredThrowable = e; + } + } + } + + status = ExecutionStatus.FINISHED; + } + + @Override + public void afterCompletion(int status) { + // The list should be iterated in reverse order + for (int i = synchs.size(); i-- > 0;) { + synchs.get(i).afterCompletion(status); + } + } + + // does packageName belong to this group of synchronizations + private boolean shouldAdd(String packageName) { + return !packagePrefix.isEmpty() && packageName.startsWith(packagePrefix); + } + } + + public AgroalOrderedLastSynchronizationList( + TransactionSynchronizationRegistryWrapper transactionSynchronizationRegistryWrapper) { + + this.tsr = transactionSynchronizationRegistryWrapper; + + for (var packagePrefix : PKG_PREFIXES) { + var synchronizationGroup = new SynchronizationGroup(packagePrefix); + + synchGroups.add(synchronizationGroup); + + if (packagePrefix.isEmpty()) { + otherSynchs = synchronizationGroup; // the catch-all group + } + } + } + + /** + * Register an interposed synchronization. Note that synchronizations are not allowed if: + *

+ * + * @param synchronization The synchronization to register + * @throws IllegalStateException if the transaction is in the wrong state: + *

    + *
  1. the transaction has already prepared; + *
  2. the transaction is marked rollback only + *
  3. the group that the synchronization should belong to has already been processed + *
+ */ + public void registerInterposedSynchronization(Synchronization synchronization) { + int status = tsr.getTransactionStatus(); + + switch (status) { + case Status.STATUS_ACTIVE: + case Status.STATUS_PREPARING: + break; + default: + throw new IllegalStateException(REGISTER_SYNC_ERROR + status); + } + + // add the synchronization to the group that matches this package and, if there is no such group + // then add it to the catch-all group (otherSyncs) + String packageName = synchronization.getClass().getName(); + SynchronizationGroup synchGroup = otherSynchs; + + for (SynchronizationGroup g : synchGroups) { + if (g.shouldAdd(packageName)) { + synchGroup = g; + break; + } + } + + synchGroup.add(synchronization); + } + + /** + * Exceptions from beforeCompletion Synchronizations are not caught because such errors should cause the + * transaction to roll back. + */ + @Override + public void beforeCompletion() { + // run each group of synchs according to the order they were added to the list + for (SynchronizationGroup g : synchGroups) { + g.beforeCompletion(); + } + + if (deferredThrowable != null) { + /* + * If any Synchronization threw an exception then only report the first one. + * + * Cause the transaction to rollback. The underlying transaction manager will catch the runtime + * exception and re-throw it when it does the rollback + */ + throw new RuntimeException(deferredThrowable); + } + } + + @Override + public void afterCompletion(int status) { + // run each group of synchs according to the order they were added to the list + for (SynchronizationGroup g : synchGroups) { + g.afterCompletion(status); + } + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/TransactionSynchronizationRegistryWrapper.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/TransactionSynchronizationRegistryWrapper.java new file mode 100644 index 0000000000000..3c88840159e7d --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/internal/tsr/TransactionSynchronizationRegistryWrapper.java @@ -0,0 +1,87 @@ +package io.quarkus.narayana.jta.runtime.internal.tsr; + +import jakarta.transaction.Synchronization; +import jakarta.transaction.TransactionSynchronizationRegistry; + +import org.jboss.logging.Logger; + +import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple; + +/** + * Agroal registers an interposed synchronization which validates that connections have been released. + * Components such as hibernate release connections in an interposed synchronization. + * Therefore, we must ensure that Agroal runs last. + *

+ * + * This wrapper re-orders interposed synchronizations as follows: [other, hibernate-orm, agroal]. + *

+ * + * Synchronizations are placed into groups according to their package name and the groups are ordered which means + * that all hibernate synchronizations run before Agroal ones and all other synchs run before the hibernate ones. + *

+ * + * See {@code AgroalOrderedLastSynchronizationList} for details of the re-ordering. + */ +public class TransactionSynchronizationRegistryWrapper implements TransactionSynchronizationRegistry { + private final Object key = new Object(); + private static final Logger LOG = Logger.getLogger(TransactionSynchronizationRegistryWrapper.class); + + private final TransactionSynchronizationRegistryImple tsr; + private transient com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple delegate; + + public TransactionSynchronizationRegistryWrapper( + TransactionSynchronizationRegistryImple transactionSynchronizationRegistryImple) { + this.tsr = transactionSynchronizationRegistryImple; + } + + @Override + public void registerInterposedSynchronization(Synchronization sync) { + AgroalOrderedLastSynchronizationList agroalOrderedLastSynchronization = (AgroalOrderedLastSynchronizationList) tsr + .getResource(key); + + if (agroalOrderedLastSynchronization == null) { + synchronized (key) { + agroalOrderedLastSynchronization = (AgroalOrderedLastSynchronizationList) tsr.getResource(key); + if (agroalOrderedLastSynchronization == null) { + agroalOrderedLastSynchronization = new AgroalOrderedLastSynchronizationList(this); + + tsr.putResource(key, agroalOrderedLastSynchronization); + tsr.registerInterposedSynchronization(agroalOrderedLastSynchronization); + } + } + } + + // add the synchronization to the list that does the reordering + agroalOrderedLastSynchronization.registerInterposedSynchronization(sync); + } + + @Override + public Object getTransactionKey() { + return tsr.getTransactionKey(); + } + + @Override + public int getTransactionStatus() { + return tsr.getTransactionStatus(); + } + + @Override + public boolean getRollbackOnly() { + return tsr.getRollbackOnly(); + } + + @Override + public void setRollbackOnly() { + tsr.setRollbackOnly(); + } + + @Override + public Object getResource(Object key) { + return tsr.getResource(key); + } + + @Override + public void putResource(Object key, Object value) { + tsr.putResource(key, value); + } +}