Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrent writes reconciliation for blind append INSERT in Delta Lake #18506

Conversation

findinpath
Copy link
Contributor

@findinpath findinpath commented Aug 2, 2023

Description

Allow committing blind append INSERT operations in a concurrent context by
placing these operations right after any other previously concurrently
completed write operations.

Disallow committing blind insert operations in any of the following cases:

  • table schema change has been committed in the meantime
  • table protocol change has been committed in the meantime

INSERT operations that contain subqueries reading the same table are
subject to concurrent write failures.

Relates to #16985

This change primarily affects storages which offer "put-If-Absent" consistency guarantees (ABFS, GCS, ).
In case of AWS S3 it is attempted to offer a "best effort" alternative to having a full-blown locking mechanism, by retrying to persist the transaction log file corresponding to the transaction with jitter.

https://docs.databricks.com/en/optimizations/isolation-level.html#isolation-levels

Additional context and related issues

https://docs.delta.io/latest/concurrency-control.html
https://docs.databricks.com/en/optimizations/isolation-level.html#isolation-levels

Delta Lake OSS Source Code

https://github.com/delta-io/delta/blob/28881f5710e7b0b0bf2d959e36a5b6ebd75c62ae/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala

Delta Lake OSS Code source explained

https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/#checkForConflicts

This PR introduces the trino-spi method in ConnectorMetadata

    /**
     * Finish insert query
     */
    default Optional<ConnectorOutputMetadata> finishInsert(
            ConnectorSession session,
            ConnectorInsertTableHandle insertHandle,
            List<ConnectorTableHandle> sourceTableHandles,
            Collection<Slice> fragments,
            Collection<ComputedStatistics> computedStatistics)
    {
        // Delegate to deprecated SPI to not break existing connectors
        return finishInsert(session, insertHandle, fragments, computedStatistics);
    }

This is done in order to be able to figure out whether the INSERT is not a blind append operation

INSERT INTO tableA SELECT * FROM tableA WHERE partitionkey = xxx

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Concurrent write reconciliation for blind append INSERT operation. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 2, 2023
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from 8caeeee to fa9b21d Compare August 2, 2023 20:57
@github-actions github-actions bot added the delta-lake Delta Lake connector label Aug 2, 2023
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch 3 times, most recently from b3514ff to c8a13ef Compare August 3, 2023 06:43
@findinpath findinpath marked this pull request as ready for review August 3, 2023 06:43
@findinpath findinpath requested review from findepi, ebyhr and homar August 3, 2023 06:43
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch 3 times, most recently from b723285 to 882f911 Compare August 3, 2023 09:30
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch 2 times, most recently from 661f4ba to 0023308 Compare August 3, 2023 13:26
@findinpath findinpath self-assigned this Sep 5, 2023
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from 0023308 to bf19346 Compare September 5, 2023 11:17
@ebyhr
Copy link
Member

ebyhr commented Sep 5, 2023

/test-with-secrets sha=bf19346564de038c46513dccb0ef1630bbe4f5f6

@github-actions
Copy link

github-actions bot commented Sep 5, 2023

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/6084460202

@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from bf19346 to 507bae0 Compare September 5, 2023 20:26
@findinpath
Copy link
Contributor Author

@ebyhr pls do run the PR again with secrets. I've discovered a little issue in identifying transaction conflicts in my code.

@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from 507bae0 to a4d1c72 Compare September 5, 2023 20:35
writeCommitted = true;
writeCheckpointIfNeeded(session, handle.getTableName(), handle.getLocation(), checkpointInterval, commitVersion);
writeCheckpointIfNeeded(session, handle.getTableName(), handle.getLocation(), handle.getMetadataEntry().getCheckpointInterval(), commitVersion);

if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty() && !dataFileInfos.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current mechanism also prevents concurrent writes to extended statistics. With this new change there is possibility to race condition when writing them.

Copy link
Contributor Author

@findinpath findinpath Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be only one checkpoint per version written - if the process manages to write the x.log transaction log, there's no other process trying to create a checkpoint for the same version.

Comment relates to #16088

This could probably be done on a mechanism similar to what we have on https://github.com/trinodb/trino/blob/675defb40e12734f5d067848a14857ff66c80019/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogSynchronizer.java implementations. Doesn't make sense because we currently update the same stats file, not always a new one.

@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from a4d1c72 to c70f3a7 Compare September 6, 2023 13:32
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from c70f3a7 to 3f4a218 Compare September 14, 2023 10:23
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from 2e39b72 to f3d4706 Compare February 2, 2024 13:39
Copy link
Member

@alexjo2144 alexjo2144 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good besides a few nits

@@ -2263,6 +2269,63 @@ public void testPartitionFilterIncluded()
}
}

@Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd run this a few times

@Disabled
public void testConcurrentInsertsReconciliationForBlindInserts()
{
// testConcurrentInsertsReconciliation requires safe writes capability to avoid test flakiness
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the writes are happening in Trino, this should work. The writes on s3 are unsafe if you're also writing from Spark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the writes are happening in Trino, this should work.

It works most of the time, but sometimes it fails - it happened in the CI and this is why I've decided to disable the test.

IIRC it was linked to Transaction log locked... messages.

@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from f3d4706 to c3b4200 Compare February 12, 2024 17:04
In the context of retrying to flush a transaction log file from multiple
Trino queries it may happen that multiple locks will exist for the same
transaction log file.
Add the possibility to perform analysis on the dependencies
of the INSERT statement.
Specifically one connector could potentially figure out
whether concurrent INSERT operations which add data into
the same table as the one from which data is being selected
collide with each other.
@findinpath
Copy link
Contributor Author

findinpath commented Feb 12, 2024

Rebasing on master to address conflicts.
The commits affected:

  • Add scaffolding for using LocalTransactionLogSynchronizer in tests commit
  • the test TestDeltaLakeLocalConcurrentWritesTest from Add concurrent writes reconciliation for blind append INSERT operations commit

@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch 2 times, most recently from 6231236 to e3c91e5 Compare February 12, 2024 17:25
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
else if (transactionLogEntry.getAdd() != null) {
addedFilesBuilder.add(transactionLogEntry.getAdd());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't need these saved in memory yet, let's not keep them around. You'll need them for updates/deletes but we're not there yet. Same with remove entries. The ones that are singletons not lists are fine with me.

@@ -2446,6 +2543,26 @@ private CommitInfoEntry getCommitInfoEntry(
long createdTime,
String operation,
long readVersion)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably just inline this one, rather than have two helper methods which just call a constructor

@alexjo2144
Copy link
Member

Make sure to get a run with secrets done

@ebyhr
Copy link
Member

ebyhr commented Feb 13, 2024

/test-with-secrets sha=e3c91e5238fbe099145d20924dcb13804f645ac9

Copy link

github-actions bot commented Feb 13, 2024

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/7893679486

Allow committing blind append INSERT operations in a concurrent context by
placing these operations right after any other previously concurrently
completed write operations.

Disallow committing blind insert operations in any of the following cases:

- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime

INSERT operations that contain subqueries reading the same table are
subject to concurrent write failures.
@findinpath findinpath force-pushed the findinpath/delta-concurrent-writes-reconciliation branch from e3c91e5 to efe6102 Compare February 15, 2024 13:11
@ebyhr
Copy link
Member

ebyhr commented Feb 16, 2024

/test-with-secrets sha=efe610257a489242ee65cb62b54d8919adfa217f

Copy link

github-actions bot commented Feb 16, 2024

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/7925295984

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector
Development

Successfully merging this pull request may close these issues.

7 participants