Skip to content

Commit

Permalink
IGNITE-22797 Fix estimated size for insert-after-delete scenario (#4125)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashapolo authored Jul 23, 2024
1 parent 5d65a00 commit 36b119e
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,41 @@ public void estimatedSizeNeverFallsBelowZeroUsingCommitWrite() {
assertThat(storage.estimatedSize(), is(0L));
}

@Test
public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
addWriteCommitted(ROW_ID, binaryRow, clock.now());

assertThat(storage.estimatedSize(), is(1L));

addWriteCommitted(ROW_ID, null, clock.now());

assertThat(storage.estimatedSize(), is(0L));

addWriteCommitted(ROW_ID, binaryRow, clock.now());

assertThat(storage.estimatedSize(), is(1L));
}

@Test
public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
UUID txId = UUID.randomUUID();

addWrite(ROW_ID, binaryRow, txId);
commitWrite(ROW_ID, clock.now());

assertThat(storage.estimatedSize(), is(1L));

addWrite(ROW_ID, null, txId);
commitWrite(ROW_ID, clock.now());

assertThat(storage.estimatedSize(), is(0L));

addWrite(ROW_ID, binaryRow, txId);
commitWrite(ROW_ID, clock.now());

assertThat(storage.estimatedSize(), is(1L));
}

@Test
public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {
assertThat(storage.estimatedSize(), is(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,27 @@ public synchronized void addWriteCommitted(
private @Nullable VersionChain resolveCommittedVersionChain(VersionChain committedVersionChain) {
VersionChain nextChain = committedVersionChain.next;

boolean isNewValueTombstone = committedVersionChain.row == null;

if (nextChain != null) {
if (committedVersionChain.row == null) {
if (nextChain.row == null) {
boolean isOldValueTombstone = nextChain.row == null;

if (isOldValueTombstone) {
if (isNewValueTombstone) {
// Avoid creating tombstones for tombstones.
return nextChain;
}

ESTIMATED_SIZE_UPDATER.incrementAndGet(this);
} else if (isNewValueTombstone) {
ESTIMATED_SIZE_UPDATER.decrementAndGet(this);
}

// Calling it from the compute is fine. Concurrent writes of the same row are impossible, and if we call the compute closure
// several times, the same tuple will be inserted into the GC queue (timestamp and rowId don't change in this case).
gcQueue.add(committedVersionChain);
} else {
if (committedVersionChain.row == null) {
if (isNewValueTombstone) {
// If there is only one version, and it is a tombstone, then remove the chain.
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
*/
private long rowLinkForAddToGcQueue = NULL_LINK;

@Nullable
private RowVersion prevRowVersion;

AddWriteCommittedInvokeClosure(
RowId rowId,
@Nullable BinaryRow row,
Expand Down Expand Up @@ -97,10 +100,10 @@ public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedExce

newRow = VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink());
} else {
RowVersion current = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE);
prevRowVersion = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE);

// If the current and new version are tombstones, then there is no need to add a new version.
if (current.isTombstone() && row == null) {
if (prevRowVersion.isTombstone() && row == null) {
operationType = OperationType.NOOP;
} else {
operationType = OperationType.PUT;
Expand Down Expand Up @@ -145,12 +148,14 @@ void afterCompletion() {
}

if (operationType == OperationType.PUT) {
if (row == null) {
storage.decrementEstimatedSize();
} else if (rowLinkForAddToGcQueue == NULL_LINK) {
// Checking for NULL_LINK allows us to distinguish if a new version chain was created or not. In other words if this is
// an insert or an update to an existing row.
storage.incrementEstimatedSize();
if (prevRowVersion == null || prevRowVersion.isTombstone()) {
if (row != null) {
storage.incrementEstimatedSize();
}
} else {
if (row == null) {
storage.decrementEstimatedSize();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ class CommitWriteInvokeClosure implements InvokeClosure<VersionChain> {

private final UpdateTimestampHandler updateTimestampHandler;

/**
* Flag indicating that we are committing a tombstone.
*/
private boolean isCurrentRowTombstone = false;
@Nullable
private RowVersion currentRowVersion;

@Nullable
private RowVersion prevRowVersion;

CommitWriteInvokeClosure(
RowId rowId,
Expand Down Expand Up @@ -129,30 +130,33 @@ public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedExce

operationType = OperationType.PUT;

RowVersion current = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE);
RowVersion next = oldRow.hasNextLink() ? storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
currentRowVersion = storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE);

isCurrentRowTombstone = current.isTombstone();
assert currentRowVersion != null;

if (next == null && isCurrentRowTombstone) {
prevRowVersion = oldRow.hasNextLink() ? storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;

if (prevRowVersion == null && currentRowVersion.isTombstone()) {
// If there is only one version, and it is a tombstone, then remove the chain.
operationType = OperationType.REMOVE;

return;
}

boolean isPreviousRowTombstone = prevRowVersion != null && prevRowVersion.isTombstone();

// If the previous and current version are tombstones, then delete the current version.
if (next != null && isCurrentRowTombstone && next.isTombstone()) {
toRemove = current;
if (isPreviousRowTombstone && currentRowVersion.isTombstone()) {
toRemove = currentRowVersion;

newRow = VersionChain.createCommitted(oldRow.rowId(), next.link(), next.nextLink());
newRow = VersionChain.createCommitted(rowId, prevRowVersion.link(), prevRowVersion.nextLink());
} else {
updateTimestampLink = oldRow.headLink();
updateTimestampLink = currentRowVersion.link();

newRow = VersionChain.createCommitted(oldRow.rowId(), oldRow.headLink(), oldRow.nextLink());
newRow = VersionChain.createCommitted(rowId, currentRowVersion.link(), currentRowVersion.nextLink());

if (oldRow.hasNextLink()) {
rowLinkForAddToGcQueue = oldRow.headLink();
if (currentRowVersion.hasNextLink()) {
rowLinkForAddToGcQueue = currentRowVersion.link();
}
}
}
Expand Down Expand Up @@ -194,6 +198,12 @@ public void onUpdate() {
void afterCompletion() {
assert operationType == OperationType.PUT || toRemove == null : "toRemove=" + toRemove + ", op=" + operationType;

if (operationType == OperationType.NOOP) {
return;
}

assert currentRowVersion != null;

if (toRemove != null) {
storage.removeRowVersion(toRemove);
}
Expand All @@ -202,15 +212,15 @@ void afterCompletion() {
gcQueue.add(rowId, timestamp, rowLinkForAddToGcQueue);
}

// We need to check the "toRemove" field in order to avoid a situation when we are committing a tombstone
// over an existing tombstone.
if (operationType == OperationType.PUT && toRemove == null) {
if (isCurrentRowTombstone) {
storage.decrementEstimatedSize();
} else if (rowLinkForAddToGcQueue == NULL_LINK) {
// Checking for NULL_LINK allows us to distinguish if a new version chain was created or not. In other words if this is
// an insert or an update to an existing row.
storage.incrementEstimatedSize();
if (operationType == OperationType.PUT) {
if (prevRowVersion == null || prevRowVersion.isTombstone()) {
if (!currentRowVersion.isTombstone()) {
storage.incrementEstimatedSize();
}
} else {
if (currentRowVersion.isTombstone()) {
storage.decrementEstimatedSize();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ public void estimatedSizeNeverFallsBelowZeroUsingCommitWrite() {
super.estimatedSizeNeverFallsBelowZeroUsingCommitWrite();
}

@Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
@Override
public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
super.estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted();
}

@Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
@Override
public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
super.estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite();
}

@Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
@Override
public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {
Expand Down

0 comments on commit 36b119e

Please sign in to comment.