
Support chaining Kafka Transactions with Hibernate ORM transactions #40530

Open · Sgitario opened this issue May 9, 2024 · 13 comments

Labels: area/kafka, kind/bug (Something isn't working)

@Sgitario (Contributor) commented May 9, 2024

Describe the bug

I'm using a Quarkus service that requires:

  1. consume one message from a channel
  2. perform some database queries
  3. emit several messages to another channel as a result of the database queries

All the above steps need to be executed in a single transaction so that, for example, if the database operation fails, the message is nacked, and if the emission of the messages fails, the database queries are rolled back.

In the Quarkus service, we're using the SmallRye Reactive Messaging Kafka and Hibernate ORM extensions. I'm aware that there is an existing section in the Kafka guide about chaining transactions between Kafka and Hibernate Reactive: https://quarkus.io/guides/kafka#chaining-kafka-transactions-with-hibernate-reactive-transactions, but nothing about how to do the same with Hibernate ORM, which seems to me like a very important requirement.
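For reference, the documented Hibernate Reactive pattern looks roughly like the sketch below (a paraphrase of the guide, assuming Author were a Hibernate Reactive Panache entity, which it is not in this reproducer); an equivalent of this for Hibernate ORM is what is missing:

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class HibernateReactiveStyleConsumer {

    @Channel("new-book-out")
    KafkaTransactions<String> kafkaTx;

    @Incoming("new-author-in")
    Uni<Void> consume(Message<String> message) {
        // The Hibernate Reactive transaction runs inside the Kafka transaction:
        // a DB failure aborts the Kafka tx and nacks the incoming message.
        return kafkaTx.withTransactionAndAck(message, emitter ->
                Panache.withTransaction(() -> {
                    Author author = new Author();
                    author.name = message.getPayload();
                    return author.persist()
                            .invoke(() -> emitter.send(message.getPayload() + "-book1"))
                            .replaceWithVoid();
                }));
    }
}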

I've written a reproducer in https://github.com/Sgitario/quarkus-examples/tree/main/kafka-orm-transactions with several approaches I tried (see NewAuthorConsumer.java).

In the reproducer, the primary key of author is the name, so we can't have two authors with the same name. As part of the initial "import.sql", we have created an author "Jose" without books. Then, we have a resource to send a message to a topic:

@Path("/api")
public class Resource {

    @Channel("new-author-out")
    MutinyEmitter<String> newAuthor;

    @Transactional
    @POST
    @Path("/authors/{name}")
    public void submitAuthor(@RestPath String name) {
        newAuthor.sendAndAwait(name);
    }
}

This message will be consumed by the NewAuthorConsumer class, which inserts an author with the given name and sends another message to a second topic to create the books. What I expected here is that when adding another author with the same name "Jose", so that a constraint exception is thrown, the messages to the book topic are not sent.

The implementations of the NewAuthorConsumer that I tried:

  • Using the KafkaTransactions:
/**
 * Approach 3: Using KafkaTransactions
 */
@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out") KafkaTransactions<String> kafkaTx;

    @Incoming("new-author-in")
    Uni<Void> consume(Message<String> message) {
        Log.infof("Received new message in authors topic with name `%s`", message.getPayload());
        return kafkaTx.withTransactionAndAck(message, emitter -> {
            persistAuthor(message.getPayload());
            emitter.send(message.getPayload() + "-book1");
            return Uni.createFrom().voidItem();
        }).replaceWithVoid();
    }

    @Transactional
    public void persistAuthor(String name) {
        Author author = new Author();
        author.name = name;
        author.persist();
    }
}

As expected, this fails with:

Caused by: io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.

The error is expected and correct.
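
For what it's worth, the IO-thread error alone can be avoided by moving the consumer onto a worker thread, e.g. with SmallRye's @Blocking. A minimal sketch follows; note this only makes the JTA transaction legal, it does not chain it with any Kafka transaction:

import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class BlockingAuthorConsumer {

    @Incoming("new-author-in")
    @Blocking       // dispatch to a worker thread, where JTA is allowed
    @Transactional  // database transaction only, NOT chained to Kafka
    void consume(String authorName) {
        Author author = new Author();
        author.name = authorName;
        author.persist();
    }
}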

  • Using Emitter:
@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out") Emitter<String> newBook;

    @Transactional
    @Incoming("new-author-in")
    CompletionStage<Void> consume(String authorName) {
        Log.infof("Received new message in authors topic with name `%s`", authorName);
        Author author = new Author();
        author.name = authorName;
        author.persist();
        return newBook.send(authorName + "-book1");
    }
}

This time, when adding an existing author, the author fails to be inserted, but the book message is wrongly sent anyway, and hence the book is inserted into the database.
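
This happens because a plain Emitter dispatches the record as soon as send() is called, independently of the surrounding JTA transaction, which only fails later at commit time. A partial workaround (not true chaining, and best-effort only: a send can still fail after a successful commit) is to register a JTA Synchronization so the record is only sent once the database transaction has committed. A minimal sketch, assuming the reproducer's entities and channel names:

import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Status;
import jakarta.transaction.Synchronization;
import jakarta.transaction.TransactionSynchronizationRegistry;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class AfterCommitAuthorConsumer {

    @Channel("new-book-out")
    Emitter<String> newBook;

    @Inject
    TransactionSynchronizationRegistry txRegistry;

    @Incoming("new-author-in")
    @Blocking      // worker thread, so the JTA transaction is allowed
    @Transactional
    void consume(String authorName) {
        Author author = new Author();
        author.name = authorName;
        author.persist();
        // Defer the Kafka send until the outcome of the JTA transaction is
        // known: on rollback (e.g. the constraint violation) nothing is sent.
        txRegistry.registerInterposedSynchronization(new Synchronization() {
            @Override
            public void beforeCompletion() {
                // nothing to do before commit
            }

            @Override
            public void afterCompletion(int status) {
                if (status == Status.STATUS_COMMITTED) {
                    newBook.send(authorName + "-book1");
                }
            }
        });
    }
}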

Expected behavior

Despite the reactive nature of the SmallRye Messaging extension, there should be a way to chain transactions between Kafka and Hibernate ORM.
If there is a way to do this, it needs to be documented.
If not, I feel this is a very strong limitation, yet I'm not sure whether this should be a bug or a feature request.

I also raised the same question in smallrye/smallrye-reactive-messaging#2609.

Actual behavior

The Kafka and the Hibernate ORM transactions are not chained.

How to Reproduce?

1. git clone https://github.com/Sgitario/quarkus-examples
2. cd quarkus-examples/kafka-orm-transactions
3. comment/uncomment/add the approach you would like to use in NewAuthorConsumer
4. start dev mode: mvn quarkus:dev
5. add the existing author: curl -X POST http://localhost:8080/api/authors/Jose
6. you should see the primary key exception in the service logs:

Caused by: org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException: Unique index or primary key violation: "PUBLIC.PRIMARY_KEY_7 ON PUBLIC.AUTHOR(NAME) VALUES ( /* 1 */ 'Jose' )"; SQL statement:
insert into Author (name) values (?) [23505-224]

7. check the books: curl http://localhost:8080/api/books

Because of the previous exception, the message should not have been sent and there should not be any books. However, the message was wrongly sent and there are books:

[{"id":1,"title":"Jose-book1"}]

Quarkus version or git rev

999-SNAPSHOT

Build tool (ie. output of mvnw --version or gradlew --version)

mvn

Sgitario added the kind/bug label on May 9, 2024

quarkus-bot (bot) commented May 9, 2024

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

@ozangunalp (Contributor) commented:

To give more context: smallrye/smallrye-reactive-messaging#2609 (reply in thread)
I haven't checked the reproducer, but it is most likely a limitation in SmallRye Reactive Messaging upstream.

@Sgitario (Contributor, Author) commented:

> To give more context: smallrye/smallrye-reactive-messaging#2609 (reply in thread) I haven't checked the reproducer, but it is most likely a limitation in SmallRye Reactive Messaging upstream.

If it can't be fixed/supported, I think this should be noted in the documentation as an incompatibility between these two extensions. I would say this is a major limitation that most users tend to overlook or expect to just work.

@donreeal commented Aug 6, 2024

Hey @Sgitario,

I am currently experimenting with a very similar use case. I want to try to get away with an outbox-pattern-light approach as described here: https://spring.io/blog/2023/10/24/a-use-case-for-transactions-adapting-to-transactional-outbox-pattern.
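(For contrast, the "full" transactional outbox that this approach tries to avoid persists the outgoing event in the same database transaction and lets a separate relay publish it to Kafka. A minimal sketch, with OutboxEvent as a hypothetical Panache entity:)

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class AuthorOutboxWriter {

    @Transactional
    public void persistAuthorWithOutbox(String name) {
        Author author = new Author();
        author.name = name;
        author.persist();

        // Stored atomically with the author; a separate relay/poller
        // (not shown) reads this table, publishes to Kafka, and marks
        // rows as sent once the broker acknowledges them.
        OutboxEvent event = new OutboxEvent(); // hypothetical entity
        event.topic = "new-book-out";
        event.payload = name + "-book1";
        event.persist();
    }
}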

Here's what I've come up with so far based on your example (Approach 3) and this hint: https://smallrye.io/smallrye-mutiny/latest/guides/imperative-to-reactive/#running-blocking-code-on-subscription. Note that in my personal working example I am using plain JDBC instead of Hibernate, so I am not sure whether this approach works with Hibernate.

I would love to hear your opinion on this though :). I am especially unsure about the internalProcessing.flatMap(...) call as I have no experience with using Mutiny so far.

@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out")
    KafkaTransactions<String> kafkaTx;

    @Incoming("new-author-in")
    Uni<Void> consume(Message<String> message) {
        Log.infof("Received new message in authors topic with name `{}`", message.getPayload());

        // what we do in our application wrapped in a boolean 
        // that indicates if reply message should be sent
        Uni<Boolean> internalProcessing = Uni.createFrom()
                // wrapped up in a boolean indicating successful creation;
                // use a supplier so the blocking call runs lazily on
                // subscription (on the worker pool below), not eagerly
                // on the calling Kafka IO thread
                .item(() -> processNewAuthor(message.getPayload()))
                // when author is already processed
                // assume reply message has already been sent; therefore ...
                .onFailure(this::isAlreadyProcessed).recoverWithItem(false)
                // this is the important part which will delegate to a worker thread
                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

        return kafkaTx.withTransactionAndAck(message, txEmitter ->
                internalProcessing.flatMap(authorAdded -> {
                    if (authorAdded) {
                        txEmitter.send(message.getPayload() + "-book1");
                    }
                    return Uni.createFrom().voidItem();
                }));
    }


    public boolean processNewAuthor(String authorName) {
        persistAuthor(authorName);
        return true;
    }

    @Transactional
    public void persistAuthor(String name) {
        Author author = new Author();
        author.name = name;
        author.persist();
    }


    private boolean isAlreadyProcessed(Throwable t) {
        return t instanceof IntegrityConstraintViolationException
                && t.getCause() != null
                && t.getCause().getMessage().contains("author_name_key");
    }

}

@Sgitario (Contributor, Author) commented Aug 6, 2024

> (quoting @donreeal's comment above in full)

I haven't played a lot with this pattern, but I think you need up to three database transactions when using it, and I wouldn't use KafkaTransactions here, but rather:

@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out")
    MutinyEmitter<String> emitter; // MutinyEmitter, so send() returns a Uni
    @Inject AuthorService authorService;

    @Incoming("new-author-in")
    Uni<Void> consume(Message<String> message) {
        // db tx 1 to create the author
        var id = authorService.handleAuthor(message.getPayload());
        // db tx 2 to query the author (this is just an example)
        var name = authorService.getAuthorName(id);
        return emitter.send(name)
            .onFailure().recoverWithUni(ex -> {
                // failed to send:
                // db tx 3 to delete the author (compensation)
                authorService.deleteAuthor(id);
                return Uni.createFrom().completionStage(message.nack(ex));
            })
            .onItem().transformToUni(unused -> Uni.createFrom().completionStage(message.ack()));
    }
}

Note that I haven't tested this code, and also make sure that the commit strategy is manual for this consumer.
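
(For illustration, a hypothetical application.properties for that consumer, assuming the reproducer's channel and topic names; with commit-strategy=latest the connector commits only the offsets of messages that were explicitly acknowledged:)

mp.messaging.incoming.new-author-in.connector=smallrye-kafka
mp.messaging.incoming.new-author-in.topic=new-author
mp.messaging.incoming.new-author-in.commit-strategy=latest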

@cescoffier (Member) commented:

@ozangunalp was this related to the work you did on the emitter recently?

Note that database and Kafka transactions have very different semantics. Thus, I would not use them together, as it is very likely not going to do what you think (especially after reading the KIP about Kafka transactions (server-side defense)).

@ozangunalp (Contributor) commented:

@cescoffier in part, yes. The other part is to be able to run blocking code inside the Kafka transaction.

> Note that database and Kafka transactions have very different semantics. Thus, I would not use them together, as it is very likely not going to do what you think

I think it is always useful to repeat that. KIP-890 will bring some stability improvements to Kafka transactions, but it is not there yet.

@cescoffier (Member) commented:

From my understanding, it's now possible to chain them correctly. We should document this somewhere.

@Sgitario (Contributor, Author) commented:

@ozangunalp I tried the example from the guide and it's still not working: when causing a database error (for example, a database constraint violation), it gets blocked indefinitely.

If I modify the example with something like:

@Transactional
public Uni<Void> post(Fruit fruit) {
    return emitter.withTransaction(e -> {
        // if the id is assigned by the database, we would need to flush to get it
        // fruit.persistAndFlush();
        fruit.persist(); <1>
        Log.infov("Persisted fruit {0}", fruit);
        e.send(fruit); <2>
        return Uni.createFrom().voidItem();
    });
}

Now it's no longer blocked indefinitely, but the message at <2> is sent even though the database persist at <1> failed.

You can try the same reproducer I added in the issue description; the steps and the wrong output (the book message is sent and the book is created) are identical to those above.

Quarkus version or git rev

3.18.2

I'm reopening the issue since this is still not working.

Sgitario reopened this on Feb 12, 2025
@ozangunalp (Contributor) commented:

@Sgitario thanks for checking this!

Do you consume book records with isolation.level=read_committed?

For the blocking, I'll check.

@Sgitario (Contributor, Author) commented:

> @Sgitario thanks for checking this!
>
> Do you consume book records with isolation.level=read_committed?

I was not, but after adding the property mp.messaging.incoming.new-book-in.isolation.level=read_committed nothing changed; I still see the book message being sent even when the author record failed to be persisted :/

@ozangunalp (Contributor) commented:

That is because the constraint violation is thrown when the Hibernate tx is committed, and by that time the Kafka tx has already been committed.

You can call persistAndFlush instead of persist to check the constraint eagerly.
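
Applied to the snippet above, that would look roughly like this (a sketch under the same assumptions as that snippet, where emitter is the KafkaTransactions channel):

@Transactional
public Uni<Void> post(Fruit fruit) {
    return emitter.withTransaction(e -> {
        // flush eagerly so a constraint violation surfaces here, while
        // both the Kafka and the database transactions are still open
        fruit.persistAndFlush();
        e.send(fruit);
        return Uni.createFrom().voidItem();
    });
}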

How to chain both transactions depends very much on the use case. While some compensating action is possible on a database, for Kafka it is more complicated to compensate for a committed tx.
