From c08f715627e6f870838d4d6211c3f2d5bca6c8bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 5 Apr 2023 14:10:06 +0200 Subject: [PATCH] Provide programmative transaction for MongoDB reactive with Panache Fixes #32156 --- docs/src/main/asciidoc/mongodb-panache.adoc | 25 ++- .../panache/common/reactive/Panache.java | 112 ++++++++++++++ .../CommonReactivePanacheQueryImpl.java | 29 +++- .../runtime/ReactiveMongoOperations.java | 56 +++++++ .../runtime/ReactivePanacheUpdateImpl.java | 4 + .../ReactiveTransactionPerson.java | 14 ++ .../TransactionPersonResource.java | 104 +++++++++++++ ...ReactiveMongodbPanacheTransactionTest.java | 144 ++++++++++++++++++ ...tiveNativeMongodbPanacheTransactionIT.java | 7 + 9 files changed, 485 insertions(+), 10 deletions(-) create mode 100644 extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/Panache.java create mode 100644 integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveTransactionPerson.java create mode 100644 integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/TransactionPersonResource.java create mode 100644 integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveMongodbPanacheTransactionTest.java create mode 100644 integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveNativeMongodbPanacheTransactionIT.java diff --git a/docs/src/main/asciidoc/mongodb-panache.adoc b/docs/src/main/asciidoc/mongodb-panache.adoc index c9821898e0afbb..ee08c8145e56e8 100644 --- a/docs/src/main/asciidoc/mongodb-panache.adoc +++ b/docs/src/main/asciidoc/mongodb-panache.adoc @@ -771,8 +771,6 @@ To use them with MongoDB with Panache you need to annotate the method that start In MongoDB, a transaction is only possible on a replicaset, luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions. -WARNING: Transaction support inside MongoDB with Panache is still experimental. - == Custom IDs IDs are often a touchy subject. In MongoDB, they are usually auto-generated by the database with an `ObjectId` type. @@ -1006,7 +1004,28 @@ public Multi streamPersons() { TIP: `@RestStreamElementType(MediaType.APPLICATION_JSON)` tells RESTEasy Reactive to serialize the object in JSON. -WARNING: Transactions are not supported for Reactive Entities and Repositories. +=== Reactive transactions + +MongoDB offers ACID transactions since version 4.0. + +To use them with reactive entities or repositories you need to use `io.quarkus.mongodb.panache.common.reactive.Panache.withTransaction()`. + +[source,java] +---- +@POST +public Uni addPerson(ReactiveTransactionPerson person) { + return Panache.withTransaction(() -> person.persist().map(v -> { + //the ID is populated before sending it to the database + String id = person.id.toString(); + return Response.created(URI.create("/reactive-transaction/" + id)).build(); + })); +} +---- + +In MongoDB, a transaction is only possible on a replicaset, +luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions. + +WARNING: Reactive transaction support inside MongoDB with Panache is still experimental. == Mocking diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/Panache.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/Panache.java new file mode 100644 index 00000000000000..166767f094a49a --- /dev/null +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/Panache.java @@ -0,0 +1,112 @@ +package io.quarkus.mongodb.panache.common.reactive; + +import java.util.UUID; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; + +import com.mongodb.reactivestreams.client.ClientSession; + +import io.quarkus.mongodb.panache.common.runtime.BeanUtils; +import io.quarkus.mongodb.reactive.ReactiveMongoClient; +import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import mutiny.zero.flow.adapters.AdaptersToFlow; + +/** + * Utility class for reactive MongoDB with Panache. + */ +public class Panache { + private static final String ERROR_MSG = "MongoDB reactive with Panache requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such."; + + private static final UUID SESSION_KEY = UUID.randomUUID(); + + /** + * Performs the given work within the scope of a MongoDB transaction. + * The transaction will be rolled back if the work completes with an uncaught exception. + * + * @param The function's return type + * @param work The function to execute in the new transaction + * @return the result of executing the function + */ + public static Uni withTransaction(Supplier> work) { + Context context = vertxContext(); + ClientSession current = context.getLocal(SESSION_KEY); + if (current != null && current.hasActiveTransaction()) { + // reactive session exists - reuse this session + return work.get(); + } else { + // reactive session does not exist - open a new one and close it when the returned Uni completes + return Panache.startSession() + .invoke(s -> s.startTransaction()) + .invoke(s -> context.putLocal(SESSION_KEY, s)) + .chain(s -> work.get()) + .call(() -> commitTransaction()) + .onFailure().call(() -> abortTransaction()) + .eventually(() -> Panache.closeSession()); + } + } + + /** + * Allow to access the current MongoDB session. + * The session will only exist in the context of a reactive MongoDB with Panache transaction started with + * Panache.withTransaction(). + * + * @see #withTransaction(Supplier) + * @return the current ClientSession or null if none. + */ + public static ClientSession getCurrentSession() { + Context context = Vertx.currentContext(); + return context != null ? context.getLocal(SESSION_KEY) : null; + } + + private static Uni abortTransaction() { + Context context = vertxContext(); + ClientSession current = context.getLocal(SESSION_KEY); + return toUni(current.abortTransaction()); + } + + private static Uni commitTransaction() { + Context context = vertxContext(); + ClientSession current = context.getLocal(SESSION_KEY); + return toUni(current.commitTransaction()); + } + + private static Uni toUni(Publisher publisher) { + Context context = Vertx.currentContext(); + Uni uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher)); + if (context != null) { + return uni.emitOn(command -> context.runOnContext(x -> command.run())); + } + return uni; + } + + private static Uni startSession() { + ReactiveMongoClient client = BeanUtils.clientFromArc(null, ReactiveMongoClient.class, true); + return client.startSession(); + } + + private static void closeSession() { + Context context = vertxContext(); + ClientSession current = context.getLocal(SESSION_KEY); + current.close(); + } + + /** + * + * @return the current vertx duplicated context + * @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the + * {@link VertxContextSafetyToggle} + */ + private static Context vertxContext() { + Context context = Vertx.currentContext(); + if (context != null) { + VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG); + return context; + } else { + throw new IllegalStateException("No current Vertx context found"); + } + } +} diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/CommonReactivePanacheQueryImpl.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/CommonReactivePanacheQueryImpl.java index a3bce40fb0142c..09b16966bfcd2a 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/CommonReactivePanacheQueryImpl.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/CommonReactivePanacheQueryImpl.java @@ -5,6 +5,7 @@ import java.util.OptionalInt; import java.util.Set; +import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; @@ -13,6 +14,7 @@ import com.mongodb.client.model.CountOptions; import io.quarkus.mongodb.FindOptions; +import io.quarkus.mongodb.panache.common.reactive.Panache; import io.quarkus.mongodb.panache.common.runtime.MongoPropertyUtil; import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.quarkus.panache.common.Page; @@ -168,9 +170,11 @@ public Uni count() { countOptions.collation(collation); } - count = mongoQuery == null - ? collection.countDocuments() - : collection.countDocuments(mongoQuery, countOptions); + if (Panache.getCurrentSession() != null) { + count = collection.countDocuments(Panache.getCurrentSession(), getQuery(), countOptions); + } else { + count = collection.countDocuments(getQuery(), countOptions); + } } return count; } @@ -184,7 +188,8 @@ public Uni> list() { @SuppressWarnings("unchecked") public Multi stream() { FindOptions options = buildOptions(); - return mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options); + return Panache.getCurrentSession() != null ? collection.find(Panache.getCurrentSession(), getQuery(), options) + : collection.find(getQuery(), options); } public Uni firstResult() { @@ -194,14 +199,18 @@ public Uni firstResult() { public Uni> firstResultOptional() { FindOptions options = buildOptions(1); - Multi results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options); + Multi results = Panache.getCurrentSession() != null + ? collection.find(Panache.getCurrentSession(), getQuery(), options) + : collection.find(getQuery(), options); return results.collect().first().map(o -> Optional.ofNullable(o)); } @SuppressWarnings("unchecked") public Uni singleResult() { FindOptions options = buildOptions(2); - Multi results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options); + Multi results = Panache.getCurrentSession() != null + ? collection.find(Panache.getCurrentSession(), getQuery(), options) + : collection.find(getQuery(), options); return results.collect().asList().map(list -> { if (list.size() != 1) { throw new PanacheQueryException("There should be only one result"); @@ -213,7 +222,9 @@ public Uni singleResult() { public Uni> singleResultOptional() { FindOptions options = buildOptions(2); - Multi results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options); + Multi results = Panache.getCurrentSession() != null + ? collection.find(Panache.getCurrentSession(), getQuery(), options) + : collection.find(getQuery(), options); return results.collect().asList().map(list -> { if (list.size() == 2) { throw new PanacheQueryException("There should be no more than one result"); @@ -258,4 +269,8 @@ private FindOptions buildOptions(int maxResults) { } return options.limit(maxResults); } + + private Bson getQuery() { + return mongoQuery == null ? new BsonDocument() : mongoQuery; + } } diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactiveMongoOperations.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactiveMongoOperations.java index 955af2d4436792..c0fda7ed87a249 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactiveMongoOperations.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactiveMongoOperations.java @@ -33,6 +33,7 @@ import io.quarkus.mongodb.panache.common.MongoEntity; import io.quarkus.mongodb.panache.common.binder.NativeQueryBinder; import io.quarkus.mongodb.panache.common.binder.PanacheQlQueryBinder; +import io.quarkus.mongodb.panache.common.reactive.Panache; import io.quarkus.mongodb.reactive.ReactiveMongoClient; import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.quarkus.mongodb.reactive.ReactiveMongoDatabase; @@ -213,6 +214,10 @@ public Uni delete(Object entity) { BsonDocument document = getBsonDocument(collection, entity); BsonValue id = document.get(ID); BsonDocument query = new BsonDocument().append(ID, id); + + if (Panache.getCurrentSession() != null) { + return collection.deleteOne(Panache.getCurrentSession(), query).onItem().ignore().andContinueWithNull(); + } return collection.deleteOne(query).onItem().ignore().andContinueWithNull(); } @@ -250,10 +255,16 @@ public Uni nullUni() { } private Uni persist(ReactiveMongoCollection collection, Object entity) { + if (Panache.getCurrentSession() != null) { + return collection.insertOne(Panache.getCurrentSession(), entity).onItem().ignore().andContinueWithNull(); + } return collection.insertOne(entity).onItem().ignore().andContinueWithNull(); } private Uni persist(ReactiveMongoCollection collection, List entities) { + if (Panache.getCurrentSession() != null) { + return collection.insertMany(Panache.getCurrentSession(), entities).onItem().ignore().andContinueWithNull(); + } return collection.insertMany(entities).onItem().ignore().andContinueWithNull(); } @@ -264,6 +275,10 @@ private Uni update(ReactiveMongoCollection collection, Object entity) { //then we get its id field and create a new Document with only this one that will be our replace query BsonValue id = document.get(ID); BsonDocument query = new BsonDocument().append(ID, id); + + if (Panache.getCurrentSession() != null) { + return collection.replaceOne(Panache.getCurrentSession(), query, entity).onItem().ignore().andContinueWithNull(); + } return collection.replaceOne(query, entity).onItem().ignore().andContinueWithNull(); } @@ -280,10 +295,17 @@ private Uni persistOrUpdate(ReactiveMongoCollection collection, Object ent BsonValue id = document.get(ID); if (id == null) { //insert with autogenerated ID + if (Panache.getCurrentSession() != null) { + return collection.insertOne(Panache.getCurrentSession(), entity).onItem().ignore().andContinueWithNull(); + } return collection.insertOne(entity).onItem().ignore().andContinueWithNull(); } else { //insert with user provided ID or update BsonDocument query = new BsonDocument().append(ID, id); + if (Panache.getCurrentSession() != null) { + return collection.replaceOne(Panache.getCurrentSession(), query, entity, new ReplaceOptions().upsert(true)) + .onItem().ignore().andContinueWithNull(); + } return collection.replaceOne(query, entity, new ReplaceOptions().upsert(true)) .onItem().ignore().andContinueWithNull(); } @@ -309,6 +331,9 @@ private Uni persistOrUpdate(ReactiveMongoCollection collection, List findById(Class entityClass, Object id) { public Uni findByIdOptional(Class entityClass, Object id) { ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.find(Panache.getCurrentSession(), new Document(ID, id)).collect().first() + .onItem().transform(Optional::ofNullable); + } return collection.find(new Document(ID, id)).collect().first() .onItem().transform(Optional::ofNullable); } @@ -602,6 +631,9 @@ public Multi streamAll(Class entityClass, Sort sort) { public Uni count(Class entityClass) { ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.countDocuments(Panache.getCurrentSession()); + } return collection.countDocuments(); } @@ -609,6 +641,9 @@ public Uni count(Class entityClass, String query, Object... params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.countDocuments(Panache.getCurrentSession(), docQuery); + } return collection.countDocuments(docQuery); } @@ -616,6 +651,9 @@ public Uni count(Class entityClass, String query, Map p String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.countDocuments(Panache.getCurrentSession(), docQuery); + } return collection.countDocuments(docQuery); } @@ -626,17 +664,26 @@ public Uni count(Class entityClass, String query, Parameters params) { //specific Mongo query public Uni count(Class entityClass, Document query) { ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.countDocuments(Panache.getCurrentSession(), query); + } return collection.countDocuments(query); } public Uni deleteAll(Class entityClass) { ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.deleteMany(Panache.getCurrentSession(), new Document()).map(DeleteResult::getDeletedCount); + } return collection.deleteMany(new Document()).map(DeleteResult::getDeletedCount); } public Uni deleteById(Class entityClass, Object id) { ReactiveMongoCollection collection = mongoCollection(entityClass); Document query = new Document().append(ID, id); + if (Panache.getCurrentSession() != null) { + return collection.deleteOne(Panache.getCurrentSession(), query).map(results -> results.getDeletedCount() == 1); + } return collection.deleteOne(query).map(results -> results.getDeletedCount() == 1); } @@ -644,6 +691,9 @@ public Uni delete(Class entityClass, String query, Object... params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.deleteMany(Panache.getCurrentSession(), docQuery).map(DeleteResult::getDeletedCount); + } return collection.deleteMany(docQuery).map(DeleteResult::getDeletedCount); } @@ -651,6 +701,9 @@ public Uni delete(Class entityClass, String query, Map String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.deleteMany(Panache.getCurrentSession(), docQuery).map(DeleteResult::getDeletedCount); + } return collection.deleteMany(docQuery).map(DeleteResult::getDeletedCount); } @@ -661,6 +714,9 @@ public Uni delete(Class entityClass, String query, Parameters params) { //specific Mongo query public Uni delete(Class entityClass, Document query) { ReactiveMongoCollection collection = mongoCollection(entityClass); + if (Panache.getCurrentSession() != null) { + return collection.deleteMany(Panache.getCurrentSession(), query).map(DeleteResult::getDeletedCount); + } return collection.deleteMany(query).map(DeleteResult::getDeletedCount); } diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactivePanacheUpdateImpl.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactivePanacheUpdateImpl.java index 1ead8a325f277a..025f1c21f2be80 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactivePanacheUpdateImpl.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/common/reactive/runtime/ReactivePanacheUpdateImpl.java @@ -6,6 +6,7 @@ import org.bson.Document; import org.bson.conversions.Bson; +import io.quarkus.mongodb.panache.common.reactive.Panache; import io.quarkus.mongodb.panache.common.reactive.ReactivePanacheUpdate; import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.quarkus.panache.common.Parameters; @@ -56,6 +57,9 @@ public Uni all() { } private Uni executeUpdate(Bson query) { + if (Panache.getCurrentSession() != null) { + return collection.updateMany(Panache.getCurrentSession(), query, update).map(result -> result.getModifiedCount()); + } return collection.updateMany(query, update).map(result -> result.getModifiedCount()); } } diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveTransactionPerson.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveTransactionPerson.java new file mode 100644 index 00000000000000..a9948a0997edbb --- /dev/null +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveTransactionPerson.java @@ -0,0 +1,14 @@ +package io.quarkus.it.mongodb.panache.reactive.transaction; + +import org.bson.codecs.pojo.annotations.BsonId; + +import io.quarkus.mongodb.panache.common.MongoEntity; +import io.quarkus.mongodb.panache.reactive.ReactivePanacheMongoEntityBase; + +@MongoEntity(database = "transaction-person") +public class ReactiveTransactionPerson extends ReactivePanacheMongoEntityBase { + @BsonId + public Long id; + public String firstname; + public String lastname; +} diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/TransactionPersonResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/TransactionPersonResource.java new file mode 100644 index 00000000000000..ef5f61b092eb17 --- /dev/null +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/transaction/TransactionPersonResource.java @@ -0,0 +1,104 @@ +package io.quarkus.it.mongodb.panache.reactive.transaction; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import com.mongodb.client.MongoClient; + +import io.quarkus.mongodb.panache.common.reactive.Panache; +import io.quarkus.mongodb.panache.reactive.ReactivePanacheMongoEntityBase; +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; + +@Path("/reactive-transaction") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class TransactionPersonResource { + @Inject + @Named("cl2") + MongoClient mongoClient; + + void initDb(@Observes StartupEvent startupEvent) { + // in case of transaction, the collection needs to exist prior to using it + if (!mongoClient.getDatabase("transaction-person").listCollectionNames().into(new ArrayList<>()) + .contains("ReactiveTransactionPerson")) { + mongoClient.getDatabase("transaction-person").createCollection("ReactiveTransactionPerson"); + } + } + + @GET + public Uni> getPersons() { + return Panache.withTransaction(() -> ReactiveTransactionPerson.listAll()); + } + + @POST + public Uni addPerson(ReactiveTransactionPerson person) { + return Panache.withTransaction(() -> person.persist().map(v -> { + //the ID is populated before sending it to the database + String id = person.id.toString(); + return Response.created(URI.create("/reactive-transaction/" + id)).build(); + })); + } + + @POST + @Path("/exception") + public Uni addPersonTwice(ReactiveTransactionPerson person) { + return Panache.withTransaction(() -> person.persist().call(p -> { + throw new RuntimeException("You shall not pass"); + })); + } + + @PUT + public Uni updatePerson(ReactiveTransactionPerson person) { + return Panache.withTransaction(() -> person.update().map(p -> Response.accepted().build())); + } + + @DELETE + @Path("/{id}") + public Uni deletePerson(@PathParam("id") String id) { + return Panache.withTransaction(() -> ReactiveTransactionPerson.findById(Long.parseLong(id)) + .flatMap(p -> p.delete())) + .map(ignore -> Response.noContent().build()); + } + + @GET + @Path("/{id}") + public Uni getPerson(@PathParam("id") String id) { + return Panache.withTransaction(() -> ReactiveTransactionPerson.findById(Long.parseLong(id))); + } + + @GET + @Path("/count") + public Uni countAll() { + return Panache.withTransaction(() -> ReactiveTransactionPerson.count()); + } + + @DELETE + public Uni deleteAll() { + return Panache.withTransaction(() -> ReactiveTransactionPerson.deleteAll()); + } + + @POST + @Path("/rename") + public Uni rename(@QueryParam("previousName") String previousName, @QueryParam("newName") String newName) { + return Panache + .withTransaction(() -> ReactiveTransactionPerson.update("lastname", newName).where("lastname", previousName) + .map(l -> Response.ok().build())); + } +} diff --git a/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveMongodbPanacheTransactionTest.java b/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveMongodbPanacheTransactionTest.java new file mode 100644 index 00000000000000..ee1789da83dec8 --- /dev/null +++ b/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveMongodbPanacheTransactionTest.java @@ -0,0 +1,144 @@ +package io.quarkus.it.mongodb.panache.reactive.transaction; + +import static io.restassured.RestAssured.get; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import io.quarkus.it.mongodb.panache.transaction.PersonDTO; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.mongodb.MongoReplicaSetTestResource; +import io.restassured.RestAssured; +import io.restassured.common.mapper.TypeRef; +import io.restassured.config.ObjectMapperConfig; +import io.restassured.parsing.Parser; +import io.restassured.response.Response; + +@QuarkusTest +@QuarkusTestResource(MongoReplicaSetTestResource.class) +class ReactiveMongodbPanacheTransactionTest { + private static final TypeRef> LIST_OF_PERSON_TYPE_REF = new TypeRef>() { + }; + + @Test + public void testTheEndpoint() { + String endpoint = "/reactive-transaction"; + RestAssured.defaultParser = Parser.JSON; + RestAssured.config + .objectMapperConfig(new ObjectMapperConfig().jackson2ObjectMapperFactory((type, s) -> new ObjectMapper() + .registerModule(new Jdk8Module()) + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS))); + + //delete all + Response response = RestAssured + .given() + .delete(endpoint) + .andReturn(); + Assertions.assertEquals(200, response.statusCode()); + + List list = get(endpoint).as(LIST_OF_PERSON_TYPE_REF); + Assertions.assertEquals(0, list.size()); + + PersonDTO person1 = new PersonDTO(); + person1.id = 1L; + person1.firstname = "John"; + person1.lastname = "Doe"; + response = RestAssured + .given() + .header("Content-Type", "application/json") + .body(person1) + .post(endpoint) + .andReturn(); + Assertions.assertEquals(201, response.statusCode()); + + PersonDTO person2 = new PersonDTO(); + person2.id = 2L; + person2.firstname = "Jane"; + person2.lastname = "Doh!"; + response = RestAssured + .given() + .header("Content-Type", "application/json") + .body(person2) + .post(endpoint) + .andReturn(); + Assertions.assertEquals(201, response.statusCode()); + + list = get(endpoint).as(LIST_OF_PERSON_TYPE_REF); + Assertions.assertEquals(2, list.size()); + + // This will insert Charles Baudelaire then throws an exception. + // As we are in a transaction Charles Baudelaire will not be saved. + PersonDTO person3 = new PersonDTO(); + person3.id = 3L; + person3.firstname = "Charles"; + person3.lastname = "Baudelaire"; + response = RestAssured + .given() + .header("Content-Type", "application/json") + .body(person3) + .post(endpoint + "/exception") + .andReturn(); + Assertions.assertEquals(500, response.statusCode()); + + list = get(endpoint).as(LIST_OF_PERSON_TYPE_REF); + Assertions.assertEquals(2, list.size()); + + //count + Long count = get(endpoint + "/count").as(Long.class); + Assertions.assertEquals(2, count); + + //update a person + person2.lastname = "Doe"; + response = RestAssured + .given() + .header("Content-Type", "application/json") + .body(person2) + .put(endpoint) + .andReturn(); + Assertions.assertEquals(202, response.statusCode()); + + //check that the title has been updated + person2 = get(endpoint + "/" + person2.id.toString()).as(PersonDTO.class); + Assertions.assertEquals(2L, person2.id); + Assertions.assertEquals("Doe", person2.lastname); + + //rename the Doe + response = RestAssured + .given() + .queryParam("previousName", "Doe").queryParam("newName", "Dupont") + .header("Content-Type", "application/json") + .when().post(endpoint + "/rename") + .andReturn(); + Assertions.assertEquals(200, response.statusCode()); + + //delete a person + response = RestAssured + .given() + .delete(endpoint + "/" + person2.id.toString()) + .andReturn(); + Assertions.assertEquals(204, response.statusCode()); + + count = get(endpoint + "/count").as(Long.class); + Assertions.assertEquals(1, count); + + //delete all + response = RestAssured + .given() + .delete(endpoint) + .andReturn(); + Assertions.assertEquals(200, response.statusCode()); + + count = get(endpoint + "/count").as(Long.class); + Assertions.assertEquals(0, count); + } + +} diff --git a/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveNativeMongodbPanacheTransactionIT.java b/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveNativeMongodbPanacheTransactionIT.java new file mode 100644 index 00000000000000..2c8b957c23c581 --- /dev/null +++ b/integration-tests/mongodb-panache/src/test/java/io/quarkus/it/mongodb/panache/reactive/transaction/ReactiveNativeMongodbPanacheTransactionIT.java @@ -0,0 +1,7 @@ +package io.quarkus.it.mongodb.panache.reactive.transaction; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class ReactiveNativeMongodbPanacheTransactionIT extends ReactiveMongodbPanacheTransactionTest { +}