Skip to content

Commit

Permalink
Provide programmative transaction for MongoDB reactive with Panache
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed May 24, 2023
1 parent 071a89c commit 86ac859
Show file tree
Hide file tree
Showing 8 changed files with 463 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package io.quarkus.mongodb.panache.common.reactive;

import java.util.UUID;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

import com.mongodb.reactivestreams.client.ClientSession;

import io.quarkus.mongodb.panache.common.runtime.BeanUtils;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import mutiny.zero.flow.adapters.AdaptersToFlow;

/**
* Utility class for reactive MongoDB with Panache.
*/
public class Panache {
private static final String ERROR_MSG = "MongoDB reactive with Panache requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such.";

private static final UUID SESSION_KEY = UUID.randomUUID();

/**
* Performs the given work within the scope of a MongoDB transaction.
* The transaction will be rolled back if the work completes with an uncaught exception.
*
* @param <T> The function's return type
* @param work The function to execute in the new transaction
* @return the result of executing the function
*/
public static <T> Uni<T> withTransaction(Supplier<Uni<T>> work) {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
if (current != null && current.hasActiveTransaction()) {
// reactive session exists - reuse this session
return work.get();
} else {
// reactive session does not exist - open a new one and close it when the returned Uni completes
return Panache.startSession()
.invoke(s -> s.startTransaction())
.invoke(s -> context.putLocal(SESSION_KEY, s))
.chain(s -> work.get())
.call(() -> commitTransaction())
.onFailure().call(() -> abortTransaction())
.eventually(() -> Panache.closeSession());
}
}

/**
* Allow to access the current MongoDB session.
* The session will only exist in the context of a reactive MongoDB with Panache transaction started with
* <code>Panache.withTransaction()</code>.
*
* @see #withTransaction(Supplier)
* @return the current ClientSession or null if none.
*/
public static ClientSession getCurrentSession() {
Context context = vertxContext();
return context.getLocal(SESSION_KEY);
}

private static Uni<?> abortTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.abortTransaction());
}

private static Uni<?> commitTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.commitTransaction());
}

private static <T> Uni<T> toUni(Publisher<T> publisher) {
Context context = Vertx.currentContext();
Uni<T> uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher));
if (context != null) {
return uni.emitOn(command -> context.runOnContext(x -> command.run()));
}
return uni;
}

private static Uni<ClientSession> startSession() {
ReactiveMongoClient client = BeanUtils.clientFromArc(null, ReactiveMongoClient.class, true);
return client.startSession();
}

private static void closeSession() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
current.close();
}

/**
*
* @return the current vertx duplicated context
* @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the
* {@link VertxContextSafetyToggle}
*/
private static Context vertxContext() {
Context context = Vertx.currentContext();
if (context != null) {
VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG);
return context;
} else {
throw new IllegalStateException("No current Vertx context found");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.OptionalInt;
import java.util.Set;

import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

Expand All @@ -13,6 +14,7 @@
import com.mongodb.client.model.CountOptions;

import io.quarkus.mongodb.FindOptions;
import io.quarkus.mongodb.panache.common.reactive.Panache;
import io.quarkus.mongodb.panache.common.runtime.MongoPropertyUtil;
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.quarkus.panache.common.Page;
Expand Down Expand Up @@ -168,9 +170,11 @@ public Uni<Long> count() {
countOptions.collation(collation);
}

count = mongoQuery == null
? collection.countDocuments()
: collection.countDocuments(mongoQuery, countOptions);
if (Panache.getCurrentSession() != null) {
count = collection.countDocuments(Panache.getCurrentSession(), getQuery(), countOptions);
} else {
count = collection.countDocuments(getQuery(), countOptions);
}
}
return count;
}
Expand All @@ -184,7 +188,8 @@ public <T extends Entity> Uni<List<T>> list() {
@SuppressWarnings("unchecked")
public <T extends Entity> Multi<T> stream() {
FindOptions options = buildOptions();
return mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
return Panache.getCurrentSession() != null ? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
}

public <T extends Entity> Uni<T> firstResult() {
Expand All @@ -194,14 +199,18 @@ public <T extends Entity> Uni<T> firstResult() {

public <T extends Entity> Uni<Optional<T>> firstResultOptional() {
FindOptions options = buildOptions(1);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().first().map(o -> Optional.ofNullable(o));
}

@SuppressWarnings("unchecked")
public <T extends Entity> Uni<T> singleResult() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() != 1) {
throw new PanacheQueryException("There should be only one result");
Expand All @@ -213,7 +222,9 @@ public <T extends Entity> Uni<T> singleResult() {

public <T extends Entity> Uni<Optional<T>> singleResultOptional() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() == 2) {
throw new PanacheQueryException("There should be no more than one result");
Expand Down Expand Up @@ -258,4 +269,8 @@ private FindOptions buildOptions(int maxResults) {
}
return options.limit(maxResults);
}

private Bson getQuery() {
return mongoQuery == null ? new BsonDocument() : mongoQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.quarkus.mongodb.panache.common.MongoEntity;
import io.quarkus.mongodb.panache.common.binder.NativeQueryBinder;
import io.quarkus.mongodb.panache.common.binder.PanacheQlQueryBinder;
import io.quarkus.mongodb.panache.common.reactive.Panache;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.quarkus.mongodb.reactive.ReactiveMongoDatabase;
Expand Down Expand Up @@ -213,6 +214,10 @@ public Uni<Void> delete(Object entity) {
BsonDocument document = getBsonDocument(collection, entity);
BsonValue id = document.get(ID);
BsonDocument query = new BsonDocument().append(ID, id);

if (Panache.getCurrentSession() != null) {
return collection.deleteOne(Panache.getCurrentSession(), query).onItem().ignore().andContinueWithNull();
}
return collection.deleteOne(query).onItem().ignore().andContinueWithNull();
}

Expand Down Expand Up @@ -250,10 +255,16 @@ public Uni<Void> nullUni() {
}

private Uni<Void> persist(ReactiveMongoCollection collection, Object entity) {
if (Panache.getCurrentSession() != null) {
return collection.insertOne(Panache.getCurrentSession(), entity).onItem().ignore().andContinueWithNull();
}
return collection.insertOne(entity).onItem().ignore().andContinueWithNull();
}

private Uni<Void> persist(ReactiveMongoCollection collection, List<Object> entities) {
if (Panache.getCurrentSession() != null) {
return collection.insertMany(Panache.getCurrentSession(), entities).onItem().ignore().andContinueWithNull();
}
return collection.insertMany(entities).onItem().ignore().andContinueWithNull();
}

Expand All @@ -264,6 +275,10 @@ private Uni<Void> update(ReactiveMongoCollection collection, Object entity) {
//then we get its id field and create a new Document with only this one that will be our replace query
BsonValue id = document.get(ID);
BsonDocument query = new BsonDocument().append(ID, id);

if (Panache.getCurrentSession() != null) {
return collection.replaceOne(Panache.getCurrentSession(), query, entity).onItem().ignore().andContinueWithNull();
}
return collection.replaceOne(query, entity).onItem().ignore().andContinueWithNull();
}

Expand All @@ -280,10 +295,17 @@ private Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, Object ent
BsonValue id = document.get(ID);
if (id == null) {
//insert with autogenerated ID
if (Panache.getCurrentSession() != null) {
return collection.insertOne(Panache.getCurrentSession(), entity).onItem().ignore().andContinueWithNull();
}
return collection.insertOne(entity).onItem().ignore().andContinueWithNull();
} else {
//insert with user provided ID or update
BsonDocument query = new BsonDocument().append(ID, id);
if (Panache.getCurrentSession() != null) {
return collection.replaceOne(Panache.getCurrentSession(), query, entity, new ReplaceOptions().upsert(true))
.onItem().ignore().andContinueWithNull();
}
return collection.replaceOne(query, entity, new ReplaceOptions().upsert(true))
.onItem().ignore().andContinueWithNull();
}
Expand All @@ -309,6 +331,9 @@ private Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, List<Objec
}
}

if (Panache.getCurrentSession() != null) {
return collection.bulkWrite(Panache.getCurrentSession(), bulk).onItem().ignore().andContinueWithNull();
}
return collection.bulkWrite(bulk).onItem().ignore().andContinueWithNull();
}

Expand Down Expand Up @@ -352,6 +377,10 @@ public Uni<Object> findById(Class<?> entityClass, Object id) {

public Uni<Optional> findByIdOptional(Class<?> entityClass, Object id) {
ReactiveMongoCollection collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.find(Panache.getCurrentSession(), new Document(ID, id)).collect().first()
.onItem().transform(Optional::ofNullable);
}
return collection.find(new Document(ID, id)).collect().first()
.onItem().transform(Optional::ofNullable);
}
Expand Down Expand Up @@ -602,20 +631,29 @@ public Multi<?> streamAll(Class<?> entityClass, Sort sort) {

public Uni<Long> count(Class<?> entityClass) {
ReactiveMongoCollection collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.countDocuments(Panache.getCurrentSession());
}
return collection.countDocuments();
}

public Uni<Long> count(Class<?> entityClass, String query, Object... params) {
String bindQuery = bindFilter(entityClass, query, params);
BsonDocument docQuery = BsonDocument.parse(bindQuery);
ReactiveMongoCollection collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.countDocuments(Panache.getCurrentSession(), docQuery);
}
return collection.countDocuments(docQuery);
}

public Uni<Long> count(Class<?> entityClass, String query, Map<String, Object> params) {
String bindQuery = bindFilter(entityClass, query, params);
BsonDocument docQuery = BsonDocument.parse(bindQuery);
ReactiveMongoCollection collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.countDocuments(Panache.getCurrentSession(), docQuery);
}
return collection.countDocuments(docQuery);
}

Expand All @@ -626,31 +664,46 @@ public Uni<Long> count(Class<?> entityClass, String query, Parameters params) {
//specific Mongo query
public Uni<Long> count(Class<?> entityClass, Document query) {
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.countDocuments(Panache.getCurrentSession(), query);
}
return collection.countDocuments(query);
}

public Uni<Long> deleteAll(Class<?> entityClass) {
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.deleteMany(Panache.getCurrentSession(), new Document()).map(DeleteResult::getDeletedCount);
}
return collection.deleteMany(new Document()).map(DeleteResult::getDeletedCount);
}

public Uni<Boolean> deleteById(Class<?> entityClass, Object id) {
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
Document query = new Document().append(ID, id);
if (Panache.getCurrentSession() != null) {
return collection.deleteOne(Panache.getCurrentSession(), query).map(results -> results.getDeletedCount() == 1);
}
return collection.deleteOne(query).map(results -> results.getDeletedCount() == 1);
}

public Uni<Long> delete(Class<?> entityClass, String query, Object... params) {
String bindQuery = bindFilter(entityClass, query, params);
BsonDocument docQuery = BsonDocument.parse(bindQuery);
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.deleteMany(Panache.getCurrentSession(), docQuery).map(DeleteResult::getDeletedCount);
}
return collection.deleteMany(docQuery).map(DeleteResult::getDeletedCount);
}

public Uni<Long> delete(Class<?> entityClass, String query, Map<String, Object> params) {
String bindQuery = bindFilter(entityClass, query, params);
BsonDocument docQuery = BsonDocument.parse(bindQuery);
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.deleteMany(Panache.getCurrentSession(), docQuery).map(DeleteResult::getDeletedCount);
}
return collection.deleteMany(docQuery).map(DeleteResult::getDeletedCount);
}

Expand All @@ -661,6 +714,9 @@ public Uni<Long> delete(Class<?> entityClass, String query, Parameters params) {
//specific Mongo query
public Uni<Long> delete(Class<?> entityClass, Document query) {
ReactiveMongoCollection<?> collection = mongoCollection(entityClass);
if (Panache.getCurrentSession() != null) {
return collection.deleteMany(Panache.getCurrentSession(), query).map(DeleteResult::getDeletedCount);
}
return collection.deleteMany(query).map(DeleteResult::getDeletedCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.bson.Document;
import org.bson.conversions.Bson;

import io.quarkus.mongodb.panache.common.reactive.Panache;
import io.quarkus.mongodb.panache.common.reactive.ReactivePanacheUpdate;
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.quarkus.panache.common.Parameters;
Expand Down Expand Up @@ -56,6 +57,9 @@ public Uni<Long> all() {
}

private Uni<Long> executeUpdate(Bson query) {
if (Panache.getCurrentSession() != null) {
return collection.updateMany(Panache.getCurrentSession(), query, update).map(result -> result.getModifiedCount());
}
return collection.updateMany(query, update).map(result -> result.getModifiedCount());
}
}
Loading

0 comments on commit 86ac859

Please sign in to comment.