From fef58f1d49314da68b26984f4d79bee47f0857a2 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Mon, 10 Feb 2025 15:01:54 +0300 Subject: [PATCH 01/12] Add concurrent test for TransactionExpirationRegistry --- ...ransactionExpirationRegistryBenchmark.java | 68 ++-- ...ctionExpirationRegistryConcurrentTest.java | 296 ++++++++++++++++++ 2 files changed, 342 insertions(+), 22 deletions(-) create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java diff --git a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java index 3c77e812f8b..5fc803bbc7c 100644 --- a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java +++ b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.tx.impl; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -33,54 +36,75 @@ import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Timeout; import org.openjdk.jmh.annotations.Warmup; /** Benchmark for TransactionExpirationRegistry. */ @State(Scope.Benchmark) -@OutputTimeUnit(MILLISECONDS) +@OutputTimeUnit(SECONDS) @Timeout(time = 200, timeUnit = MILLISECONDS) @Warmup(iterations = 2, time = 5, timeUnit = MILLISECONDS) @Measurement(time = 5, timeUnit = MILLISECONDS, iterations = 5) public class TransactionExpirationRegistryBenchmark { - private static final int ITERATIONS_COUNT = 10_000; + private static final int ITERATIONS_COUNT = 100_000; - /** Register transactions in the cycle. */ - @Benchmark - public static void register() { - TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); + private static final List transactions = new ArrayList<>(ITERATIONS_COUNT); + + @Setup + public void setup() { for (int i = 0; i < ITERATIONS_COUNT; i++) { - registry.register(new FakeInternalTransaction(i), i); + transactions.add(new FakeInternalTransaction(i)); } } - /** Register and unregister transactions in the cycle. */ + /** Register transactions in the cycle. */ @Benchmark - public static void registerUnregister() { + public static void register() { TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); for (int i = 0; i < ITERATIONS_COUNT; i++) { - registry.register(new FakeInternalTransaction(i), i); - } - - for (int i = 0; i < ITERATIONS_COUNT; i++) { - registry.unregister(new FakeInternalTransaction(i)); + registry.register(transactions.get(i), i); } } - /** Register and expire transactions in the cycle. */ @Benchmark - public static void registerExpire() { + public static void register10() { TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); - for (int i = 0; i < ITERATIONS_COUNT; i++) { - registry.register(new FakeInternalTransaction(i), i); - } - - for (int i = ITERATIONS_COUNT; i > 0; i--) { - registry.expireUpTo(i); + int iterCnt = ITERATIONS_COUNT / 10; + for (int i = 0; i < iterCnt ; i++) { + for (int j = 0; j < 10; j++) { + registry.register(transactions.get(i * 10 + j), i); + } } } + /** Register and unregister transactions in the cycle. */ +// @Benchmark +// public static void registerUnregister() { +// TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); +// for (int i = 0; i < ITERATIONS_COUNT; i++) { +// registry.register(transactions.get(i), i); +// } +// +// for (int i = 0; i < ITERATIONS_COUNT; i++) { +// registry.unregister(transactions.get(i)); +// } +// } + + /** Register and expire transactions in the cycle. */ +// @Benchmark +// public static void registerExpire() { +// TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); +// for (int i = 0; i < ITERATIONS_COUNT; i++) { +// registry.register(transactions.get(i), i); +// } +// +// for (int i = ITERATIONS_COUNT; i > 0; i--) { +// registry.expireUpTo(i); +// } +// } + private static class FakeInternalTransaction implements InternalTransaction { private final int id; diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java new file mode 100644 index 00000000000..3b930e84b50 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Test; + +/** Concurrent test for {@link TransactionExpirationRegistry}. */ +class TransactionExpirationRegistryConcurrentTest extends BaseIgniteAbstractTest { + + private final TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); + + @Test + void registerExpireUnregister() { + // Given. + var workers = List.of( + // Time: 0...100_000 + new Worker(registry, 1, 0, 100_000), + // Time: 100_000...200_000 + new Worker(registry, 2, 100_000, 100_000), + // Time: 200_000...300_000 + new Worker(registry, 3, 200_000, 100_000), + // Time: 300_000...400_000 + new Worker(registry, 4, 300_000, 100_000), + // Time: 0...400_000 + new Worker(registry, 5, 0, 400_000) + ); + + // When do concurrent work. + workers.forEach(Worker::run); + // And all workers finish. + workers.forEach(w -> w.waitFinished(10_000)); + + // Then expired transactions are rolled back. + workers.forEach(Worker::assertAllTransactionsRolledBack); + } + + /** Performs the workload in run method. */ + static class Worker { + /** The registry under test. */ + private final TransactionExpirationRegistry transactionExpirationRegistry; + /** Transactions pool. */ + private final ArrayList txPool; + + /** How many transactions are in the worker pool. */ + private final long count; + /** Timestamp offset starting from witch transactions from pool are registered in registry. */ + private final long offset; + /** Unique worker id. */ + private final int workerId; + + /** Latch that is == 0 when the workload is done. */ + private final CountDownLatch latch; + + Worker(TransactionExpirationRegistry transactionExpirationRegistry, int workerId, int offset, int count) { + this.transactionExpirationRegistry = transactionExpirationRegistry; + this.count = count; + this.offset = offset; + this.workerId = workerId; + this.txPool = new ArrayList<>(count); + this.latch = new CountDownLatch(1); + + generateTxns(); + } + + /** + * Run the main workload. + *

+ * Logic: + *

    + *
  • register all transactions from pool with expirationTime == i + offset, where i is the index of tx in pool
  • + *
  • unregister all tx with even i
  • + *
  • expire all tx with odd i
  • + *
  • count down latch
  • + *
+ *

+ * Invariant: each tx with odd i must be eventually rolled back. + */ + void run() { + new Thread(() -> { + for (int i = 0; i < count; i++) { + transactionExpirationRegistry.register(txPool.get(i), offset + i); + } + + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + transactionExpirationRegistry.unregister(txPool.get(i)); + continue; + } + + transactionExpirationRegistry.expireUpTo(offset + i); + } + + latch.countDown(); + }).start(); + } + + private void generateTxns() { + for (int i = 0; i < count; i++) { + txPool.add(new FakeInternalTransaction(workerId * 1_000_000 + i)); + } + } + + void waitFinished(int timeoutMs) { + try { + if (this.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + throw new RuntimeException("Thread did not finish in time."); + } + + /** Asserts that all transactions with odd i are eventually rolled back. */ + void assertAllTransactionsRolledBack() { + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + continue; + } + + int spinCnt = 100; + while (spinCnt-- != 0 && txPool.get(i).getRollbackCount() != 1) { + try { + // Wait for async rollback. + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + if (spinCnt <= 0) { + throw new RuntimeException("Transaction was not rolled back."); + } + } + } + } + + /** Fake transaction for testing purposes. Only has an id (for equals/hashCode) and counts rollbackAsync invocations. */ + private static class FakeInternalTransaction implements InternalTransaction { + private final int id; + private final AtomicInteger rollbackCount = new AtomicInteger(); + + FakeInternalTransaction(int id) { + this.id = id; + } + + @Override + public UUID id() { + return UUID.fromString(id + ""); + } + + @Override + public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + return null; + } + + @Override + public TxState state() { + return null; + } + + @Override + public boolean assignCommitPartition(TablePartitionId tablePartitionId) { + return false; + } + + @Override + public TablePartitionId commitPartition() { + return null; + } + + @Override + public IgniteBiTuple enlist(TablePartitionId tablePartitionId, + IgniteBiTuple nodeAndConsistencyToken) { + return null; + } + + @Override + public @Nullable HybridTimestamp readTimestamp() { + return null; + } + + @Override + public HybridTimestamp startTimestamp() { + return null; + } + + @Override + public UUID coordinatorId() { + return null; + } + + @Override + public boolean implicit() { + return false; + } + + @Override + public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full) { + return null; + } + + @Override + public boolean isFinishingOrFinished() { + return false; + } + + @Override + public long timeout() { + return 0; + } + + @Override + public CompletableFuture kill() { + return null; + } + + @Override + public void commit() throws TransactionException { + + } + + @Override + public CompletableFuture commitAsync() { + return null; + } + + @Override + public void rollback() throws TransactionException { + // Do nothing. + } + + @Override + public CompletableFuture rollbackAsync() { + this.rollbackCount.incrementAndGet(); + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + FakeInternalTransaction that = (FakeInternalTransaction) o; + return id == that.id; + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } + + int getRollbackCount() { + return rollbackCount.get(); + } + } + +} From 2cbd8532cfd2138f6f7f1930fe1117873db4e562 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Tue, 11 Feb 2025 17:58:04 +0300 Subject: [PATCH 02/12] Add RW transactions to ExpirationRegistry --- .../client/tx/ClientTransactions.java | 5 -- ...java => ItClientTxTimeoutOneNodeTest.java} | 9 ++-- ...va => ItEmbeddedTxTimeoutOneNodeTest.java} | 9 ++-- ...eTest.java => ItTxTimeoutOneNodeTest.java} | 45 +++++++++++++--- ...ransactionExpirationRegistryBenchmark.java | 50 +++++++++--------- .../tx/impl/IgniteTransactionsImpl.java | 5 -- .../internal/tx/impl/TxManagerImpl.java | 52 +++++++++++-------- ...ctionExpirationRegistryConcurrentTest.java | 8 +-- 8 files changed, 105 insertions(+), 78 deletions(-) rename modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/{readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java => ItClientTxTimeoutOneNodeTest.java} (85%) rename modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/{readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java => ItEmbeddedTxTimeoutOneNodeTest.java} (75%) rename modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/{readonly/ItReadOnlyTxTimeoutOneNodeTest.java => ItTxTimeoutOneNodeTest.java} (57%) diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java index ece4c13b584..6b2860c1083 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java @@ -62,11 +62,6 @@ static CompletableFuture beginAsync( @Nullable String preferredNodeName, @Nullable TransactionOptions options, long observableTimestamp) { - if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) { - // TODO: IGNITE-16193 - throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions"); - } - boolean readOnly = options != null && options.readOnly(); return ch.serviceAsync( diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientTxTimeoutOneNodeTest.java similarity index 85% rename from modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java rename to modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientTxTimeoutOneNodeTest.java index b7ca1c75964..b41eeebe046 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItClientTxTimeoutOneNodeTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.tx.readonly; +package org.apache.ignite.internal.tx; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; @@ -24,12 +24,11 @@ import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.internal.client.tx.ClientLazyTransaction; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; -import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -class ItClientReadOnlyTxTimeoutOneNodeTest extends ItReadOnlyTxTimeoutOneNodeTest { +class ItClientTxTimeoutOneNodeTest extends ItTxTimeoutOneNodeTest { private IgniteClient client; @BeforeEach @@ -52,12 +51,12 @@ Ignite ignite() { } @Override - ReadOnlyTransactionImpl transactionImpl(Transaction tx) { + InternalTransaction toInternalTransaction(Transaction tx) { long txId = ClientLazyTransaction.get(tx).startedTx().id(); ClientResourceRegistry resources = unwrapIgniteImpl(cluster.aliveNode()).clientInboundMessageHandler().resources(); try { - return resources.get(txId).get(ReadOnlyTransactionImpl.class); + return resources.get(txId).get(InternalTransaction.class); } catch (IgniteInternalCheckedException e) { throw new RuntimeException(e); } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItEmbeddedTxTimeoutOneNodeTest.java similarity index 75% rename from modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java rename to modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItEmbeddedTxTimeoutOneNodeTest.java index 9d174defc88..5089b2a98d7 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItEmbeddedTxTimeoutOneNodeTest.java @@ -15,21 +15,20 @@ * limitations under the License. */ -package org.apache.ignite.internal.tx.readonly; +package org.apache.ignite.internal.tx; import org.apache.ignite.Ignite; -import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; import org.apache.ignite.internal.wrapper.Wrappers; import org.apache.ignite.tx.Transaction; -class ItEmbeddedReadOnlyTxTimeoutOneNodeTest extends ItReadOnlyTxTimeoutOneNodeTest { +class ItEmbeddedTxTimeoutOneNodeTest extends ItTxTimeoutOneNodeTest { @Override Ignite ignite() { return cluster.aliveNode(); } @Override - ReadOnlyTransactionImpl transactionImpl(Transaction tx) { - return Wrappers.unwrap(tx, ReadOnlyTransactionImpl.class); + InternalTransaction toInternalTransaction(Transaction tx) { + return Wrappers.unwrap(tx, InternalTransaction.class); } } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java similarity index 57% rename from modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java rename to modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java index c03ed59f1f2..c7a87957d27 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.tx.readonly; +package org.apache.ignite.internal.tx; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -24,15 +24,16 @@ import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; -import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; import org.apache.ignite.tx.TransactionException; import org.apache.ignite.tx.TransactionOptions; import org.junit.jupiter.api.Test; -abstract class ItReadOnlyTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { - private static final String TABLE_NAME = "TEST"; +abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE1_NAME = "TEST1"; + private static final String TABLE2_NAME = "TEST2"; + @Override protected int initialNodes() { @@ -41,15 +42,15 @@ protected int initialNodes() { abstract Ignite ignite(); - abstract ReadOnlyTransactionImpl transactionImpl(Transaction tx); + abstract InternalTransaction toInternalTransaction(Transaction tx); @Test void roTransactionTimesOut() throws Exception { Ignite ignite = ignite(); - ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + ignite.sql().executeScript("CREATE TABLE " + TABLE1_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); - Table table = ignite.tables().table(TABLE_NAME); + Table table = ignite.tables().table(TABLE1_NAME); Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); @@ -57,7 +58,7 @@ void roTransactionTimesOut() throws Exception { doGetOn(table, roTx); assertTrue( - waitForCondition(() -> transactionImpl(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + waitForCondition(() -> toInternalTransaction(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)), "Transaction should have been finished due to timeout" ); @@ -66,7 +67,35 @@ void roTransactionTimesOut() throws Exception { // assertThrows(TransactionException.class, roTx::commit); } + @Test + void readWriteTransactionTimesOut() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE2_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE2_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(5_000)); + + // Make sure the tx actually begins on the server (as thin client transactions are lazy). + doPutOn(table, rwTx); + + assertTrue( + waitForCondition(() -> toInternalTransaction(rwTx).isFinishingOrFinished(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + + } + private static void doGetOn(Table table, Transaction tx) { table.keyValueView(Integer.class, String.class).get(tx, 1); } + + private static void doPutOn(Table table, Transaction tx) { + table.keyValueView(Integer.class, String.class).put(tx, 1, "one"); + } } diff --git a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java index 5fc803bbc7c..4602dc31c2c 100644 --- a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java +++ b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java @@ -53,7 +53,7 @@ public class TransactionExpirationRegistryBenchmark { private static final List transactions = new ArrayList<>(ITERATIONS_COUNT); @Setup - public void setup() { + static void setup() { for (int i = 0; i < ITERATIONS_COUNT; i++) { transactions.add(new FakeInternalTransaction(i)); } @@ -69,10 +69,10 @@ public static void register() { } @Benchmark - public static void register10() { + static void register10() { TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); int iterCnt = ITERATIONS_COUNT / 10; - for (int i = 0; i < iterCnt ; i++) { + for (int i = 0; i < iterCnt; i++) { for (int j = 0; j < 10; j++) { registry.register(transactions.get(i * 10 + j), i); } @@ -80,30 +80,30 @@ public static void register10() { } /** Register and unregister transactions in the cycle. */ -// @Benchmark -// public static void registerUnregister() { -// TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); -// for (int i = 0; i < ITERATIONS_COUNT; i++) { -// registry.register(transactions.get(i), i); -// } -// -// for (int i = 0; i < ITERATIONS_COUNT; i++) { -// registry.unregister(transactions.get(i)); -// } -// } + @Benchmark + public static void registerUnregister() { + TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); + for (int i = 0; i < ITERATIONS_COUNT; i++) { + registry.register(transactions.get(i), i); + } + + for (int i = 0; i < ITERATIONS_COUNT; i++) { + registry.unregister(transactions.get(i)); + } + } /** Register and expire transactions in the cycle. */ -// @Benchmark -// public static void registerExpire() { -// TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); -// for (int i = 0; i < ITERATIONS_COUNT; i++) { -// registry.register(transactions.get(i), i); -// } -// -// for (int i = ITERATIONS_COUNT; i > 0; i--) { -// registry.expireUpTo(i); -// } -// } + @Benchmark + public static void registerExpire() { + TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); + for (int i = 0; i < ITERATIONS_COUNT; i++) { + registry.register(transactions.get(i), i); + } + + for (int i = ITERATIONS_COUNT; i > 0; i--) { + registry.expireUpTo(i); + } + } private static class FakeInternalTransaction implements InternalTransaction { private final int id; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java index 571a0c089f6..dbb7f88294a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java @@ -50,11 +50,6 @@ public IgniteTransactionsImpl(TxManager txManager, HybridTimestampTracker observ /** {@inheritDoc} */ @Override public Transaction begin(@Nullable TransactionOptions options) { - if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) { - // TODO: IGNITE-15936. - throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions."); - } - InternalTxOptions internalTxOptions = options == null ? InternalTxOptions.defaults() : InternalTxOptions.builder() diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 05fb1f07a09..f163e741f7f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -414,20 +414,10 @@ private InternalTransaction beginBusy( if (readOnly) { HybridTimestamp beginTimestamp = clockService.now(); - - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); - - tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options); + tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, implicit, options); } else { HybridTimestamp beginTimestamp = createBeginTimestampWithIncrementRwTxCounter(); - - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); - - // TODO: RW timeouts will be supported in https://issues.apache.org/jira/browse/IGNITE-24244 - // long timeout = options.timeoutMillis() == 0 ? txConfig.readWriteTimeout().value() : options.timeoutMillis(); - long timeout = 3_000; - - tx = new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout); + tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, implicit, options); } txStateVolatileStorage.initialize(tx); @@ -435,13 +425,38 @@ private InternalTransaction beginBusy( return tx; } + private ReadWriteTransactionImpl beginReadWriteTransaction( + HybridTimestampTracker timestampTracker, + HybridTimestamp beginTimestamp, + boolean implicit, + InternalTxOptions options) { + + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); + + long timeout = options.timeoutMillis() == 0 ? txConfig.readWriteTimeout().value() : options.timeoutMillis(); + + var transaction = new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout); + + // Implicit transactions are finished as soon as their operation/query is finished, they cannot be abandoned, so there is + // no need to register them. + // TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - schedule expiration for multi-key implicit transactions? + boolean scheduleExpiration = !implicit; + + if (scheduleExpiration) { + transactionExpirationRegistry.register(transaction, physicalExpirationTimeMillis(beginTimestamp, timeout)); + } + + return transaction; + } + private ReadOnlyTransactionImpl beginReadOnlyTransaction( HybridTimestampTracker timestampTracker, HybridTimestamp beginTimestamp, - UUID txId, boolean implicit, InternalTxOptions options ) { + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); + HybridTimestamp observableTimestamp = timestampTracker.get(); HybridTimestamp readTimestamp = observableTimestamp != null @@ -460,7 +475,7 @@ private ReadOnlyTransactionImpl beginReadOnlyTransaction( try { CompletableFuture txFuture = new CompletableFuture<>(); - long timeout = options.timeoutMillis() == 0 ? defaultReadOnlyTransactionTimeoutMillis() : options.timeoutMillis(); + long timeout = options.timeoutMillis() == 0 ? txConfig.readOnlyTimeout().value() : options.timeoutMillis(); var transaction = new ReadOnlyTransactionImpl( this, timestampTracker, txId, localNodeId, implicit, timeout, readTimestamp, txFuture @@ -472,7 +487,7 @@ private ReadOnlyTransactionImpl beginReadOnlyTransaction( boolean scheduleExpiration = !implicit; if (scheduleExpiration) { - transactionExpirationRegistry.register(transaction, roExpirationPhysicalTimeFor(beginTimestamp, options)); + transactionExpirationRegistry.register(transaction, physicalExpirationTimeMillis(beginTimestamp, timeout)); } txFuture.whenComplete((unused, throwable) -> { @@ -491,8 +506,7 @@ private ReadOnlyTransactionImpl beginReadOnlyTransaction( } } - private long roExpirationPhysicalTimeFor(HybridTimestamp beginTimestamp, InternalTxOptions options) { - long effectiveTimeoutMillis = options.timeoutMillis() == 0 ? defaultReadOnlyTransactionTimeoutMillis() : options.timeoutMillis(); + private static long physicalExpirationTimeMillis(HybridTimestamp beginTimestamp, long effectiveTimeoutMillis) { return sumWithSaturation(beginTimestamp.getPhysical(), effectiveTimeoutMillis); } @@ -510,10 +524,6 @@ private static long sumWithSaturation(long a, long b) { } } - private long defaultReadOnlyTransactionTimeoutMillis() { - return txConfig.readOnlyTimeout().value(); - } - /** * Current read timestamp, for calculation of read timestamp of read-only transactions. * diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java index 3b930e84b50..d4573f1fb9e 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java @@ -96,16 +96,16 @@ static class Worker { /** * Run the main workload. - *

- * Logic: + * + *

Logic: *

    *
  • register all transactions from pool with expirationTime == i + offset, where i is the index of tx in pool
  • *
  • unregister all tx with even i
  • *
  • expire all tx with odd i
  • *
  • count down latch
  • *
- *

- * Invariant: each tx with odd i must be eventually rolled back. + * + *

Invariant: each tx with odd i must be eventually rolled back. */ void run() { new Thread(() -> { From cf1a0c5c8e250fec0c3bf89052afb77e3f3c4dff Mon Sep 17 00:00:00 2001 From: apakhomov Date: Wed, 12 Feb 2025 16:32:04 +0300 Subject: [PATCH 03/12] Fix TxManagerTest --- .../test/java/org/apache/ignite/internal/tx/TxManagerTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 2cc40815b31..4aa7cfc1dca 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -44,6 +44,7 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.framework; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -761,6 +762,8 @@ void testCreateBeginTsInsideInUpdateRwTxCount() { }).when(localRwTxCounter).inUpdateRwTxCountLock(any()); txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); + + doReturn(null).when(localRwTxCounter).inUpdateRwTxCountLock(any()); } private InternalTransaction prepareTransaction() { From 65102da24c939a745a1168dde146e566196600d6 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Wed, 12 Feb 2025 18:30:36 +0300 Subject: [PATCH 04/12] Longer RW Transaction timeout in Sql DML test --- .../org/apache/ignite/internal/sql/engine/ItDmlTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java index 7b013218b57..0a38ca1abc9 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.sql.type.SqlTypeName; @@ -147,7 +148,6 @@ public void batchWithConflictShouldBeRejectedEntirely() { () -> sql("INSERT INTO test VALUES (0, 0), (1, 1), (2, 2)") ); - assertQuery("SELECT count(*) FROM test") .returns(1L) .check(); @@ -214,7 +214,7 @@ public void testNullDefault() { assertQuery("SELECT col FROM test_null_def WHERE id = 2").returns(null).check(); } - /**Test full MERGE command. */ + /** Test full MERGE command. */ @Test public void testMerge() { clearAndPopulateMergeTable1(); @@ -373,7 +373,9 @@ public void testMergeBatch() { assertQuery("SELECT count(*) FROM test2 WHERE b = 0").returns(10_000L).check(); - sql("MERGE INTO test2 dst USING test1 src ON dst.a = src.a" + var longerTimeoutOptions = new TransactionOptions().readOnly(false).timeoutMillis(TimeUnit.MINUTES.toMillis(2)); + var tx = igniteTx().begin(longerTimeoutOptions); + sql(tx, "MERGE INTO test2 dst USING test1 src ON dst.a = src.a" + " WHEN MATCHED THEN UPDATE SET b = 1 " + " WHEN NOT MATCHED THEN INSERT (key, a, b) VALUES (src.key, src.a, 2)"); From 1857e81202d017dc431ab3b77c6aa7632b264477 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Wed, 12 Feb 2025 19:56:17 +0300 Subject: [PATCH 05/12] Fix test --- .../java/org/apache/ignite/internal/sql/engine/ItDmlTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java index 0a38ca1abc9..ad74d6d7b9c 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java @@ -378,6 +378,7 @@ public void testMergeBatch() { sql(tx, "MERGE INTO test2 dst USING test1 src ON dst.a = src.a" + " WHEN MATCHED THEN UPDATE SET b = 1 " + " WHEN NOT MATCHED THEN INSERT (key, a, b) VALUES (src.key, src.a, 2)"); + tx.commit(); assertQuery("SELECT count(*) FROM test2 WHERE b = 0").returns(5_000L).check(); assertQuery("SELECT count(*) FROM test2 WHERE b = 1").returns(5_000L).check(); From c3b4ea6b4044e668384e7d9aa4267acc45120e71 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Fri, 14 Feb 2025 00:33:07 +0300 Subject: [PATCH 06/12] WIP: Adding timeout info into TxMeta --- .../client/tx/ClientTransactions.java | 3 +- .../distributed/raft/PartitionListener.java | 6 ++- .../replicator/PartitionReplicaListener.java | 10 ++-- .../storage/InternalTableImpl.java | 6 +-- .../ignite/internal/table/TxAbstractTest.java | 3 +- .../internal/tx/ItTxTimeoutOneNodeTest.java | 46 ++++++++++++++++--- .../ItTransactionRecoveryTest.java | 1 + .../internal/tx/InternalTransaction.java | 12 ++++- .../apache/ignite/internal/tx/TxManager.java | 6 ++- .../ignite/internal/tx/TxStateMeta.java | 21 +++++++-- .../internal/tx/TxStateMetaAbandoned.java | 2 +- .../internal/tx/TxStateMetaFinishing.java | 2 +- .../impl/PublicApiThreadingTransaction.java | 4 +- .../tx/impl/ReadOnlyTransactionImpl.java | 16 +++++-- .../tx/impl/ReadWriteTransactionImpl.java | 26 ++++++++--- .../impl/TransactionExpirationRegistry.java | 2 +- .../tx/impl/TxCleanupRequestSender.java | 3 +- .../internal/tx/impl/TxManagerImpl.java | 17 +++++-- .../tx/impl/VolatileTxStateMetaStorage.java | 6 ++- .../tx/message/TxStateMetaMessage.java | 5 +- 20 files changed, 149 insertions(+), 48 deletions(-) diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java index 6b2860c1083..10e7327c5d7 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java @@ -63,12 +63,13 @@ static CompletableFuture beginAsync( @Nullable TransactionOptions options, long observableTimestamp) { boolean readOnly = options != null && options.readOnly(); + long timeout = options == null ? 0 : options.timeoutMillis(); return ch.serviceAsync( ClientOp.TX_BEGIN, w -> { w.out().packBoolean(readOnly); - w.out().packLong(options == null ? 0 : options.timeoutMillis()); + w.out().packLong(timeout); w.out().packLong(observableTimestamp); }, r -> readTx(r, readOnly), diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 5fe51075962..91a2bddc89d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -764,7 +764,8 @@ private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp commi txCoordinatorId, old == null ? null : old.commitPartitionId(), full ? commitTimestamp : null, - old == null ? null : old.tx() + old == null ? null : old.tx(), + old == null ? null : old.isFinishedDueToTimeout() )); } @@ -776,7 +777,8 @@ private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp c commit ? commitTimestamp : null, old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() )); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 65ff30057e1..8495f35eca2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -517,7 +517,8 @@ private CompletableFuture processRequest(ReplicaRequest request, @Nullable Bo req.coordinatorId(), req.commitPartitionId().asTablePartitionId(), null, - old == null ? null : old.tx() + old == null ? null : old.tx(), + old == null ? null : old.isFinishedDueToTimeout() )); } } @@ -697,6 +698,7 @@ private CompletableFuture triggerTxRecovery(UUID txId, UUID senderId) { // Tx recovery is executed on the commit partition. replicationGroupId, false, + false, // Enlistment consistency token is not required for the rollback, so it is 0L. Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)), txId @@ -813,7 +815,8 @@ private CompletableFuture processOperationRequest( req.coordinatorId(), req.commitPartitionId().asTablePartitionId(), null, - old == null ? null : old.tx() + old == null ? null : old.tx(), + old == null ? null : old.isFinishedDueToTimeout() )); var opId = new OperationId(senderId, req.timestamp().longValue()); @@ -3949,7 +3952,8 @@ private void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp txState == COMMITTED ? commitTimestamp : null, old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() )); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 96b1020a522..73b6541b177 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -665,7 +665,7 @@ private CompletableFuture trackingInvoke( assert hasError || r instanceof TimestampAware; // Timestamp is set to commit timestamp for full transactions. - tx.finish(!hasError, hasError ? null : ((TimestampAware) r).timestamp(), true); + tx.finish(!hasError, hasError ? null : ((TimestampAware) r).timestamp(), true, false); if (e != null) { sneakyThrow(e); @@ -879,7 +879,7 @@ private CompletableFuture sendReadOnlyToPrimaryReplica( private CompletableFuture postEvaluate(CompletableFuture fut, InternalTransaction tx) { return fut.handle((BiFunction>) (r, e) -> { if (e != null) { - return tx.finish(false, clockService.current(), false) + return tx.finish(false, clockService.current(), false, false) .handle((ignored, err) -> { if (err != null) { e.addSuppressed(err); @@ -890,7 +890,7 @@ private CompletableFuture postEvaluate(CompletableFuture fut, Internal }); // Preserve failed state. } - return tx.finish(true, clockService.current(), false).thenApply(ignored -> r); + return tx.finish(true, clockService.current(), false, false).thenApply(ignored -> r); }).thenCompose(identity()); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 29921ef1bb1..1f3bff2ba50 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -2172,7 +2172,8 @@ public void testWriteIntentResolutionFallbackToCommitPartitionPath() { new UUID(1, 2), old.commitPartitionId(), old.commitTimestamp(), - old == null ? null : old.tx() + old == null ? null : old.tx(), + old == null ? null : old.isFinishedDueToTimeout() )); } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java index c7a87957d27..92bffd0aeee 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; @@ -31,8 +32,7 @@ import org.junit.jupiter.api.Test; abstract class ItTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { - private static final String TABLE1_NAME = "TEST1"; - private static final String TABLE2_NAME = "TEST2"; + private static final String TABLE_NAME = "TEST"; @Override @@ -48,9 +48,9 @@ protected int initialNodes() { void roTransactionTimesOut() throws Exception { Ignite ignite = ignite(); - ignite.sql().executeScript("CREATE TABLE " + TABLE1_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); - Table table = ignite.tables().table(TABLE1_NAME); + Table table = ignite.tables().table(TABLE_NAME); Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); @@ -71,9 +71,9 @@ void roTransactionTimesOut() throws Exception { void readWriteTransactionTimesOut() throws InterruptedException { Ignite ignite = ignite(); - ignite.sql().executeScript("CREATE TABLE " + TABLE2_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); - Table table = ignite.tables().table(TABLE2_NAME); + Table table = ignite.tables().table(TABLE_NAME); Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(5_000)); @@ -88,7 +88,41 @@ void readWriteTransactionTimesOut() throws InterruptedException { assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); // TODO: uncomment the following assert after IGNITE-24233 is fixed. // assertThrows(TransactionException.class, roTx::commit); + } + + @Test + void timeoutExceptionHasCorrectCause() throws InterruptedException { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction rwTx = ignite.transactions().begin(new TransactionOptions().readOnly(false).timeoutMillis(1_000)); + + // Wait for an exception. + assertTrue( + waitForCondition(() -> timeoutExceeded(table, rwTx), 10_000), + "Write operation should throw an exception with TX_TIMEOUT_EXCEEDED error code" + ); + + assertThrows(TransactionException.class, () -> doGetOn(table, rwTx)); + // TODO: uncomment the following assert after IGNITE-24233 is fixed. + // assertThrows(TransactionException.class, roTx::commit); + } + private static boolean timeoutExceeded(Table table, Transaction rwTx) { + try { + doPutOn(table, rwTx); + return false; + } catch (TransactionException ex) { + if (ex.getMessage().contains("timeout exceeded")) { + return true; + } else { + fail("Expected TX_TIMEOUT_EXCEEDED error code, but got: " + ex.code()); + return false; + } + } } private static void doGetOn(Table table, Transaction tx) { diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java index d2ba2a059ad..ef018df8c6e 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java @@ -743,6 +743,7 @@ public void testFinishAlreadyFinishedTx() throws Exception { HybridTimestampTracker.atomicTracker(null), ((InternalTransaction) rwTx1).commitPartition(), false, + false, Map.of(((InternalTransaction) rwTx1).commitPartition(), new IgniteBiTuple<>(txCrdNode2.node(), 0L)), rwTx1Id ); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index 6a1e561f45b..bb7f5b19169 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -112,9 +112,10 @@ public interface InternalTransaction extends Transaction { * @param executionTimestamp The timestamp is the time when a read-only transaction is applied to the remote node. The parameter * is not used for read-write transactions. * @param full Full state transaction marker. + * @param timeoutExceeded Timeout exceeded marker. * @return The future. */ - CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full); + CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded); /** * Checks if the transaction is finishing or finished. If {@code true}, no more operations can be performed on the transaction. @@ -137,4 +138,13 @@ public interface InternalTransaction extends Transaction { * @return The future. */ CompletableFuture kill(); + + /** + * Rolls back the transaction if the timeout is exceeded. + * + * @return The future. + */ + default CompletableFuture rollbackTimeoutExceededAsync() { + return rollbackAsync(); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index e4754f05dc1..72c7fdd9222 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -147,7 +147,9 @@ default InternalTransaction beginExplicitRo(HybridTimestampTracker timestampTrac * @param ts The timestamp which is associated to txn completion. * @param commit {@code True} if a commit requested. */ - void finishFull(HybridTimestampTracker timestampTracker, UUID txId, @Nullable HybridTimestamp ts, boolean commit); + void finishFull( + HybridTimestampTracker timestampTracker, UUID txId, @Nullable HybridTimestamp ts, boolean commit, boolean timeoutExceeded + ); /** * Finishes a dependant transactions. @@ -156,6 +158,7 @@ default InternalTransaction beginExplicitRo(HybridTimestampTracker timestampTrac * should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client. * @param commitPartition Partition to store a transaction state. * @param commit {@code true} if a commit requested. + * @param timeoutExceeded {@code true} if a timeout exceeded. * @param enlistedGroups Enlisted partition groups with consistency tokens. * @param txId Transaction id. */ @@ -163,6 +166,7 @@ CompletableFuture finish( HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, + boolean timeoutExceeded, Map> enlistedGroups, UUID txId ); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index 3a28aaff4f8..fffc70368d2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -50,6 +50,8 @@ public class TxStateMeta implements TransactionMeta { private final @Nullable Long cleanupCompletionTimestamp; + private final @Nullable Boolean isFinishedDueToTimeout; + /** * The ignite transaction object is associated with this state. This field can be initialized only on the transaction coordinator, * {@code null} in other nodes. @@ -71,9 +73,10 @@ public TxStateMeta( @Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId, @Nullable HybridTimestamp commitTimestamp, - @Nullable InternalTransaction tx + @Nullable InternalTransaction tx, + @Nullable Boolean isFinishedDueToTimeout ) { - this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, null); + this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, null, isFinishedDueToTimeout); } /** @@ -92,9 +95,10 @@ public TxStateMeta( @Nullable TablePartitionId commitPartitionId, @Nullable HybridTimestamp commitTimestamp, @Nullable InternalTransaction tx, - @Nullable Long initialVacuumObservationTimestamp + @Nullable Long initialVacuumObservationTimestamp, + @Nullable Boolean isFinishedDueToTimeout ) { - this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, initialVacuumObservationTimestamp, null); + this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, initialVacuumObservationTimestamp, null, isFinishedDueToTimeout); } /** @@ -115,7 +119,8 @@ public TxStateMeta( @Nullable HybridTimestamp commitTimestamp, @Nullable InternalTransaction tx, @Nullable Long initialVacuumObservationTimestamp, - @Nullable Long cleanupCompletionTimestamp + @Nullable Long cleanupCompletionTimestamp, + @Nullable Boolean isFinishedDueToTimeout ) { this.txState = txState; this.txCoordinatorId = txCoordinatorId; @@ -124,6 +129,7 @@ public TxStateMeta( this.tx = tx; this.initialVacuumObservationTimestamp = initialVacuumObservationTimestamp; this.cleanupCompletionTimestamp = cleanupCompletionTimestamp; + this.isFinishedDueToTimeout = isFinishedDueToTimeout; } /** @@ -181,6 +187,10 @@ public TxState txState() { return cleanupCompletionTimestamp; } + public @Nullable Boolean isFinishedDueToTimeout() { + return isFinishedDueToTimeout; + } + @Override public TxStateMetaMessage toTransactionMetaMessage( ReplicaMessagesFactory replicaMessagesFactory, @@ -193,6 +203,7 @@ public TxStateMetaMessage toTransactionMetaMessage( .commitTimestamp(commitTimestamp) .initialVacuumObservationTimestamp(initialVacuumObservationTimestamp) .cleanupCompletionTimestamp(cleanupCompletionTimestamp) + .isFinishedDueToTimeout(isFinishedDueToTimeout) .build(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java index 55ca4ff8962..72b5ae01606 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java @@ -47,7 +47,7 @@ public TxStateMetaAbandoned( UUID txCoordinatorId, TablePartitionId commitPartitionId ) { - super(ABANDONED, txCoordinatorId, commitPartitionId, null, null); + super(ABANDONED, txCoordinatorId, commitPartitionId, null, null, null); this.lastAbandonedMarkerTs = FastTimestamps.coarseCurrentTimeMillis(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java index ca4246b52dd..b4f71fce2a0 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java @@ -45,7 +45,7 @@ public class TxStateMetaFinishing extends TxStateMeta { * @param commitPartitionId Commit partition id. */ public TxStateMetaFinishing(@Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId) { - super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null, null); + super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null, null, null); } /** diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java index 73787429772..9f9420efc05 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java @@ -132,8 +132,8 @@ public boolean implicit() { } @Override - public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) { - return transaction.finish(commit, executionTimestamp, full); + public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded) { + return transaction.finish(commit, executionTimestamp, full, timeoutExceeded); } @Override diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 24dd0210d09..1544e7d77ed 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -110,7 +110,7 @@ public TablePartitionId commitPartition() { @Override public CompletableFuture commitAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(true, readTimestamp, false), + finish(true, readTimestamp, false, false), TX_COMMIT_ERR ); } @@ -118,13 +118,21 @@ public CompletableFuture commitAsync() { @Override public CompletableFuture rollbackAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(false, readTimestamp, false), + finish(false, readTimestamp, false, false), TX_ROLLBACK_ERR ); } @Override - public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture rollbackTimeoutExceededAsync() { + return TransactionsExceptionMapperUtil.convertToPublicFuture( + finish(false, readTimestamp, false, true), + TX_ROLLBACK_ERR + ); + } + + @Override + public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded) { assert !full : "Read-only transactions cannot be full."; if (!finishGuard.compareAndSet(false, true)) { @@ -147,6 +155,6 @@ public boolean isFinishingOrFinished() { @Override public CompletableFuture kill() { - return finish(false, readTimestamp, false); + return finish(false, readTimestamp, false, false); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 8e5cbc510e4..50c46c875d7 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -143,7 +143,7 @@ private void checkEnlistPossibility() { @Override public CompletableFuture commitAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(true, null, false), + finish(true, null, false, false), TX_COMMIT_ERR ); } @@ -151,18 +151,28 @@ public CompletableFuture commitAsync() { @Override public CompletableFuture rollbackAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(false, null, false), + finish(false, null, false, false), TX_ROLLBACK_ERR ); } @Override - public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture rollbackTimeoutExceededAsync() { + return TransactionsExceptionMapperUtil.convertToPublicFuture( + finish(false, null, false, true), + TX_ROLLBACK_ERR + ); + } + + @Override + public CompletableFuture finish( + boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded + ) { if (finishFuture != null) { return finishFuture; } - return finishInternal(commit, executionTimestamp, full, true); + return finishInternal(commit, executionTimestamp, full, true, timeoutExceeded); } /** @@ -178,7 +188,8 @@ private CompletableFuture finishInternal( boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, - boolean isComplete + boolean isComplete, + boolean timeoutExceeded ) { enlistPartitionLock.writeLock().lock(); @@ -198,7 +209,7 @@ private CompletableFuture finishInternal( } if (full) { - txManager.finishFull(observableTsTracker, id(), executionTimestamp, commit); + txManager.finishFull(observableTsTracker, id(), executionTimestamp, commit, timeoutExceeded); if (isComplete) { finishFuture = nullCompletedFuture(); @@ -210,6 +221,7 @@ private CompletableFuture finishInternal( observableTsTracker, commitPart, commit, + timeoutExceeded, enlisted, id() ); @@ -255,6 +267,6 @@ public HybridTimestamp startTimestamp() { @Override public CompletableFuture kill() { - return finishInternal(false, null, false, false); + return finishInternal(false, null, false, false, false); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java index b3c0351958d..4b7a3a8a9ed 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java @@ -94,7 +94,7 @@ private boolean isExpired(long expirationTime) { } private static void abortTransaction(InternalTransaction tx) { - tx.rollbackAsync().whenComplete((res, ex) -> { + tx.rollbackTimeoutExceededAsync().whenComplete((res, ex) -> { if (ex != null) { LOG.error("Transaction abort due to timeout failed [txId={}]", ex, tx.id()); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index b2e268a874e..cd5ebcecd78 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -121,7 +121,8 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, TablePartitionId oldMeta == null ? null : oldMeta.commitTimestamp(), oldMeta == null ? null : oldMeta.tx(), oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(), - cleanupCompletionTimestamp + cleanupCompletionTimestamp, + oldMeta == null ? null : oldMeta.isFinishedDueToTimeout() ) ); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index f163e741f7f..ee00c8e1281 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -552,7 +552,9 @@ public Collection states() { } @Override - public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, @Nullable HybridTimestamp ts, boolean commit) { + public void finishFull( + HybridTimestampTracker timestampTracker, UUID txId, @Nullable HybridTimestamp ts, boolean commit, boolean timeout + ) { TxState finalState; finishedTxs.add(1); @@ -573,7 +575,8 @@ public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, @Null old == null ? null : old.txCoordinatorId(), old == null ? null : old.commitPartitionId(), ts, - old == null ? null : old.tx() + old == null ? null : old.tx(), + timeout )); decrementRwTxCount(txId); @@ -588,6 +591,7 @@ public CompletableFuture finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, + boolean timeout, Map> enlistedGroups, UUID txId ) { @@ -604,7 +608,8 @@ public CompletableFuture finish( localNodeId, commitPartition, commitTimestamp(commitIntent), - old == null ? null : old.tx() + old == null ? null : old.tx(), + timeout )); decrementRwTxCount(txId); @@ -753,7 +758,8 @@ private CompletableFuture durableFinish( result.commitTimestamp(), old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() ) ); @@ -820,7 +826,8 @@ private CompletableFuture makeFinishRequest( txResult.commitTimestamp(), old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() )); assert isFinalState(updatedMeta.txState()) : diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java index f5f2fb78818..d2ec539a07a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java @@ -72,7 +72,8 @@ public void stop() { * @param tx Transaction object. */ public void initialize(InternalTransaction tx) { - TxStateMeta previous = txStateMap.put(tx.id(), new TxStateMeta(PENDING, tx.coordinatorId(), null, null, tx)); + TxStateMeta previous = txStateMap.put(tx.id(), new TxStateMeta(PENDING, tx.coordinatorId(), null, null, tx, null)); + assert previous == null : "Transaction state has already defined [txId=" + tx.id() + ", state=" + previous.txState() + ']'; } @@ -256,7 +257,8 @@ private static TxStateMeta markInitialVacuumObservationTimestamp(TxStateMeta met meta.commitTimestamp(), meta.tx(), vacuumObservationTimestamp, - meta.cleanupCompletionTimestamp() + meta.cleanupCompletionTimestamp(), + meta.isFinishedDueToTimeout() ); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java index d08e0a1a5af..ce2e8de16ad 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java @@ -39,6 +39,8 @@ public interface TxStateMetaMessage extends TransactionMetaMessage { /** Cleanup completion timestamp. */ @Nullable Long cleanupCompletionTimestamp(); + @Nullable Boolean isFinishedDueToTimeout(); + /** Converts to {@link TxStateMeta}. */ default TxStateMeta asTxStateMeta() { TablePartitionIdMessage commitPartitionId = commitPartitionId(); @@ -50,7 +52,8 @@ default TxStateMeta asTxStateMeta() { commitTimestamp(), null, initialVacuumObservationTimestamp(), - cleanupCompletionTimestamp() + cleanupCompletionTimestamp(), + isFinishedDueToTimeout() ); } From c9c1bcea6dd018f81a78c78d3669db241ca3b8b7 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Mon, 17 Feb 2025 00:52:39 +0300 Subject: [PATCH 07/12] Add isTimeoutExceeded to InternalTransaction --- .../storage/InternalTableImpl.java | 10 +++-- .../internal/tx/ItTxTimeoutOneNodeTest.java | 2 +- .../internal/tx/InternalTransaction.java | 4 ++ .../FinishedReadOnlyTransactionTracker.java | 2 +- .../impl/IgniteAbstractTransactionImpl.java | 8 ++++ .../impl/PublicApiThreadingTransaction.java | 5 +++ .../tx/impl/ReadOnlyTransactionImpl.java | 2 +- .../tx/impl/ReadWriteTransactionImpl.java | 5 ++- .../tx/impl/TransactionInflights.java | 44 +++++++++++++++---- .../internal/tx/impl/TxManagerImpl.java | 4 +- 10 files changed, 68 insertions(+), 18 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 73b6541b177..dd69bf732e9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -678,10 +678,11 @@ private CompletableFuture trackingInvoke( if (!transactionInflights.addInflight(tx.id(), false)) { return failedFuture( new TransactionException(TX_ALREADY_FINISHED_ERR, format( - "Transaction is already finished [tableName={}, partId={}, txState={}].", + "Transaction is already finished [tableName={}, partId={}, txState={}, timeoutExceeded={}].", tableName, partId, - tx.state() + tx.state(), + tx.isTimeoutExceeded() ))); } @@ -2329,9 +2330,10 @@ private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) { private void checkTransactionFinishStarted(@Nullable InternalTransaction transaction) { if (transaction != null && transaction.isFinishingOrFinished()) { throw new TransactionException(TX_ALREADY_FINISHED_ERR, format( - "Transaction is already finished () [txId={}, readOnly={}].", + "Transaction is already finished () [txId={}, readOnly={}, timeoutExceeded={}].", transaction.id(), - transaction.isReadOnly() + transaction.isReadOnly(), + transaction.isTimeoutExceeded() )); } } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java index 92bffd0aeee..284772247e3 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxTimeoutOneNodeTest.java @@ -116,7 +116,7 @@ private static boolean timeoutExceeded(Table table, Transaction rwTx) { doPutOn(table, rwTx); return false; } catch (TransactionException ex) { - if (ex.getMessage().contains("timeout exceeded")) { + if (ex.getMessage().contains("timeoutExceeded=true")) { return true; } else { fail("Expected TX_TIMEOUT_EXCEEDED error code, but got: " + ex.code()); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index bb7f5b19169..7acf34333e8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -147,4 +147,8 @@ public interface InternalTransaction extends Transaction { default CompletableFuture rollbackTimeoutExceededAsync() { return rollbackAsync(); } + + default boolean isTimeoutExceeded() { + return false; + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java index 928557c5382..a87700476eb 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java @@ -96,6 +96,6 @@ private CompletableFuture sendCursorCleanupCommand(ClusterNode node, Finis } void onTransactionFinished(UUID id) { - transactionInflights.markReadOnlyTxFinished(id); + transactionInflights.markReadOnlyTxFinished(id, false); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java index 19580dad161..e66ed7f8ea0 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java @@ -58,6 +58,8 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti /** Transaction timeout. */ protected final long timeout; + protected boolean timeoutExceeded; + /** * The constructor. * @@ -157,4 +159,10 @@ private static Throwable tryToCopyExceptionWithCause(ExecutionException exceptio public long timeout() { return timeout; } + + /** {@inheritDoc} */ + @Override + public boolean isTimeoutExceeded() { + return timeoutExceeded; + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java index 9f9420efc05..3f240584fe2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java @@ -155,4 +155,9 @@ public T unwrap(Class classToUnwrap) { public CompletableFuture kill() { return transaction.kill(); } + + @Override + public boolean isTimeoutExceeded() { + return transaction.isTimeoutExceeded(); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 1544e7d77ed..de6b7d96144 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -143,7 +143,7 @@ public CompletableFuture finish(boolean commit, HybridTimestamp executionT txFuture.complete(null); - ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id())); + ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id()), timeoutExceeded); return txFuture; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 50c46c875d7..209e903f1a6 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -159,7 +159,8 @@ public CompletableFuture rollbackAsync() { @Override public CompletableFuture rollbackTimeoutExceededAsync() { return TransactionsExceptionMapperUtil.convertToPublicFuture( - finish(false, null, false, true), + finish(false, null, false, true) + .thenAccept(unused -> timeoutExceeded = true), TX_ROLLBACK_ERR ); } @@ -213,6 +214,7 @@ private CompletableFuture finishInternal( if (isComplete) { finishFuture = nullCompletedFuture(); + this.timeoutExceeded = timeoutExceeded; } else { killed = true; } @@ -228,6 +230,7 @@ private CompletableFuture finishInternal( if (isComplete) { finishFuture = finishFutureInternal.handle((unused, throwable) -> null); + this.timeoutExceeded = timeoutExceeded; } else { killed = true; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java index cd6f09c75e5..5c56e5023af 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java @@ -138,13 +138,13 @@ void cancelWaitingInflights(TablePartitionId groupId) { } } - void markReadOnlyTxFinished(UUID txId) { + void markReadOnlyTxFinished(UUID txId, boolean timeoutExceeded) { txCtxMap.compute(txId, (k, ctx) -> { if (ctx == null) { - ctx = new ReadOnlyTxContext(); + ctx = new ReadOnlyTxContext(timeoutExceeded); } - ctx.finishTx(null); + ctx.finishTx(null, timeoutExceeded); return ctx; }); @@ -153,12 +153,12 @@ void markReadOnlyTxFinished(UUID txId) { ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map> enlistedGroups) { return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> { if (tuple0 == null) { - tuple0 = new ReadWriteTxContext(placementDriver, clockService); // No writes enlisted. + tuple0 = new ReadWriteTxContext(placementDriver, clockService, false); // No writes enlisted. } assert !tuple0.isTxFinishing() : "Transaction is already finished [id=" + uuid + "]."; - tuple0.finishTx(enlistedGroups); + tuple0.finishTx(enlistedGroups, false); return tuple0; }); @@ -186,11 +186,13 @@ void removeInflight(UUID txId) { abstract void onInflightsRemoved(); - abstract void finishTx(@Nullable Map> enlistedGroups); + abstract void finishTx(@Nullable Map> enlistedGroups, boolean timeoutExceeded); abstract boolean isTxFinishing(); abstract boolean isReadyToFinish(); + + abstract boolean isTimeoutExceeded(); } /** @@ -202,6 +204,15 @@ void removeInflight(UUID txId) { */ private static class ReadOnlyTxContext extends TxContext { private volatile boolean markedFinished; + private volatile boolean timeoutExceeded; + + ReadOnlyTxContext() { + // No-op. + } + + ReadOnlyTxContext(boolean timeoutExceeded) { + this.timeoutExceeded = timeoutExceeded; + } @Override public void onInflightsRemoved() { @@ -209,7 +220,7 @@ public void onInflightsRemoved() { } @Override - public void finishTx(@Nullable Map> enlistedGroups) { + public void finishTx(@Nullable Map> enlistedGroups, boolean timeoutExceeded) { markedFinished = true; } @@ -223,6 +234,11 @@ public boolean isReadyToFinish() { return markedFinished && inflights == 0; } + @Override + boolean isTimeoutExceeded() { + return timeoutExceeded; + } + @Override public String toString() { return "ReadOnlyTxContext [inflights=" + inflights + ']'; @@ -235,10 +251,16 @@ static class ReadWriteTxContext extends TxContext { private volatile CompletableFuture finishInProgressFuture = null; private volatile Map> enlistedGroups; private ClockService clockService; + private volatile boolean timeoutExceeded; private ReadWriteTxContext(PlacementDriver placementDriver, ClockService clockService) { + this(placementDriver, clockService, false); + } + + private ReadWriteTxContext(PlacementDriver placementDriver, ClockService clockService, boolean timeoutExceeded) { this.placementDriver = placementDriver; this.clockService = clockService; + this.timeoutExceeded = timeoutExceeded; } CompletableFuture performFinish(boolean commit, Function> finishAction) { @@ -326,8 +348,9 @@ public void onInflightsRemoved() { } @Override - public void finishTx(Map> enlistedGroups) { + public void finishTx(Map> enlistedGroups, boolean timeoutExceeded) { this.enlistedGroups = enlistedGroups; + this.timeoutExceeded = timeoutExceeded; finishInProgressFuture = new CompletableFuture<>(); } @@ -341,6 +364,11 @@ public boolean isReadyToFinish() { return waitRepFut.isDone(); } + @Override + boolean isTimeoutExceeded() { + return timeoutExceeded; + } + @Override public String toString() { return "ReadWriteTxContext [inflights=" + inflights + ", waitRepFut=" + waitRepFut diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index ee00c8e1281..6204cd999bf 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -1014,12 +1014,12 @@ public CompletableFuture executeWriteIntentSwitchAsync(Runnable runnable) return runAsync(runnable, writeIntentSwitchPool); } - void completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) { + void completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp, boolean timeoutExceeded) { finishedTxs.add(1); UUID txId = txIdAndTimestamp.getTxId(); - transactionInflights.markReadOnlyTxFinished(txId); + transactionInflights.markReadOnlyTxFinished(txId, timeoutExceeded); } @Override From 207db5f387b7028aa4433273ec749ddf2c598477 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Tue, 18 Feb 2025 11:48:11 +0300 Subject: [PATCH 08/12] Fix checkstyle --- .../ignite/client/fakes/FakeTxManager.java | 9 ++++++-- .../sql/engine/framework/NoOpTransaction.java | 6 ++--- .../distributed/ItTxStateLocalMapTest.java | 5 ++-- .../internal/table/ItColocationTest.java | 1 + .../PartitionReplicaListenerTest.java | 23 +++++++++++-------- ...ransactionExpirationRegistryBenchmark.java | 4 +++- .../ignite/internal/tx/TxStateMeta.java | 11 ++++++++- .../internal/tx/impl/OrphanDetectorTest.java | 10 ++++---- .../tx/impl/ReadWriteTransactionImplTest.java | 4 ++-- ...ctionExpirationRegistryConcurrentTest.java | 4 +++- 10 files changed, 51 insertions(+), 26 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index ae1ed0cc9ea..9b865f46815 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -156,7 +156,9 @@ public boolean implicit() { } @Override - public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture finish( + boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded + ) { return nullCompletedFuture(); } @@ -202,6 +204,7 @@ public CompletableFuture finish( HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, + boolean timeoutExceeded, Map> enlistedGroups, UUID txId ) { @@ -256,7 +259,9 @@ public int pending() { } @Override - public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, HybridTimestamp ts, boolean commit) { + public void finishFull( + HybridTimestampTracker timestampTracker, UUID txId, HybridTimestamp ts, boolean commit, boolean timeoutExceeded + ) { // No-op. } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 68b11f78595..72b2f9fad20 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -101,7 +101,7 @@ public void commit() throws TransactionException { @Override public CompletableFuture commitAsync() { - return finish(true, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false); + return finish(true, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false, false); } @Override @@ -111,7 +111,7 @@ public void rollback() throws TransactionException { @Override public CompletableFuture rollbackAsync() { - return finish(false, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false); + return finish(false, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false, false); } @Override @@ -168,7 +168,7 @@ public boolean implicit() { } @Override - public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded) { CompletableFuture fut = commit ? commitFut : rollbackFut; fut.complete(null); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java index b775743c679..d59c90dad2e 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java @@ -145,13 +145,13 @@ private void testTransaction(Consumer touchOp, boolean checkAfterTo ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) testCluster.igniteTransactions().begin(); - checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null), List.of(0)); + checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null, null), List.of(0)); checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1, NODES).boxed().collect(toList())); touchOp.accept(tx); if (checkAfterTouch) { - checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null)); + checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null, null)); } if (commit) { @@ -167,6 +167,7 @@ private void testTransaction(Consumer touchOp, boolean checkAfterTo coordinatorId, tx.commitPartition(), commit ? testCluster.clockServices.get(coord.name()).now() : null, + null, null ) ); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 96340dd5300..96a3fef2e19 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -220,6 +220,7 @@ public CompletableFuture finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, + boolean timeoutExceeded, Map> enlistedGroups, UUID txId ) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index cfee05b83ec..bcbd5995ae6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -735,11 +735,12 @@ public void testTxStateReplicaRequestEmptyState() throws Exception { localNode.id(), commitPartitionId, null, + null, null )); return nullCompletedFuture(); - }).when(txManager).finish(any(), any(), anyBoolean(), any(), any()); + }).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); CompletableFuture fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(tablePartitionIdMessage(grpId)) @@ -782,7 +783,7 @@ void testExecuteRequestOnFinishedTx(TxState txState, RequestType requestType) { UUID txId = newTxId(); txStateStorage.putForRebalance(txId, new TxMeta(txState, singletonList(grpId), null)); - txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null, null)); BinaryRow testRow = binaryRow(0); @@ -917,7 +918,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() thr pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null, null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -935,7 +936,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throw pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null, null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -954,7 +955,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throw pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null, null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -1624,7 +1625,9 @@ private void testWriteIntentOnPrimaryReplica( // Imitation of tx commit. txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, new ArrayList<>(), now)); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta( + COMMITTED, UUID.randomUUID(), commitPartitionId, now, null, null) + ); CompletableFuture replicaCleanupFut = partitionReplicaListener.invoke( TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() @@ -2206,8 +2209,10 @@ private static void configureTxManager(TxManager txManager) { doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class)); - doAnswer(invocation -> nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), any(), any()); - doAnswer(invocation -> nullCompletedFuture()).when(txManager).cleanup(any(), anyString(), any()); + doAnswer(invocation -> nullCompletedFuture()) + .when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); + doAnswer(invocation -> nullCompletedFuture()) + .when(txManager).cleanup(any(), anyString(), any()); } private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) { @@ -2882,7 +2887,7 @@ private static ReadOnlyDirectMultiRowReplicaRequest readOnlyDirectMultiRowReplic private void cleanup(UUID txId) { HybridTimestamp commitTs = clock.now(); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null, null)); WriteIntentSwitchReplicaRequest message = TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) diff --git a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java index 4602dc31c2c..e2591bdbd97 100644 --- a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java +++ b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java @@ -164,7 +164,9 @@ public boolean implicit() { } @Override - public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture finish( + boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded + ) { return null; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index fffc70368d2..fee07a17823 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -98,7 +98,16 @@ public TxStateMeta( @Nullable Long initialVacuumObservationTimestamp, @Nullable Boolean isFinishedDueToTimeout ) { - this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, initialVacuumObservationTimestamp, null, isFinishedDueToTimeout); + this( + txState, + txCoordinatorId, + commitPartitionId, + commitTimestamp, + tx, + initialVacuumObservationTimestamp, + null, + isFinishedDueToTimeout + ); } /** diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java index 3df5bac5af2..2b93e512206 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java @@ -184,7 +184,7 @@ void testNoTriggerCommittedState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, LOCAL_NODE.id(), tpId, clock.now(), null); + TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, LOCAL_NODE.id(), tpId, clock.now(), null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> committedState); @@ -216,7 +216,7 @@ void testNoTriggerAbortedState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, LOCAL_NODE.id(), tpId, null, null); + TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, LOCAL_NODE.id(), tpId, null, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> abortedState); @@ -245,7 +245,7 @@ void testNoTriggerFinishingState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, LOCAL_NODE.id(), tpId, null, null); + TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, LOCAL_NODE.id(), tpId, null, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> finishingState); @@ -277,7 +277,7 @@ void testNoTriggerCoordinatorAlive() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null); + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); @@ -311,7 +311,7 @@ void testTriggerOnLockConflictCoordinatorDead() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null); + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java index 57f836f9d62..6e5ed524feb 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java @@ -96,7 +96,7 @@ public void effectiveSchemaTimestampIsBeginTimestamp() { private void startTxAndTryToEnlist(boolean commit) { HashSet finishedTxs = new HashSet<>(); - Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any())).thenAnswer(invocation -> { + Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any(), any())).thenAnswer(invocation -> { finishedTxs.add(invocation.getArgument(4)); return nullCompletedFuture(); @@ -104,7 +104,7 @@ private void startTxAndTryToEnlist(boolean commit) { Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> { if (finishedTxs.contains(invocation.getArgument(0))) { - return new TxStateMeta(txState, randomUUID(), TX_COMMIT_PART, null, null); + return new TxStateMeta(txState, randomUUID(), TX_COMMIT_PART, null, null, null); } return null; diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java index d4573f1fb9e..57bd766a8f3 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java @@ -229,7 +229,9 @@ public boolean implicit() { } @Override - public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full) { + public CompletableFuture finish( + boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded + ) { return null; } From 96c4f9e9f42a9e72f4deac81103515b31fb6d6fe Mon Sep 17 00:00:00 2001 From: apakhomov Date: Tue, 18 Feb 2025 23:58:23 +0300 Subject: [PATCH 09/12] Merge main --- .../partition/replicator/ReplicaTxFinishMarker.java | 3 ++- .../partition/replicator/raft/RaftTxFinishMarker.java | 3 ++- .../replication/PartitionReplicaListenerTest.java | 4 ++-- .../java/org/apache/ignite/internal/tx/TxStateMeta.java | 1 - .../ignite/internal/tx/impl/TxCleanupRequestSender.java | 7 ++++--- .../internal/tx/impl/ReadWriteTransactionImplTest.java | 2 +- .../impl/TransactionExpirationRegistryConcurrentTest.java | 5 +++-- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java index 6f3397c81e9..d74a8a30566 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java @@ -55,7 +55,8 @@ public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp c txState == COMMITTED ? commitTimestamp : null, old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() )); } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java index fc2d867d00a..0f63b2245ee 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java @@ -58,7 +58,8 @@ public void markFinished( commit ? commitTimestamp : null, old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), - old == null ? null : old.cleanupCompletionTimestamp() + old == null ? null : old.cleanupCompletionTimestamp(), + old == null ? null : old.isFinishedDueToTimeout() )); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 92a7f9091d6..69a8b0fa399 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -754,7 +754,7 @@ public void testTxStateReplicaRequestEmptyState() throws Exception { )); return nullCompletedFuture(); - }).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); + }).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any(), any()); CompletableFuture fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(tablePartitionIdMessage(grpId)) @@ -2254,7 +2254,7 @@ private static void configureTxManager(TxManager txManager) { doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class)); doAnswer(invocation -> nullCompletedFuture()) - .when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); + .when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any(), any()); doAnswer(invocation -> nullCompletedFuture()) .when(txManager).cleanup(any(), anyString(), any()); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index fee07a17823..fbb634042c7 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -212,7 +212,6 @@ public TxStateMetaMessage toTransactionMetaMessage( .commitTimestamp(commitTimestamp) .initialVacuumObservationTimestamp(initialVacuumObservationTimestamp) .cleanupCompletionTimestamp(cleanupCompletionTimestamp) - .isFinishedDueToTimeout(isFinishedDueToTimeout) .build(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index 0d5aa6c2fe1..4b4f3c1e759 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -125,9 +125,10 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, ReplicationGroup oldMeta == null ? null : oldMeta.tx(), oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(), cleanupCompletionTimestamp, - oldMeta == null ? null : oldMeta.isFinishedDueToTimeout() - ) - );} + oldMeta == null ? null : oldMeta.isFinishedDueToTimeout() + ) + ); + } } /** diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java index 7edc00fbe40..93e360cd8ef 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java @@ -98,7 +98,7 @@ public void effectiveSchemaTimestampIsBeginTimestamp() { private void startTxAndTryToEnlist(boolean commit) { HashSet finishedTxs = new HashSet<>(); - Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any(), any())).thenAnswer(invocation -> { + Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any(), any(), any())).thenAnswer(invocation -> { finishedTxs.add(invocation.getArgument(5)); return nullCompletedFuture(); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java index 57bd766a8f3..39224ebc6fe 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.InternalTransaction; @@ -183,7 +184,7 @@ public UUID id() { } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) { return null; } @@ -203,7 +204,7 @@ public TablePartitionId commitPartition() { } @Override - public IgniteBiTuple enlist(TablePartitionId tablePartitionId, + public IgniteBiTuple enlist(ReplicationGroupId replicationGroupId, int tableId, IgniteBiTuple nodeAndConsistencyToken) { return null; } From 860716de222fc75eabaac820c77abde118c048c1 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Wed, 19 Feb 2025 12:04:37 +0300 Subject: [PATCH 10/12] Fix test --- .../distributed/replication/PartitionReplicaListenerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 69a8b0fa399..eb4a966dbbb 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -2254,7 +2254,7 @@ private static void configureTxManager(TxManager txManager) { doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class)); doAnswer(invocation -> nullCompletedFuture()) - .when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any(), any()); + .when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), any(), any(), any()); doAnswer(invocation -> nullCompletedFuture()) .when(txManager).cleanup(any(), anyString(), any()); } From a0df56907d0f3d516d09ff48098ac0c1fe322d2e Mon Sep 17 00:00:00 2001 From: apakhomov Date: Thu, 20 Feb 2025 14:39:02 +0300 Subject: [PATCH 11/12] Rename methods and add javadoc --- .../ignite/client/fakes/FakeTxManager.java | 10 ++++++++++ .../sql/engine/framework/NoOpTransaction.java | 13 +++++++++++++ .../distributed/storage/InternalTableImpl.java | 4 ++-- .../TransactionExpirationRegistryBenchmark.java | 11 +++++++++++ .../ignite/internal/tx/InternalTransaction.java | 17 ++++++++++------- .../tx/impl/IgniteAbstractTransactionImpl.java | 2 +- .../tx/impl/PublicApiThreadingTransaction.java | 9 +++++++-- ...sactionExpirationRegistryConcurrentTest.java | 10 ++++++++++ 8 files changed, 64 insertions(+), 12 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 4064a761731..a50564cb2fd 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -179,6 +179,16 @@ public long timeout() { public CompletableFuture kill() { return nullCompletedFuture(); } + + @Override + public CompletableFuture rollbackTimeoutExceededAsync() { + return nullCompletedFuture(); + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return false; + } }; } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 06d68b7bf05..51f8e1bd40e 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -53,6 +53,8 @@ public final class NoOpTransaction implements InternalTransaction { private final boolean readOnly; + private boolean isRolledBackWithTimeoutExceeded = false; + private final CompletableFuture commitFut = new CompletableFuture<>(); private final CompletableFuture rollbackFut = new CompletableFuture<>(); @@ -201,6 +203,17 @@ public CompletableFuture kill() { return rollbackAsync(); } + @Override + public CompletableFuture rollbackTimeoutExceededAsync() { + this.isRolledBackWithTimeoutExceeded = true; + return rollbackAsync(); + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return isRolledBackWithTimeoutExceeded; + } + /** Returns a {@link CompletableFuture} that completes when this transaction commits. */ public CompletableFuture commitFuture() { return commitFut; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 73c2ce8ed74..97c390bd431 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -683,7 +683,7 @@ private CompletableFuture trackingInvoke( tableName, partId, tx.state(), - tx.isTimeoutExceeded() + tx.isRolledBackWithTimeoutExceeded() ))); } @@ -2346,7 +2346,7 @@ private void checkTransactionFinishStarted(@Nullable InternalTransaction transac "Transaction is already finished () [txId={}, readOnly={}, timeoutExceeded={}].", transaction.id(), transaction.isReadOnly(), - transaction.isTimeoutExceeded() + transaction.isRolledBackWithTimeoutExceeded() )); } } diff --git a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java index cc7ac76cec5..b71c8f5f3b9 100644 --- a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java +++ b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java @@ -19,6 +19,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.ArrayList; import java.util.List; @@ -189,6 +190,16 @@ public CompletableFuture kill() { return null; } + @Override + public CompletableFuture rollbackTimeoutExceededAsync() { + return nullCompletedFuture(); + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return false; + } + @Override public void commit() throws TransactionException { diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index a860362b5a3..b89247ef7f4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -146,15 +146,18 @@ IgniteBiTuple enlist( CompletableFuture kill(); /** - * Rolls back the transaction if the timeout is exceeded. + * Rolls back the transaction due to timeout exceeded. After this method is called, + * {@link #isRolledBackWithTimeoutExceeded()} will return {@code true}. * * @return The future. */ - default CompletableFuture rollbackTimeoutExceededAsync() { - return rollbackAsync(); - } + CompletableFuture rollbackTimeoutExceededAsync(); - default boolean isTimeoutExceeded() { - return false; - } + /** + * Checks if the transaction was rolled back due to timeout exceeded. The only way to roll back a transaction due to timeout + * exceeded is to call {@link #rollbackTimeoutExceededAsync()}. + * + * @return {@code true} if the transaction was rolled back due to timeout exceeded, {@code false} otherwise. + */ + boolean isRolledBackWithTimeoutExceeded(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java index e66ed7f8ea0..b9eb082f33c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java @@ -162,7 +162,7 @@ public long timeout() { /** {@inheritDoc} */ @Override - public boolean isTimeoutExceeded() { + public boolean isRolledBackWithTimeoutExceeded() { return timeoutExceeded; } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java index 0aa0b5585f6..4b984ebc081 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java @@ -161,7 +161,12 @@ public CompletableFuture kill() { } @Override - public boolean isTimeoutExceeded() { - return transaction.isTimeoutExceeded(); + public CompletableFuture rollbackTimeoutExceededAsync() { + return transaction.rollbackAsync(); + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return transaction.isRolledBackWithTimeoutExceeded(); } } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java index 39224ebc6fe..96bcb244614 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryConcurrentTest.java @@ -251,6 +251,16 @@ public CompletableFuture kill() { return null; } + @Override + public CompletableFuture rollbackTimeoutExceededAsync() { + return null; + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return false; + } + @Override public void commit() throws TransactionException { From 3fbd9d7d9d15f382eeb09704d0ff2f5d26e613d9 Mon Sep 17 00:00:00 2001 From: apakhomov Date: Thu, 20 Feb 2025 15:01:38 +0300 Subject: [PATCH 12/12] Check commit flag and timeoutExceeded flag can not be both true --- .../java/org/apache/ignite/internal/tx/InternalTransaction.java | 2 +- .../apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java | 1 + .../ignite/internal/tx/impl/ReadWriteTransactionImpl.java | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index b89247ef7f4..0b2b83f5ca8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -118,7 +118,7 @@ IgniteBiTuple enlist( * @param executionTimestamp The timestamp is the time when a read-only transaction is applied to the remote node. The parameter * is not used for read-write transactions. * @param full Full state transaction marker. - * @param timeoutExceeded Timeout exceeded marker. + * @param timeoutExceeded Timeout exceeded flag (commit flag must be {@code false}). * @return The future. */ CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 778ad311186..76d337f8665 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -136,6 +136,7 @@ public CompletableFuture rollbackTimeoutExceededAsync() { @Override public CompletableFuture finish(boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded) { assert !full : "Read-only transactions cannot be full."; + assert !(commit && timeoutExceeded) : "Transaction cannot commit with timeout exceeded."; if (!finishGuard.compareAndSet(false, true)) { return nullCompletedFuture(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 1a203741c84..b26da1fdf1a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -176,6 +176,8 @@ public CompletableFuture rollbackTimeoutExceededAsync() { public CompletableFuture finish( boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded ) { + assert !(commit && timeoutExceeded) : "Transaction cannot commit with timeout exceeded."; + if (finishFuture != null) { return finishFuture; }