Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24244: Support timeout in RW transactions #5209

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,14 @@ static CompletableFuture<ClientTransaction> beginAsync(
@Nullable String preferredNodeName,
@Nullable TransactionOptions options,
long observableTimestamp) {
if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) {
// TODO: IGNITE-16193
throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions");
}

boolean readOnly = options != null && options.readOnly();
long timeout = options == null ? 0 : options.timeoutMillis();

return ch.serviceAsync(
ClientOp.TX_BEGIN,
w -> {
w.out().packBoolean(readOnly);
w.out().packLong(options == null ? 0 : options.timeoutMillis());
w.out().packLong(timeout);
w.out().packLong(observableTimestamp);
},
r -> readTx(r, readOnly),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public boolean implicit() {
}

@Override
public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) {
public CompletableFuture<Void> finish(
boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded
) {
return nullCompletedFuture();
}

Expand Down Expand Up @@ -205,6 +207,7 @@ public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
TablePartitionId commitPartition,
boolean commit,
boolean timeoutExceeded,
Map<ReplicationGroupId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
Set<Integer> enlistedTableIds,
UUID txId
Expand Down Expand Up @@ -260,7 +263,9 @@ public int pending() {
}

@Override
public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, HybridTimestamp ts, boolean commit) {
public void finishFull(
HybridTimestampTracker timestampTracker, UUID txId, HybridTimestamp ts, boolean commit, boolean timeoutExceeded
) {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp c
txState == COMMITTED ? commitTimestamp : null,
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp()
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void markFinished(
commit ? commitTimestamp : null,
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp()
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -147,7 +148,6 @@ public void batchWithConflictShouldBeRejectedEntirely() {
() -> sql("INSERT INTO test VALUES (0, 0), (1, 1), (2, 2)")
);


assertQuery("SELECT count(*) FROM test")
.returns(1L)
.check();
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testNullDefault() {
assertQuery("SELECT col FROM test_null_def WHERE id = 2").returns(null).check();
}

/**Test full MERGE command. */
/** Test full MERGE command. */
@Test
public void testMerge() {
clearAndPopulateMergeTable1();
Expand Down Expand Up @@ -373,9 +373,12 @@ public void testMergeBatch() {

assertQuery("SELECT count(*) FROM test2 WHERE b = 0").returns(10_000L).check();

sql("MERGE INTO test2 dst USING test1 src ON dst.a = src.a"
var longerTimeoutOptions = new TransactionOptions().readOnly(false).timeoutMillis(TimeUnit.MINUTES.toMillis(2));
var tx = igniteTx().begin(longerTimeoutOptions);
sql(tx, "MERGE INTO test2 dst USING test1 src ON dst.a = src.a"
+ " WHEN MATCHED THEN UPDATE SET b = 1 "
+ " WHEN NOT MATCHED THEN INSERT (key, a, b) VALUES (src.key, src.a, 2)");
tx.commit();

assertQuery("SELECT count(*) FROM test2 WHERE b = 0").returns(5_000L).check();
assertQuery("SELECT count(*) FROM test2 WHERE b = 1").returns(5_000L).check();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void commit() throws TransactionException {

@Override
public CompletableFuture<Void> commitAsync() {
return finish(true, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false);
return finish(true, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false, false);
}

@Override
Expand All @@ -112,7 +112,7 @@ public void rollback() throws TransactionException {

@Override
public CompletableFuture<Void> rollbackAsync() {
return finish(false, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false);
return finish(false, nullableHybridTimestamp(NULL_HYBRID_TIMESTAMP), false, false);
}

@Override
Expand Down Expand Up @@ -169,7 +169,7 @@ public boolean implicit() {
}

@Override
public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) {
public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp, boolean full, boolean timeoutExceeded) {
CompletableFuture<Void> fut = commit ? commitFut : rollbackFut;

fut.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ private void testTransaction(Consumer<Transaction> touchOp, boolean checkAfterTo

ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) testCluster.igniteTransactions().begin();

checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null), List.of(0));
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null, null), List.of(0));
checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1, NODES).boxed().collect(toList()));

touchOp.accept(tx);

if (checkAfterTouch) {
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null));
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null, null));
}

if (commit) {
Expand All @@ -167,6 +167,7 @@ private void testTransaction(Consumer<Transaction> touchOp, boolean checkAfterTo
coordinatorId,
tx.commitPartition(),
commit ? testCluster.clockServices.get(coord.name()).now() : null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
boolean commitIntent,
boolean timeoutExceeded,
Map<ReplicationGroupId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
Set<Integer> enlistedTableIds,
UUID txId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,8 @@ private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp commi
txCoordinatorId,
old == null ? null : old.commitPartitionId(),
full ? commitTimestamp : null,
old == null ? null : old.tx()
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,8 @@ private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Bo
req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null,
old == null ? null : old.tx()
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
));
}
}
Expand Down Expand Up @@ -725,6 +726,7 @@ private CompletableFuture<Void> triggerTxRecovery(UUID txId, UUID senderId) {
// Tx recovery is executed on the commit partition.
replicationGroupId,
false,
false,
// Enlistment consistency token is not required for the rollback, so it is 0L.
Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
Set.of(replicationGroupId.tableId()),
Expand Down Expand Up @@ -842,7 +844,8 @@ private CompletableFuture<?> processOperationRequest(
req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null,
old == null ? null : old.tx()
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
));

var opId = new OperationId(senderId, req.timestamp().longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ private <R> CompletableFuture<R> trackingInvoke(
assert hasError || r instanceof TimestampAware;

// Timestamp is set to commit timestamp for full transactions.
tx.finish(!hasError, hasError ? null : ((TimestampAware) r).timestamp(), true);
tx.finish(!hasError, hasError ? null : ((TimestampAware) r).timestamp(), true, false);

if (e != null) {
sneakyThrow(e);
Expand All @@ -679,10 +679,11 @@ private <R> CompletableFuture<R> trackingInvoke(
if (!transactionInflights.addInflight(tx.id(), false)) {
return failedFuture(
new TransactionException(TX_ALREADY_FINISHED_ERR, format(
"Transaction is already finished [tableName={}, partId={}, txState={}].",
"Transaction is already finished [tableName={}, partId={}, txState={}, timeoutExceeded={}].",
tableName,
partId,
tx.state()
tx.state(),
tx.isTimeoutExceeded()
)));
}

Expand Down Expand Up @@ -880,7 +881,7 @@ private <R> CompletableFuture<R> sendReadOnlyToPrimaryReplica(
private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, InternalTransaction tx) {
return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r, e) -> {
if (e != null) {
return tx.finish(false, clockService.current(), false)
return tx.finish(false, clockService.current(), false, false)
.handle((ignored, err) -> {
if (err != null) {
e.addSuppressed(err);
Expand All @@ -891,7 +892,7 @@ private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, Internal
}); // Preserve failed state.
}

return tx.finish(true, clockService.current(), false).thenApply(ignored -> r);
return tx.finish(true, clockService.current(), false, false).thenApply(ignored -> r);
}).thenCompose(identity());
}

Expand Down Expand Up @@ -2342,9 +2343,10 @@ private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
private void checkTransactionFinishStarted(@Nullable InternalTransaction transaction) {
if (transaction != null && transaction.isFinishingOrFinished()) {
throw new TransactionException(TX_ALREADY_FINISHED_ERR, format(
"Transaction is already finished () [txId={}, readOnly={}].",
"Transaction is already finished () [txId={}, readOnly={}, timeoutExceeded={}].",
transaction.id(),
transaction.isReadOnly()
transaction.isReadOnly(),
transaction.isTimeoutExceeded()
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,12 @@ public void testTxStateReplicaRequestEmptyState() throws Exception {
localNode.id(),
commitPartitionId,
null,
null,
null
));

return nullCompletedFuture();
}).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any());
}).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any(), any());

CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(tablePartitionIdMessage(grpId))
Expand Down Expand Up @@ -796,7 +797,7 @@ void testExecuteRequestOnFinishedTx(TxState txState, RequestType requestType) {
UUID txId = newTxId();

txStateStorage.putForRebalance(txId, new TxMeta(txState, singletonList(grpId), null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null, null));

BinaryRow testRow = binaryRow(0);

Expand Down Expand Up @@ -931,7 +932,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() thr

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null, null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand All @@ -949,7 +950,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throw

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null, null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand All @@ -968,7 +969,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throw

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null, null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand Down Expand Up @@ -1661,7 +1662,9 @@ private void testWriteIntentOnPrimaryReplica(

// Imitation of tx commit.
txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, new ArrayList<>(), now));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(
COMMITTED, UUID.randomUUID(), commitPartitionId, now, null, null)
);

CompletableFuture<?> replicaCleanupFut = partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
Expand Down Expand Up @@ -2250,8 +2253,10 @@ private static void configureTxManager(TxManager txManager) {

doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));

doAnswer(invocation -> nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any());
doAnswer(invocation -> nullCompletedFuture()).when(txManager).cleanup(any(), anyString(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), any(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).cleanup(any(), anyString(), any());
}

private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) {
Expand Down Expand Up @@ -2931,7 +2936,7 @@ private void cleanup(UUID txId) {
private void cleanup(UUID txId, boolean commit) {
HybridTimestamp commitTs = clock.now();

txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null, null));

WriteIntentSwitchReplicaRequest message = TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2172,7 +2172,8 @@ public void testWriteIntentResolutionFallbackToCommitPartitionPath() {
new UUID(1, 2),
old.commitPartitionId(),
old.commitTimestamp(),
old == null ? null : old.tx()
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.internal.tx.readonly;
package org.apache.ignite.internal.tx;

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;

Expand All @@ -24,12 +24,11 @@
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

class ItClientReadOnlyTxTimeoutOneNodeTest extends ItReadOnlyTxTimeoutOneNodeTest {
class ItClientTxTimeoutOneNodeTest extends ItTxTimeoutOneNodeTest {
private IgniteClient client;

@BeforeEach
Expand All @@ -52,12 +51,12 @@ Ignite ignite() {
}

@Override
ReadOnlyTransactionImpl transactionImpl(Transaction tx) {
InternalTransaction toInternalTransaction(Transaction tx) {
long txId = ClientLazyTransaction.get(tx).startedTx().id();

ClientResourceRegistry resources = unwrapIgniteImpl(cluster.aliveNode()).clientInboundMessageHandler().resources();
try {
return resources.get(txId).get(ReadOnlyTransactionImpl.class);
return resources.get(txId).get(InternalTransaction.class);
} catch (IgniteInternalCheckedException e) {
throw new RuntimeException(e);
}
Expand Down
Loading