-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for storing metadata to metastore in Delta Lake #21463
Conversation
29de71c
to
374a418
Compare
Pls rebase to deal with code conflicts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very promissing !!!
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
Outdated
Show resolved
Hide resolved
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
@@ -42,4 +42,6 @@ public interface DeltaLakeMetastore | |||
void dropTable(SchemaTableName schemaTableName, String tableLocation, boolean deleteData); | |||
|
|||
void renameTable(SchemaTableName from, SchemaTableName to); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need compatibility tests with Delta Lake OSS where Spark writes to the table and Trino is constrained to update the metadata caching properties on any read/write operation.
8785fd6
to
0fc669a
Compare
Addressed comments partially. I will add more tests to TestDeltaLakeMetastoreAccessOperations. |
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeQueryRunner.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Show resolved
Hide resolved
3cb75c9
to
0d7fdfd
Compare
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
...src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Show resolved
Hide resolved
...src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
} | ||
tableComment.ifPresent(comment -> builder.put(TABLE_COMMENT, comment)); | ||
builder.put(TRINO_LAST_TRANSACTION_VERSION, Long.toString(version)); | ||
builder.put(TRINO_METADATA_SCHEMA_STRING, schemaString); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you taken into consideration base64(compress(schema))
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question.
Regardless of whether we compress or not, we should have a check on length.
trino/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java
Lines 90 to 92 in d60b4d0
boolean canPersistComment = (comment == null || comment.length() <= GLUE_TABLE_PARAMETER_LENGTH_LIMIT); | |
boolean canPersistColumnInfo = glueColumns.isPresent(); | |
boolean canPersistMetadata = canPersistComment && canPersistColumnInfo; |
...in/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
Show resolved
Hide resolved
303cf68
to
050edfa
Compare
@@ -626,6 +650,9 @@ public LocatedTableHandle getTableHandle( | |||
return null; | |||
} | |||
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry)); | |||
if (cacheTableMetadata && endVersion.isEmpty() && !isCachedVersionSameAsLastTransaction(metastoreTable.get(), tableSnapshot)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of referencing multiple times the same table in a query, is this check supposed to avoid calling metastore table replace repeatedly for the same table?
I don't follow where we ensure that the call is made only once.
I'm thinking that this should be ensured with the help of a ConcurrentMap like in io.trino.plugin.deltalake.DeltaLakeMetadata#getSnapshot(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.SchemaTableName, java.lang.String, java.util.Optional<java.lang.Long>)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't follow where we ensure that the call is made only once.
good point. i don't think we do
we should maintain a sort of a queue of pending updates
and scheduling new update logically remove previous entry
the ordering of updates doesn't matter (we can deliver them out of order, as long as we deduplicate), and we want to deduplicate, so we only need a Map<SchemaTableName, /* update info */>
Moreover, the update should be propagated to the metastore after transaction is committed.
Thus
- DeltaLakeMetadata should collect stuff to update (there shouldn't be any duplicates here, as DeltaLakeMetadata should not observe two different versions of one table, unless AS OF is used0)
- on commit, it should move the collected stuff to update to update manager singleton, to perform actual update
- the update manager should deduplicate (updates coming from different transactions) and perform actual updates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(skimmed)
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
Outdated
Show resolved
Hide resolved
@@ -626,6 +650,9 @@ public LocatedTableHandle getTableHandle( | |||
return null; | |||
} | |||
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry)); | |||
if (cacheTableMetadata && endVersion.isEmpty() && !isCachedVersionSameAsLastTransaction(metastoreTable.get(), tableSnapshot)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't follow where we ensure that the call is made only once.
good point. i don't think we do
we should maintain a sort of a queue of pending updates
and scheduling new update logically remove previous entry
the ordering of updates doesn't matter (we can deliver them out of order, as long as we deduplicate), and we want to deduplicate, so we only need a Map<SchemaTableName, /* update info */>
Moreover, the update should be propagated to the metastore after transaction is committed.
Thus
- DeltaLakeMetadata should collect stuff to update (there shouldn't be any duplicates here, as DeltaLakeMetadata should not observe two different versions of one table, unless AS OF is used0)
- on commit, it should move the collected stuff to update to update manager singleton, to perform actual update
- the update manager should deduplicate (updates coming from different transactions) and perform actual updates
|
||
TrinoFileSystem fileSystem = fileSystemFactory.create(session); | ||
|
||
Stream<RelationCommentMetadata> tables = streamTables(session, schemaName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like efficient operation, but it's not good API at least for Glue. Getting table names is as expensive as getting all table information (because that's what's happening under the covers)
We should add something like HiveMetastore.streamTables
that would return full table objects in an iterative manner. I happen to have something like that implemented, so I can share the code later. For now, we can just use listTables
, without extracting new streamTables
method.
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Outdated
Show resolved
Hide resolved
...rc/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java
Show resolved
Hide resolved
.add(GET_TABLE) | ||
.add(REPLACE_TABLE) | ||
.build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And without cache, we do only GET_TABLE, right?
would it be possible to have "do cache metadata" as a boolean variable in the test
so that the expected can be expressed as a conditional expression based on "do cache metadata"? this would make the test tell the story more directly
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
Outdated
Show resolved
Hide resolved
050edfa
to
7114f75
Compare
Just rebased on master to resolve conflicts. (No change) |
810d795
to
57d174d
Compare
...ava/io/trino/plugin/deltalake/metastore/glue/v1/DeltaLakeGlueV1MetastoreTableOperations.java
Outdated
Show resolved
Hide resolved
...ava/io/trino/plugin/deltalake/metastore/glue/v1/DeltaLakeGlueV1MetastoreTableOperations.java
Show resolved
Hide resolved
65c0f8e
to
e1e35b9
Compare
Rebased on master to resolve logical conflicts. |
71390ac
to
69b1e5a
Compare
69b1e5a
to
ecaa8e7
Compare
Rebased on master to resolve conflicts. |
@@ -53,6 +53,11 @@ public void commit(ConnectorTransactionHandle transaction) | |||
{ | |||
MemoizedMetadata deltaLakeMetadata = transactions.remove(transaction); | |||
checkArgument(deltaLakeMetadata != null, "no such transaction: %s", transaction); | |||
deltaLakeMetadata.optionalGet().ifPresent(metadata -> { | |||
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { | |||
metadata.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated question: Why is ThreadContextClassLoader
required for metadata.commit()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably to ensure that metadata.commit() call will be executed in the context of the delta lake plugin classloader
@wendigo Please take another look. |
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Show resolved
Hide resolved
...in/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
Show resolved
Hide resolved
8295cc1
to
537164b
Compare
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % fixups
This is preparatory change for the next commit that stores the table definition to metastore.
3afd97a
to
e010a3d
Compare
e010a3d
to
33f8454
Compare
Description
This PR adds a new
delta.metastore.store-table-metadata
config property (true by default) and stores the last transaction number and schema string to table parameter on the metastore when the property is enabled.Conditions we update the metastore:
The cached information is used when listing table comments or columns.
Test result with 100 tables having 7 transaction logs:
We could improve more once we use Glue specific API call (AWSGlue#getTables). This can be handled in follow-up.
Release notes
(x) Release notes are required, with the following suggested text: