Add concurrent writes reconciliation for non-blind INSERT in Delta Lake #20983

Conversation

@findinpath (Contributor) commented Mar 7, 2024

Description

Follow-up from #18506

Add concurrent writes reconciliation for non-blind INSERT in Delta Lake

On tables with isolation level WriteSerializable, allow committing
non-blind append INSERT operations in a concurrent context by logically
placing these operations right after any concurrently completed write
operations (see the commit-loop sketch below).
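A minimal sketch of the commit loop this implies, assuming (hypothetically) that the transaction log rejects a duplicate version file with `FileAlreadyExistsException`; all names here are illustrative, not the connector's actual classes:

```
import java.nio.file.FileAlreadyExistsException;

public class OptimisticCommitSketch
{
    // Hypothetical abstraction over the Delta transaction log writer:
    // writing a version file that already exists must fail.
    public interface LogWriter
    {
        void write(long version, String entryJson) throws FileAlreadyExistsException;
    }

    // Place the INSERT right after any concurrently completed writes by
    // retrying at increasing versions; each winning commit is checked for
    // conflicts before the retry.
    public long commit(LogWriter writer, long readVersion, String entryJson)
    {
        long version = readVersion + 1;
        while (true) {
            try {
                writer.write(version, entryJson);
                return version; // committed
            }
            catch (FileAlreadyExistsException e) {
                checkNoConflicts(version); // throws if reconciliation is impossible
                version++;                 // logically place this INSERT after the winner
            }
        }
    }

    private void checkNoConflicts(long winningVersion)
    {
        // Placeholder for the checks listed below; a real implementation
        // would also bound the number of retries.
    }
}
```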

For disambiguation, INSERT operations that contain subqueries reading the same
table are considered non-blind inserts.

Disallow committing insert operations in any of the following cases (see the conflict-check sketch after this list):

  • a table schema change has been committed in the meantime
  • a table protocol change has been committed in the meantime
  • files added in the meantime should have been read by
    the current insert operation
  • files removed in the meantime conflict with the files read
    by the current insert operation
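A minimal sketch of these checks, assuming each winning commit is summarized by a few flags plus the partition values of its added files (hypothetical names, simplified to exact partition-value matching; the real check matches against enforced partition domains):

```
import java.util.Map;
import java.util.Set;

public class ConflictCheckSketch
{
    // Hypothetical summary of a concurrently committed transaction.
    public record WinningCommit(
            boolean containsSchemaChange,
            boolean containsProtocolChange,
            boolean containsRemoveFiles,
            Set<Map<String, String>> addedFilesPartitionValues) {}

    // sourceReadPartitions: canonical partition values read by the
    // current non-blind INSERT.
    public static void checkNoConflicts(WinningCommit winner, Set<Map<String, String>> sourceReadPartitions)
    {
        if (winner.containsSchemaChange()) {
            throw new IllegalStateException("Conflict: concurrent table schema change");
        }
        if (winner.containsProtocolChange()) {
            throw new IllegalStateException("Conflict: concurrent table protocol change");
        }
        // Files added in the meantime in a partition this INSERT read from
        // should have been read by the INSERT.
        boolean addedFileShouldHaveBeenRead = winner.addedFilesPartitionValues().stream()
                .anyMatch(sourceReadPartitions::contains);
        if (addedFileShouldHaveBeenRead) {
            throw new IllegalStateException("Conflict: concurrently added files should have been read");
        }
        // Files removed in the meantime may invalidate the snapshot the INSERT read.
        if (winner.containsRemoveFiles()) {
            throw new IllegalStateException("Conflict: concurrently removed files conflict with the files read");
        }
    }
}
```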

A relevant example, taken from the Databricks documentation, regarding the
distinction between `WriteSerializable` and `Serializable` isolation levels:

> For example, consider `txn1`, a long-running delete, and `txn2`,
> which inserts data blindly into the table.
> `txn2` and `txn1` complete and they are recorded in the order
> `txn2, txn1`
> into the history of the table.
> According to the history, the data inserted in `txn2` should not exist
> in the table. For the `Serializable` level, a reader would never see data
> inserted by `txn2`. However, for the `WriteSerializable` level, a reader
> could at some point see the data inserted by `txn2`.

A few words about the `WriteSerializable` isolation level, taken from the delta.io javadocs:

> This isolation level will ensure snapshot isolation consistency guarantee
> between write operations only.
> In other words, if only the write operations are considered, then
> there exists a serializable sequence between them that would produce the same
> result as seen in the table.

Consider the Delta Lake table `t` with the following definition:

```
CREATE TABLE t(data integer);
INSERT INTO t VALUES 0;
```

Reconciliation will always succeed when dealing with the following concurrent
`INSERT` statements:

```
INSERT INTO t SELECT count(*) FROM t; -- non-blind insert
INSERT INTO t VALUES 2; -- blind insert
```

The expected state of the table `t`, regardless of the order in which the
operations were actually committed to the transaction log, will be:

```
0
1
2
```

On the other hand, there can't be any reconciliation when dealing with the
following concurrent `INSERT` statements:

```
INSERT INTO t SELECT count(*) FROM t; -- non-blind insert
INSERT INTO t SELECT count(*) FROM t; -- non-blind insert
```

Reconciliation fails because there is no serializable sequence of the
statements that explains the state of the table if both committed
successfully.
Purely for the sake of demonstration, if both statements completed
successfully, the table state would be:

```
0
1
1
```

However, in any serializable sequence of these operations, the second insert
would see two rows (0 and 1) and therefore insert the value 2, so the table
state would look like:

```
0
1
2
```

Additional context and related issues

https://docs.delta.io/latest/concurrency-control.html
https://docs.databricks.com/en/optimizations/isolation-level.html#isolation-levels

Delta Lake OSS Source Code

https://github.com/delta-io/delta/blob/28881f5710e7b0b0bf2d959e36a5b6ebd75c62ae/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala

Delta Lake OSS source code explained

https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/#checkForConflicts

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Concurrent write reconciliation for non-blind append INSERT operation. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Mar 7, 2024
@github-actions github-actions bot added the delta-lake Delta Lake connector label Mar 7, 2024
@findinpath findinpath self-assigned this Mar 7, 2024
@ebyhr (Member) commented Mar 7, 2024

/test-with-secrets sha=e452a09cf3bf6726cd1461b197f0797c784d75c7


github-actions bot commented Mar 7, 2024

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8196441291

@findinpath findinpath requested review from ebyhr and pajaks March 8, 2024 08:05

Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow();
Optional<AddFileEntry> unreadAddAction = addedFiles.stream()
.filter(addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains))
Member:

Should it match the source enforcedDomains with the source addAction, or with the target?

Contributor Author:

The reconciliation is done, AFAIU, based on the enforced partition constraints of the source tables (for non-blind INSERT or UPDATE/MERGE/DELETE operations).
If the operation only writes new data - INSERT operations - this is not relevant for the WriteSerializable isolation level.
This aspect becomes relevant when dealing with Serializable operations on the same partition of the table - out of scope for the current PR.
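As a simplified illustration of that partition check (hypothetical sketch, with the enforced domains reduced to sets of allowed values per partition column):

```
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class PartitionMatchSketch
{
    // An added file "matches" the INSERT's enforced partition constraint when
    // every constrained partition column value falls inside its allowed set.
    static boolean partitionMatchesPredicate(
            Map<String, String> canonicalPartitionValues,
            Map<String, Set<String>> enforcedDomains)
    {
        return enforcedDomains.entrySet().stream()
                .allMatch(domain -> domain.getValue().contains(canonicalPartitionValues.get(domain.getKey())));
    }

    // Mirrors the quoted snippet: find any concurrently added file whose
    // partition lies in the partitions read by the current INSERT.
    static Optional<Map<String, String>> findUnreadAddAction(
            Set<Map<String, String>> addedFilesPartitionValues,
            Map<String, Set<String>> enforcedDomains)
    {
        return addedFilesPartitionValues.stream()
                .filter(partitionValues -> partitionMatchesPredicate(partitionValues, enforcedDomains))
                .findAny();
    }
}
```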

@findinpath findinpath marked this pull request as draft March 13, 2024 14:09
@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch from e452a09 to 8001d8b Compare March 13, 2024 21:53
@findinpath findinpath marked this pull request as ready for review March 13, 2024 21:53
@findinpath findinpath requested review from pajaks and alexjo2144 March 13, 2024 21:53
}

Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedSourcePartitionConstraints.getDomains().orElseThrow();
Optional<AddFileEntry> unreadAddAction = addedFiles.stream()
Member:
Shouldn't the name of this variable be more like conflictingAddAction? Right now unreadAddAction suggests that those add actions are not conflicting because we don't read from them.

Contributor Author:

The files are not necessarily conflicting.
It may very well be that the operations don't overlap at row level.
Here we're just aborting the operation if we find new active files that should have been taken into account.

}

@Test
public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable()
Member:

Any reason why this test is not in BaseDeltaLakeConnectorSmokeTest as well?

Contributor Author:

I'm not convinced there is added value in testing this extensively in BaseDeltaLakeConnectorSmokeTest.
The class would not be a "smoke" test class anymore.

I trust that the local file store does indeed throw FileAlreadyExistsException when a concurrent operation creates a namesake transaction log file, which is why I'd be inclined to say TestDeltaLakeLocalConcurrentWritesTest is fit to hold an increased battery of tests for the concurrent writes reconciliation functionality.
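For context, a test in that class typically fires the statements concurrently and expects both to commit after reconciliation; a rough sketch of such a harness (hypothetical helper, not the PR's actual test code):

```
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentInsertsSketch
{
    // Runs the given inserts concurrently; the loser of the transaction-log
    // version race must reconcile and retry transparently.
    public static void runConcurrently(Runnable... inserts) throws Exception
    {
        ExecutorService executor = Executors.newFixedThreadPool(inserts.length);
        CyclicBarrier barrier = new CyclicBarrier(inserts.length);
        try {
            List<Future<Void>> futures = List.of(inserts).stream()
                    .map(insert -> executor.submit((Callable<Void>) () -> {
                        barrier.await(); // maximize the chance of overlapping commits
                        insert.run();
                        return null;
                    }))
                    .toList();
            for (Future<Void> future : futures) {
                future.get(); // all commits are expected to succeed after reconciliation
            }
        }
        finally {
            executor.shutdownNow();
        }
    }
}
```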

@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch 2 times, most recently from cf5c68e to 9da931b Compare March 15, 2024 08:30
@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch from 9da931b to 646b9a7 Compare March 15, 2024 12:05
@ebyhr (Member) left a comment:

Almost good to me.

@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch 2 times, most recently from fc7bb1e to 3e80e92 Compare March 21, 2024 13:40
private final List<MetadataEntry> metadataUpdates;
private final Optional<ProtocolEntry> protocol;
private final Optional<CommitInfoEntry> commitInfo;
private final List<RemoveFileEntry> removedFiles;
Member:

Could you condense the data in this summary further? Looks like you could throw out any add file commits that are blind appends entirely, and for other commits you need a set of the partitions but not the rest of the data.

Contributor Author:

Keeping only the following (see the sketch below):

  • canonical partition keys for the added files
  • a flag stating whether or not the commit contains remove files
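That is, roughly a shape like the following (hypothetical sketch, not the exact PR code):

```
import java.util.Map;
import java.util.Set;

// Per winning commit, keep only what the INSERT conflict checks need.
public record TransactionLogEntrySummary(
        // canonical partition values of the files added by the commit
        Set<Map<String, String>> addedFilesCanonicalPartitionValues,
        // whether the commit removed any files
        boolean containsRemoveFileEntries) {}
```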

@alexjo2144 (Member) left a comment:

From what I can tell the conflict handling here is still safe, just a few questions/comments.

@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch 2 times, most recently from e2c0755 to 6ad7272 Compare March 22, 2024 17:04
@findinpath findinpath requested a review from alexjo2144 March 22, 2024 17:04
@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch from 6ad7272 to b622209 Compare March 22, 2024 17:05
@findinpath findinpath force-pushed the findinpath/delta-lake-enhance-concurrent-inserts branch from 014a5e4 to 4d0024c Compare March 23, 2024 05:28
@findinpath findinpath requested a review from ebyhr March 26, 2024 05:56
@findinpath (Contributor Author) commented:
@ebyhr, @alexjo2144 AC.
Please take another round of review 🙏

@ebyhr (Member) commented Mar 27, 2024

/test-with-secrets sha=4d0024c45c16cde39d18dc160e6eef83abcd619e

github-actions bot commented:

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8448260075

@ebyhr ebyhr merged commit 06c6ef0 into trinodb:master Mar 28, 2024
24 checks passed
@github-actions github-actions bot added this to the 444 milestone Mar 28, 2024
@ebyhr (Member) commented Mar 29, 2024

@findinpath Could you take a look at the following failure? It's happening on master:

Error:    TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameTable:177->testConcurrentInsertsSelectingFromTheSameTable:228->AbstractTestQueryFramework.assertQuery:350 For query 20240329_103745_00065_e4xfv: 
 SELECT * FROM test_concurrent_inserts_select_from_same_table_h28nk6v7lf
not equal
Actual rows (up to 100 of 1 extra rows shown, 3 rows in total):
    [1, 10]
Expected rows (up to 100 of 1 missing rows shown, 3 rows in total):
    [2, 10]

https://github.com/trinodb/trino/actions/runs/8480015101/job/23235027443
