Flush transaction log cache in Delta flush_metadata_cache procedure #16466

Merged: 5 commits, Mar 22, 2023
docs/src/main/sphinx/connector/delta-lake.rst (14 additions, 0 deletions)

@@ -601,6 +601,20 @@ existing Delta Lake table from the metastores without deleting the data::

CALL example.system.unregister_table(schema_name => 'testdb', table_name => 'customer_orders')

.. _delta-lake-flush-metadata-cache:

Flush metadata cache
^^^^^^^^^^^^^^^^^^^^

* ``system.flush_metadata_cache()``

Flush all metadata caches.

* ``system.flush_metadata_cache(schema_name => ..., table_name => ...)``

Flush metadata cache entries associated with the selected table.
The procedure requires passing named parameters.
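
For example, with a catalog named ``example``, as in the other procedure
examples above::

    CALL example.system.flush_metadata_cache()

    CALL example.system.flush_metadata_cache(schema_name => 'testdb', table_name => 'customer_orders')

Passing only one of the two named parameters is rejected as an invalid
parameter set.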

.. _delta-lake-write-support:

Updating data
DeltaLakeMetadata.java

@@ -420,7 +420,7 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
dataTableName.getTableName(),
metastore.getTableLocation(dataTableName, session),
metastore.getTableLocation(dataTableName),
metadata,
TupleDomain.all(),
TupleDomain.all(),
@@ -449,7 +449,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
String location = metastore.getTableLocation(tableHandle.getSchemaTableName());
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
List<String> constraints = ImmutableList.<String>builder()
@@ -1415,7 +1415,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl

Optional<Long> checkpointInterval = handle.getMetadataEntry().getCheckpointInterval();

String tableLocation = metastore.getTableLocation(handle.getSchemaTableName(), session);
String tableLocation = metastore.getTableLocation(handle.getSchemaTableName());

boolean writeCommitted = false;
try {
@@ -1660,7 +1660,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
try {
String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName());
Path tableMetadataDirectory = new Path(new Path(tableLocation).getParent().toString(), tableHandle.getTableName());
boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableMetadataDirectory.toString()).isUnsafe();
return !requiresOptIn || unsafeWritesEnabled;
@@ -2260,7 +2260,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
AnalyzeHandle analyzeHandle = tableHandle.getAnalyzeHandle().orElseThrow(() -> new IllegalArgumentException("analyzeHandle not set"));
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
String location = metastore.getTableLocation(tableHandle.getSchemaTableName());
Optional<Instant> maxFileModificationTime = getMaxFileModificationTime(computedStatistics);
updateTableStatistics(
session,
@@ -2554,7 +2554,7 @@ private List<CommitInfoEntry> getCommitInfoEntries(SchemaTableName table, Connec
{
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
return TransactionLogTail.loadNewTail(fileSystem, metastore.getTableLocation(table, session), Optional.empty()).getFileEntries().stream()
return TransactionLogTail.loadNewTail(fileSystem, metastore.getTableLocation(table), Optional.empty()).getFileEntries().stream()
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
.collect(toImmutableList());
DeltaLakeModule.java

@@ -25,6 +25,7 @@
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure;
import io.trino.plugin.deltalake.procedure.RegisterTableProcedure;
import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure;
@@ -147,6 +148,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
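
For readers unfamiliar with this binding style, here is a minimal, self-contained
sketch of the Guice Multibinder pattern the module relies on. The class names
below (ExampleModule, ExampleProcedure, FlushProcedure) are hypothetical
stand-ins, not Trino code; the point is that each addBinding() contribution ends
up in a single injectable Set:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;

import java.util.Set;

public class ExampleModule
        extends AbstractModule
{
    interface ExampleProcedure {}

    static class FlushProcedure
            implements ExampleProcedure {}

    @Override
    protected void configure()
    {
        // Every addBinding() call contributes one element to the injected Set
        Multibinder<ExampleProcedure> procedures = Multibinder.newSetBinder(binder(), ExampleProcedure.class);
        procedures.addBinding().to(FlushProcedure.class).in(Scopes.SINGLETON);
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new ExampleModule());
        Set<ExampleProcedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<ExampleProcedure>>() {}));
        System.out.println(procedures.size()); // prints 1
    }
}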
DeltaLakeSplitManager.java

@@ -130,7 +130,7 @@ private Stream<DeltaLakeSplit> getSplits(
Constraint constraint)
{
DeltaLakeMetastore metastore = getMetastore(session, transaction);
String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName());
List<AddFileEntry> validDataFiles = metastore.getValidDataFiles(tableHandle.getSchemaTableName(), session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
DeltaLakeMetastore.java

@@ -53,7 +53,7 @@ public interface DeltaLakeMetastore

ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot table);

String getTableLocation(SchemaTableName table, ConnectorSession session);
String getTableLocation(SchemaTableName table);

TableSnapshot getSnapshot(SchemaTableName table, ConnectorSession session);

DeltaLakeMetastoreModule.java

@@ -37,7 +37,7 @@ protected void setup(Binder binder)
bindMetastoreModule("file", new DeltaLakeFileMetastoreModule());
bindMetastoreModule("glue", new DeltaLakeGlueMetastoreModule());

install(new DecoratedHiveMetastoreModule());
install(new DecoratedHiveMetastoreModule(false));
}

private void bindMetastoreModule(String name, Module module)
HiveMetastoreBackedDeltaLakeMetastore.java

@@ -121,18 +121,21 @@ public List<String> getAllTables(String databaseName)
public Optional<Table> getTable(String databaseName, String tableName)
{
Optional<Table> candidate = delegate.getTable(databaseName, tableName);
candidate.ifPresent(table -> {
if (isHiveOrPrestoView(table)) {
// this is a Hive view, hence not a table
throw new NotADeltaLakeTableException(databaseName, tableName);
}
if (!TABLE_PROVIDER_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_PROVIDER_PROPERTY))) {
throw new NotADeltaLakeTableException(databaseName, tableName);
}
});
candidate.ifPresent(HiveMetastoreBackedDeltaLakeMetastore::verifyDeltaLakeTable);
return candidate;
}

public static void verifyDeltaLakeTable(Table table)
{
if (isHiveOrPrestoView(table)) {
// this is a Hive view, hence not a table
throw new NotADeltaLakeTableException(table.getDatabaseName(), table.getTableName());
}
if (!TABLE_PROVIDER_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_PROVIDER_PROPERTY))) {
throw new NotADeltaLakeTableException(table.getDatabaseName(), table.getTableName());
}
}

@Override
public void createDatabase(Database database)
{
@@ -167,7 +170,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg
@Override
public void dropTable(ConnectorSession session, String databaseName, String tableName, boolean deleteData)
{
String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName), session);
String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName));
delegate.dropTable(databaseName, tableName, deleteData);
statisticsAccess.invalidateCache(tableLocation);
transactionLogAccess.invalidateCaches(tableLocation);
@@ -202,11 +205,16 @@ public ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot tableSn
}

@Override
public String getTableLocation(SchemaTableName table, ConnectorSession session)
public String getTableLocation(SchemaTableName tableName)
{
Table table = getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
return getTableLocation(table);
}

public static String getTableLocation(Table table)
{
Map<String, String> serdeParameters = getTable(table.getSchemaName(), table.getTableName())
.orElseThrow(() -> new TableNotFoundException(table))
.getStorage().getSerdeParameters();
Map<String, String> serdeParameters = table.getStorage().getSerdeParameters();
String location = serdeParameters.get(PATH_PROPERTY);
if (location == null) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("No %s property defined for table: %s", PATH_PROPERTY, table));
@@ -218,7 +226,7 @@ public String getTableLocation(SchemaTableName table, ConnectorSession session)
public TableSnapshot getSnapshot(SchemaTableName table, ConnectorSession session)
{
try {
return transactionLogAccess.loadSnapshot(table, getTableLocation(table, session), session);
return transactionLogAccess.loadSnapshot(table, getTableLocation(table), session);
}
catch (NotADeltaLakeTableException e) {
throw e;
DropExtendedStatsProcedure.java

@@ -83,6 +83,6 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name));
}
accessControl.checkCanInsertIntoTable(null, name);
statsAccess.deleteExtendedStatistics(session, metadata.getMetastore().getTableLocation(name, session));
statsAccess.deleteExtendedStatistics(session, metadata.getMetastore().getTableLocation(name));
}
}
FlushMetadataCacheProcedure.java (new file)

@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.procedure.Procedure;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.getTableLocation;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.verifyDeltaLakeTable;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

public class FlushMetadataCacheProcedure
implements Provider<Procedure>
{
private static final String PROCEDURE_NAME = "flush_metadata_cache";

private static final String PARAM_SCHEMA_NAME = "SCHEMA_NAME";
private static final String PARAM_TABLE_NAME = "TABLE_NAME";

private final HiveMetastoreFactory metastoreFactory;
private final Optional<CachingHiveMetastore> cachingHiveMetastore;
private final TransactionLogAccess transactionLogAccess;

private static final MethodHandle FLUSH_METADATA_CACHE;

static {
try {
FLUSH_METADATA_CACHE = lookup().unreflect(FlushMetadataCacheProcedure.class.getMethod("flushMetadataCache", ConnectorSession.class, String.class, String.class));
}
catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}

@Inject
public FlushMetadataCacheProcedure(
HiveMetastoreFactory metastoreFactory,
Optional<CachingHiveMetastore> cachingHiveMetastore,
TransactionLogAccess transactionLogAccess)
{
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.cachingHiveMetastore = requireNonNull(cachingHiveMetastore, "cachingHiveMetastore is null");

[Review comment, Member] would it make sense to try to delegate to Hive's flush procedure?

[Reply, Member Author] I tried to delegate to the connector at first, but the code was a little redundant. Let me merge as-is. I will take another look later.


this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
}

@Override
public Procedure get()
{
return new Procedure(
"system",
PROCEDURE_NAME,
ImmutableList.of(
new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null),
new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null)),
FLUSH_METADATA_CACHE.bindTo(this),
true);
}

public void flushMetadataCache(ConnectorSession session, String schemaName, String tableName)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
doFlushMetadataCache(session, Optional.ofNullable(schemaName), Optional.ofNullable(tableName));
}
}

private void doFlushMetadataCache(ConnectorSession session, Optional<String> schemaName, Optional<String> tableName)
{
if (schemaName.isEmpty() && tableName.isEmpty()) {
cachingHiveMetastore.ifPresent(CachingHiveMetastore::flushCache);
transactionLogAccess.flushCache();
}
else if (schemaName.isPresent() && tableName.isPresent()) {
HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity()));
Table table = metastore.getTable(schemaName.get(), tableName.get())
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schemaName.get(), tableName.get())));
verifyDeltaLakeTable(table);
cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(table.getDatabaseName(), table.getTableName()));
transactionLogAccess.invalidateCaches(getTableLocation(table));
}
else {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Illegal parameter set passed");
}
}
}
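
A note on the registration mechanics in the new file: lookup().unreflect(...)
converts the reflective Method into a MethodHandle once, in the static
initializer, and bindTo(this) fixes the receiver when the Procedure is
assembled in get(). A minimal standalone sketch of that pattern (Greeter is a
made-up class, not part of this PR):

import java.lang.invoke.MethodHandle;

import static java.lang.invoke.MethodHandles.lookup;

public class Greeter
{
    public void greet(String name)
    {
        System.out.println("Hello, " + name);
    }

    public static void main(String[] args)
            throws Throwable
    {
        // Equivalent of the FLUSH_METADATA_CACHE static initializer above
        MethodHandle handle = lookup().unreflect(Greeter.class.getMethod("greet", String.class));
        // Equivalent of FLUSH_METADATA_CACHE.bindTo(this) in get()
        handle.bindTo(new Greeter()).invoke("world");
    }
}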
TransactionLogAccess.java

@@ -175,6 +175,12 @@ public TableSnapshot loadSnapshot(SchemaTableName table, String tableLocation, C
return snapshot;
}

public void flushCache()
{
tableSnapshots.invalidateAll();
activeDataFileCache.invalidateAll();
}

public void invalidateCaches(String tableLocation)
{
tableSnapshots.invalidate(tableLocation);
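
To make the two flush granularities concrete: flushCache() drops every cached
entry, while the existing invalidateCaches(tableLocation) evicts a single
table's entries by key. A minimal sketch against a Guava-style Cache API
matching the invalidate/invalidateAll calls above (the keys and values below
are made up):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CacheFlushSketch
{
    public static void main(String[] args)
    {
        Cache<String, String> tableSnapshots = CacheBuilder.newBuilder().build();
        tableSnapshots.put("s3://bucket/table_a", "snapshot@v3");
        tableSnapshots.put("s3://bucket/table_b", "snapshot@v7");

        // invalidateCaches(tableLocation): evict one table's entry by key
        tableSnapshots.invalidate("s3://bucket/table_a");
        System.out.println(tableSnapshots.size()); // 1

        // flushCache(): drop everything, as the no-argument procedure call does
        tableSnapshots.invalidateAll();
        System.out.println(tableSnapshots.size()); // 0
    }
}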