From 36b119e37f8f3511148654cee00a62ef1d41c820 Mon Sep 17 00:00:00 2001 From: Alexander Polovtcev Date: Tue, 23 Jul 2024 17:04:03 +0300 Subject: [PATCH] IGNITE-22797 Fix estimated size for insert-after-delete scenario (#4125) --- .../AbstractMvPartitionStorageTest.java | 35 +++++++++++ .../storage/impl/TestMvPartitionStorage.java | 12 +++- .../mv/AddWriteCommittedInvokeClosure.java | 21 ++++--- .../mv/CommitWriteInvokeClosure.java | 58 +++++++++++-------- .../RocksDbMvPartitionStorageTest.java | 12 ++++ 5 files changed, 103 insertions(+), 35 deletions(-) diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index 92adba279a1..32e168a959a 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -1490,6 +1490,41 @@ public void estimatedSizeNeverFallsBelowZeroUsingCommitWrite() { assertThat(storage.estimatedSize(), is(0L)); } + @Test + public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() { + addWriteCommitted(ROW_ID, binaryRow, clock.now()); + + assertThat(storage.estimatedSize(), is(1L)); + + addWriteCommitted(ROW_ID, null, clock.now()); + + assertThat(storage.estimatedSize(), is(0L)); + + addWriteCommitted(ROW_ID, binaryRow, clock.now()); + + assertThat(storage.estimatedSize(), is(1L)); + } + + @Test + public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() { + UUID txId = UUID.randomUUID(); + + addWrite(ROW_ID, binaryRow, txId); + commitWrite(ROW_ID, clock.now()); + + assertThat(storage.estimatedSize(), is(1L)); + + addWrite(ROW_ID, null, txId); + commitWrite(ROW_ID, clock.now()); + + assertThat(storage.estimatedSize(), is(0L)); + + addWrite(ROW_ID, binaryRow, txId); + commitWrite(ROW_ID, clock.now()); + + assertThat(storage.estimatedSize(), is(1L)); + } + @Test public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() { assertThat(storage.estimatedSize(), is(0L)); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index 058b8574b3b..b3a27210749 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -320,13 +320,19 @@ public synchronized void addWriteCommitted( private @Nullable VersionChain resolveCommittedVersionChain(VersionChain committedVersionChain) { VersionChain nextChain = committedVersionChain.next; + boolean isNewValueTombstone = committedVersionChain.row == null; + if (nextChain != null) { - if (committedVersionChain.row == null) { - if (nextChain.row == null) { + boolean isOldValueTombstone = nextChain.row == null; + + if (isOldValueTombstone) { + if (isNewValueTombstone) { // Avoid creating tombstones for tombstones. return nextChain; } + ESTIMATED_SIZE_UPDATER.incrementAndGet(this); + } else if (isNewValueTombstone) { ESTIMATED_SIZE_UPDATER.decrementAndGet(this); } @@ -334,7 +340,7 @@ public synchronized void addWriteCommitted( // several times, the same tuple will be inserted into the GC queue (timestamp and rowId don't change in this case). gcQueue.add(committedVersionChain); } else { - if (committedVersionChain.row == null) { + if (isNewValueTombstone) { // If there is only one version, and it is a tombstone, then remove the chain. return null; } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java index a2d4ebf103e..ef60f805577 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java @@ -63,6 +63,9 @@ class AddWriteCommittedInvokeClosure implements InvokeClosure { */ private long rowLinkForAddToGcQueue = NULL_LINK; + @Nullable + private RowVersion prevRowVersion; + AddWriteCommittedInvokeClosure( RowId rowId, @Nullable BinaryRow row, @@ -97,10 +100,10 @@ public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedExce newRow = VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink()); } else { - RowVersion current = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE); + prevRowVersion = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE); // If the current and new version are tombstones, then there is no need to add a new version. - if (current.isTombstone() && row == null) { + if (prevRowVersion.isTombstone() && row == null) { operationType = OperationType.NOOP; } else { operationType = OperationType.PUT; @@ -145,12 +148,14 @@ void afterCompletion() { } if (operationType == OperationType.PUT) { - if (row == null) { - storage.decrementEstimatedSize(); - } else if (rowLinkForAddToGcQueue == NULL_LINK) { - // Checking for NULL_LINK allows us to distinguish if a new version chain was created or not. In other words if this is - // an insert or an update to an existing row. - storage.incrementEstimatedSize(); + if (prevRowVersion == null || prevRowVersion.isTombstone()) { + if (row != null) { + storage.incrementEstimatedSize(); + } + } else { + if (row == null) { + storage.decrementEstimatedSize(); + } } } } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java index 2a7565da3c2..e52cb498cba 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java @@ -73,10 +73,11 @@ class CommitWriteInvokeClosure implements InvokeClosure { private final UpdateTimestampHandler updateTimestampHandler; - /** - * Flag indicating that we are committing a tombstone. - */ - private boolean isCurrentRowTombstone = false; + @Nullable + private RowVersion currentRowVersion; + + @Nullable + private RowVersion prevRowVersion; CommitWriteInvokeClosure( RowId rowId, @@ -129,30 +130,33 @@ public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedExce operationType = OperationType.PUT; - RowVersion current = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE); - RowVersion next = oldRow.hasNextLink() ? storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null; + currentRowVersion = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE); - isCurrentRowTombstone = current.isTombstone(); + assert currentRowVersion != null; - if (next == null && isCurrentRowTombstone) { + prevRowVersion = oldRow.hasNextLink() ? storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null; + + if (prevRowVersion == null && currentRowVersion.isTombstone()) { // If there is only one version, and it is a tombstone, then remove the chain. operationType = OperationType.REMOVE; return; } + boolean isPreviousRowTombstone = prevRowVersion != null && prevRowVersion.isTombstone(); + // If the previous and current version are tombstones, then delete the current version. - if (next != null && isCurrentRowTombstone && next.isTombstone()) { - toRemove = current; + if (isPreviousRowTombstone && currentRowVersion.isTombstone()) { + toRemove = currentRowVersion; - newRow = VersionChain.createCommitted(oldRow.rowId(), next.link(), next.nextLink()); + newRow = VersionChain.createCommitted(rowId, prevRowVersion.link(), prevRowVersion.nextLink()); } else { - updateTimestampLink = oldRow.headLink(); + updateTimestampLink = currentRowVersion.link(); - newRow = VersionChain.createCommitted(oldRow.rowId(), oldRow.headLink(), oldRow.nextLink()); + newRow = VersionChain.createCommitted(rowId, currentRowVersion.link(), currentRowVersion.nextLink()); - if (oldRow.hasNextLink()) { - rowLinkForAddToGcQueue = oldRow.headLink(); + if (currentRowVersion.hasNextLink()) { + rowLinkForAddToGcQueue = currentRowVersion.link(); } } } @@ -194,6 +198,12 @@ public void onUpdate() { void afterCompletion() { assert operationType == OperationType.PUT || toRemove == null : "toRemove=" + toRemove + ", op=" + operationType; + if (operationType == OperationType.NOOP) { + return; + } + + assert currentRowVersion != null; + if (toRemove != null) { storage.removeRowVersion(toRemove); } @@ -202,15 +212,15 @@ void afterCompletion() { gcQueue.add(rowId, timestamp, rowLinkForAddToGcQueue); } - // We need to check the "toRemove" field in order to avoid a situation when we are committing a tombstone - // over an existing tombstone. - if (operationType == OperationType.PUT && toRemove == null) { - if (isCurrentRowTombstone) { - storage.decrementEstimatedSize(); - } else if (rowLinkForAddToGcQueue == NULL_LINK) { - // Checking for NULL_LINK allows us to distinguish if a new version chain was created or not. In other words if this is - // an insert or an update to an existing row. - storage.incrementEstimatedSize(); + if (operationType == OperationType.PUT) { + if (prevRowVersion == null || prevRowVersion.isTombstone()) { + if (!currentRowVersion.isTombstone()) { + storage.incrementEstimatedSize(); + } + } else { + if (currentRowVersion.isTombstone()) { + storage.decrementEstimatedSize(); + } } } } diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java index 8648722bbe6..0364ac9dc1d 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java @@ -108,6 +108,18 @@ public void estimatedSizeNeverFallsBelowZeroUsingCommitWrite() { super.estimatedSizeNeverFallsBelowZeroUsingCommitWrite(); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617") + @Override + public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() { + super.estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted(); + } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617") + @Override + public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() { + super.estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite(); + } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617") @Override public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {