Rename DeltaLakeStatistics to ExtendedStatistics

ebyhr committed Mar 29, 2022
1 parent 884e756 commit 37d4520

Showing 12 changed files with 68 additions and 68 deletions.
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -31,8 +31,8 @@
 import io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId;
 import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle;
 import io.trino.plugin.deltalake.statistics.DeltaLakeColumnStatistics;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatistics;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
+import io.trino.plugin.deltalake.statistics.ExtendedStatisticsAccess;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
 import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
@@ -236,7 +236,7 @@ public class DeltaLakeMetadata
     private final String nodeVersion;
     private final String nodeId;
     private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();
-    private final DeltaLakeStatisticsAccess statisticsAccess;
+    private final ExtendedStatisticsAccess statisticsAccess;
     private final boolean deleteSchemaLocationsFallback;
 
     public DeltaLakeMetadata(
@@ -254,7 +254,7 @@ public DeltaLakeMetadata(
             long defaultCheckpointInterval,
             boolean ignoreCheckpointWriteFailures,
             boolean deleteSchemaLocationsFallback,
-            DeltaLakeStatisticsAccess statisticsAccess)
+            ExtendedStatisticsAccess statisticsAccess)
     {
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -1655,9 +1655,9 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSession
         long version = tableSnapshot.getVersion();
 
         String tableLocation = metastore.getTableLocation(tableName, session);
-        Optional<DeltaLakeStatistics> statistics = statisticsAccess.readDeltaLakeStatistics(session, tableLocation);
+        Optional<ExtendedStatistics> statistics = statisticsAccess.readExtendedStatistics(session, tableLocation);
 
-        Optional<Instant> alreadyAnalyzedModifiedTimeMax = statistics.map(DeltaLakeStatistics::getAlreadyAnalyzedModifiedTimeMax);
+        Optional<Instant> alreadyAnalyzedModifiedTimeMax = statistics.map(ExtendedStatistics::getAlreadyAnalyzedModifiedTimeMax);
 
         // determine list of files we want to read based on what caller requested via files_modified_after and what files were already analyzed in the past
         Optional<Instant> filesModifiedAfter = Optional.empty();
@@ -1689,7 +1689,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSession
         }
 
         // verify that we do not extend set of analyzed columns
-        Optional<Set<String>> oldAnalyzeColumnNames = statistics.flatMap(DeltaLakeStatistics::getAnalyzedColumns);
+        Optional<Set<String>> oldAnalyzeColumnNames = statistics.flatMap(ExtendedStatistics::getAnalyzedColumns);
         if (oldAnalyzeColumnNames.isPresent()) {
             if (analyzeColumnNames.isEmpty() || !oldAnalyzeColumnNames.get().containsAll(analyzeColumnNames.get())) {
                 throw new TrinoException(INVALID_ANALYZE_PROPERTY,
@@ -1761,16 +1761,16 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle
         DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
         AnalyzeHandle analyzeHandle = tableHandle.getAnalyzeHandle().orElseThrow(() -> new IllegalArgumentException("analyzeHandle not set"));
         String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
-        Optional<DeltaLakeStatistics> oldStatistics = statisticsAccess.readDeltaLakeStatistics(session, location);
+        Optional<ExtendedStatistics> oldStatistics = statisticsAccess.readExtendedStatistics(session, location);
 
         // more elaborate logic for handling statistics model evaluation may need to be introduced in the future
         // for now let's have a simple check rejecting update
         oldStatistics.ifPresent(statistics ->
                 checkArgument(
-                        statistics.getModelVersion() == DeltaLakeStatistics.CURRENT_MODEL_VERSION,
+                        statistics.getModelVersion() == ExtendedStatistics.CURRENT_MODEL_VERSION,
                         "Existing table statistics are incompatible, run the drop statistics procedure on this table before re-analyzing"));
 
-        Map<String, DeltaLakeColumnStatistics> oldColumnStatistics = oldStatistics.map(DeltaLakeStatistics::getColumnStatistics)
+        Map<String, DeltaLakeColumnStatistics> oldColumnStatistics = oldStatistics.map(ExtendedStatistics::getColumnStatistics)
                 .orElseGet(ImmutableMap::of);
         Map<String, DeltaLakeColumnStatistics> newColumnStatistics = toDeltaLakeColumnStatistics(computedStatistics);
 
@@ -1806,12 +1806,12 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle
                         analyzeHandle.getColumns().get()));
         }
 
-        DeltaLakeStatistics mergedDeltaLakeStatistics = new DeltaLakeStatistics(
+        ExtendedStatistics mergedExtendedStatistics = new ExtendedStatistics(
                 finalAlreadyAnalyzedModifiedTimeMax,
                 mergedColumnStatistics,
                 analyzeHandle.getColumns());
 
-        statisticsAccess.updateDeltaLakeStatistics(session, location, mergedDeltaLakeStatistics);
+        statisticsAccess.updateExtendedStatistics(session, location, mergedExtendedStatistics);
     }
 
     private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistics(Collection<ComputedStatistics> computedStatistics)
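The mergedColumnStatistics value consumed above is computed between the two hunks and is not part of this diff. A minimal sketch of what such a merge can look like, assuming a hypothetical mergeWith helper on DeltaLakeColumnStatistics (illustrative only, not this commit's actual logic):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: combine previously stored column statistics with freshly
    // computed ones. mergeWith is a hypothetical helper, not the actual Trino API.
    static Map<String, DeltaLakeColumnStatistics> mergeColumnStatistics(
            Map<String, DeltaLakeColumnStatistics> oldStats,
            Map<String, DeltaLakeColumnStatistics> newStats)
    {
        Map<String, DeltaLakeColumnStatistics> merged = new HashMap<>(oldStats);
        newStats.forEach((column, stats) ->
                merged.merge(column, stats, (previous, fresh) -> previous.mergeWith(fresh)));
        return merged;
    }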
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java
@@ -15,7 +15,7 @@
 
 import io.airlift.json.JsonCodec;
 import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore;
-import io.trino.plugin.deltalake.statistics.CachingDeltaLakeStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
 import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
@@ -45,7 +45,7 @@ public class DeltaLakeMetadataFactory
     private final TransactionLogWriterFactory transactionLogWriterFactory;
     private final NodeManager nodeManager;
     private final CheckpointWriterManager checkpointWriterManager;
-    private final CachingDeltaLakeStatisticsAccess statisticsAccess;
+    private final CachingExtendedStatisticsAccess statisticsAccess;
     private final int domainCompactionThreshold;
     private final boolean hideNonDeltaLakeTables;
     private final boolean unsafeWritesEnabled;
@@ -67,7 +67,7 @@ public DeltaLakeMetadataFactory(
             TransactionLogWriterFactory transactionLogWriterFactory,
             NodeManager nodeManager,
             CheckpointWriterManager checkpointWriterManager,
-            CachingDeltaLakeStatisticsAccess statisticsAccess,
+            CachingExtendedStatisticsAccess statisticsAccess,
             HiveConfig hiveConfig)
     {
         this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
@@ -24,10 +24,10 @@
 import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
 import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
 import io.trino.plugin.deltalake.procedure.VacuumProcedure;
-import io.trino.plugin.deltalake.statistics.CachingDeltaLakeStatisticsAccess;
-import io.trino.plugin.deltalake.statistics.CachingDeltaLakeStatisticsAccess.ForCachingDeltaLakeStatisticsAccess;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatistics;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.ForCachingExtendedStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
+import io.trino.plugin.deltalake.statistics.ExtendedStatisticsAccess;
 import io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
@@ -106,10 +106,10 @@ public void setup(Binder binder)
 
         binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
         binder.bind(DeltaLakeMetadataFactory.class).in(Scopes.SINGLETON);
-        binder.bind(CachingDeltaLakeStatisticsAccess.class).in(Scopes.SINGLETON);
-        binder.bind(DeltaLakeStatisticsAccess.class).to(CachingDeltaLakeStatisticsAccess.class).in(Scopes.SINGLETON);
-        binder.bind(DeltaLakeStatisticsAccess.class).annotatedWith(ForCachingDeltaLakeStatisticsAccess.class).to(MetaDirStatisticsAccess.class).in(Scopes.SINGLETON);
-        jsonCodecBinder(binder).bindJsonCodec(DeltaLakeStatistics.class);
+        binder.bind(CachingExtendedStatisticsAccess.class).in(Scopes.SINGLETON);
+        binder.bind(ExtendedStatisticsAccess.class).to(CachingExtendedStatisticsAccess.class).in(Scopes.SINGLETON);
+        binder.bind(ExtendedStatisticsAccess.class).annotatedWith(ForCachingExtendedStatisticsAccess.class).to(MetaDirStatisticsAccess.class).in(Scopes.SINGLETON);
+        jsonCodecBinder(binder).bindJsonCodec(ExtendedStatistics.class);
         binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
         binder.bind(CheckpointSchemaManager.class).in(Scopes.SINGLETON);
         jsonCodecBinder(binder).bindJsonCodec(LastCheckpoint.class);
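The three bind() calls above form a standard Guice decorator chain: anything that injects ExtendedStatisticsAccess receives the caching wrapper, while the wrapper itself receives the underlying MetaDirStatisticsAccess through the ForCachingExtendedStatisticsAccess qualifier. A self-contained sketch of the same pattern, where all names (Reader, FileReader, CachingReader, ForCachingReader, ReaderModule) are illustrative rather than part of this commit:

    import java.lang.annotation.Retention;

    import javax.inject.Inject;
    import javax.inject.Qualifier;

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Scopes;

    import static java.lang.annotation.RetentionPolicy.RUNTIME;

    public class ReaderModule
            extends AbstractModule
    {
        public interface Reader {}

        public static class FileReader
                implements Reader {}

        public static class CachingReader
                implements Reader
        {
            private final Reader delegate;

            @Inject
            public CachingReader(@ForCachingReader Reader delegate)
            {
                this.delegate = delegate; // serve from a cache, falling back to delegate
            }
        }

        @Retention(RUNTIME)
        @Qualifier
        public @interface ForCachingReader {}

        @Override
        protected void configure()
        {
            // Plain Reader resolves to the caching wrapper ...
            bind(Reader.class).to(CachingReader.class).in(Scopes.SINGLETON);
            // ... whose qualified delegate is the real implementation.
            bind(Reader.class).annotatedWith(ForCachingReader.class).to(FileReader.class).in(Scopes.SINGLETON);
        }

        public static void main(String[] args)
        {
            // reader is a CachingReader wrapping a FileReader
            Reader reader = Guice.createInjector(new ReaderModule()).getInstance(Reader.class);
        }
    }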
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java
@@ -15,9 +15,9 @@
 
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
-import io.trino.plugin.deltalake.statistics.CachingDeltaLakeStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
 import io.trino.plugin.deltalake.statistics.DeltaLakeColumnStatistics;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatistics;
+import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
@@ -78,13 +78,13 @@ public class HiveMetastoreBackedDeltaLakeMetastore
     private final HiveMetastore delegate;
     private final TransactionLogAccess transactionLogAccess;
     private final TypeManager typeManager;
-    private final CachingDeltaLakeStatisticsAccess statisticsAccess;
+    private final CachingExtendedStatisticsAccess statisticsAccess;
 
     public HiveMetastoreBackedDeltaLakeMetastore(
             HiveMetastore delegate,
             TransactionLogAccess transactionLogAccess,
             TypeManager typeManager,
-            CachingDeltaLakeStatisticsAccess statisticsAccess)
+            CachingExtendedStatisticsAccess statisticsAccess)
     {
         this.delegate = requireNonNull(delegate, "delegate is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogSupport is null");
@@ -318,9 +318,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTableHandle
 
         TableStatistics.Builder statsBuilder = new TableStatistics.Builder().setRowCount(Estimate.of(numRecords));
 
-        Optional<DeltaLakeStatistics> statistics = Optional.empty();
+        Optional<ExtendedStatistics> statistics = Optional.empty();
         if (isExtendedStatisticsEnabled(session)) {
-            statistics = statisticsAccess.readDeltaLakeStatistics(session, tableHandle.getLocation());
+            statistics = statisticsAccess.readExtendedStatistics(session, tableHandle.getLocation());
         }
 
         for (DeltaLakeColumnHandle column : columns) {
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java
@@ -15,7 +15,7 @@
 
 import io.trino.plugin.deltalake.DeltaLakeMetadata;
 import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
-import io.trino.plugin.deltalake.statistics.DeltaLakeStatisticsAccess;
+import io.trino.plugin.deltalake.statistics.ExtendedStatisticsAccess;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorSession;
@@ -49,10 +49,10 @@ public class DropExtendedStatsProcedure
             String.class);
 
     private final DeltaLakeMetadataFactory metadataFactory;
-    private final DeltaLakeStatisticsAccess statsAccess;
+    private final ExtendedStatisticsAccess statsAccess;
 
     @Inject
-    public DropExtendedStatsProcedure(DeltaLakeMetadataFactory metadataFactory, DeltaLakeStatisticsAccess statsAccess)
+    public DropExtendedStatsProcedure(DeltaLakeMetadataFactory metadataFactory, ExtendedStatisticsAccess statsAccess)
     {
         this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory");
         this.statsAccess = requireNonNull(statsAccess, "statsAccess");
@@ -81,6 +81,6 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl
             throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name));
         }
         accessControl.checkCanInsertIntoTable(null, name);
-        statsAccess.deleteDeltaLakeStatistics(session, metadata.getMetastore().getTableLocation(name, session));
+        statsAccess.deleteExtendedStatistics(session, metadata.getMetastore().getTableLocation(name, session));
     }
 }
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/{CachingDeltaLakeStatisticsAccess.java → CachingExtendedStatisticsAccess.java}
@@ -37,29 +37,29 @@
 import static java.time.temporal.ChronoUnit.HOURS;
 import static java.util.Objects.requireNonNull;
 
-public class CachingDeltaLakeStatisticsAccess
-        implements DeltaLakeStatisticsAccess
+public class CachingExtendedStatisticsAccess
+        implements ExtendedStatisticsAccess
 {
     private static final Duration CACHE_EXPIRATION = Duration.of(1, HOURS);
     private static final long CACHE_MAX_SIZE = 1000;
 
-    private final DeltaLakeStatisticsAccess delegate;
-    private final Cache<String, Optional<DeltaLakeStatistics>> cache = EvictableCacheBuilder.newBuilder()
+    private final ExtendedStatisticsAccess delegate;
+    private final Cache<String, Optional<ExtendedStatistics>> cache = EvictableCacheBuilder.newBuilder()
             .expireAfterWrite(CACHE_EXPIRATION)
             .maximumSize(CACHE_MAX_SIZE)
             .build();
 
     @Inject
-    public CachingDeltaLakeStatisticsAccess(@ForCachingDeltaLakeStatisticsAccess DeltaLakeStatisticsAccess delegate)
+    public CachingExtendedStatisticsAccess(@ForCachingExtendedStatisticsAccess ExtendedStatisticsAccess delegate)
     {
         this.delegate = requireNonNull(delegate, "delegate is null");
     }
 
     @Override
-    public Optional<DeltaLakeStatistics> readDeltaLakeStatistics(ConnectorSession session, String tableLocation)
+    public Optional<ExtendedStatistics> readExtendedStatistics(ConnectorSession session, String tableLocation)
     {
         try {
-            return uncheckedCacheGet(cache, tableLocation, () -> delegate.readDeltaLakeStatistics(session, tableLocation));
+            return uncheckedCacheGet(cache, tableLocation, () -> delegate.readExtendedStatistics(session, tableLocation));
         }
         catch (UncheckedExecutionException e) {
             throwIfInstanceOf(e.getCause(), TrinoException.class);
@@ -68,16 +68,16 @@ public Optional<DeltaLakeStatistics> readDeltaLakeStatistics(ConnectorSession session
     }
 
     @Override
-    public void updateDeltaLakeStatistics(ConnectorSession session, String tableLocation, DeltaLakeStatistics statistics)
+    public void updateExtendedStatistics(ConnectorSession session, String tableLocation, ExtendedStatistics statistics)
     {
-        delegate.updateDeltaLakeStatistics(session, tableLocation, statistics);
+        delegate.updateExtendedStatistics(session, tableLocation, statistics);
         cache.invalidate(tableLocation);
     }
 
     @Override
-    public void deleteDeltaLakeStatistics(ConnectorSession session, String tableLocation)
+    public void deleteExtendedStatistics(ConnectorSession session, String tableLocation)
     {
-        delegate.deleteDeltaLakeStatistics(session, tableLocation);
+        delegate.deleteExtendedStatistics(session, tableLocation);
         cache.invalidate(tableLocation);
     }
 
@@ -90,5 +90,5 @@ public void invalidateCache(String tableLocation)
     @Retention(RUNTIME)
     @Target({FIELD, PARAMETER, METHOD})
     @Qualifier
-    public @interface ForCachingDeltaLakeStatisticsAccess {};
+    public @interface ForCachingExtendedStatisticsAccess {};
 }
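The class above is a read-through cache: reads are served from the cache and loaded from the delegate on a miss, while updates and deletes write through to the delegate and then invalidate the cached entry so the next read reloads it. A standalone sketch of the same idea using plain Guava (EvictableCacheBuilder is a Trino-internal CacheBuilder variant with safer invalidation semantics; StatsStore and CachingStatsStore here are hypothetical names):

    import java.time.Duration;
    import java.util.Optional;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Illustrative stand-in for the caching wrapper above.
    public class CachingStatsStore
    {
        public interface StatsStore
        {
            Optional<String> read(String location);

            void update(String location, String stats);

            void delete(String location);
        }

        private final StatsStore delegate;
        private final Cache<String, Optional<String>> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofHours(1))
                .maximumSize(1000)
                .build();

        public CachingStatsStore(StatsStore delegate)
        {
            this.delegate = delegate;
        }

        public Optional<String> read(String location)
                throws ExecutionException
        {
            // Load on a miss; subsequent reads for the same location hit the cache.
            return cache.get(location, () -> delegate.read(location));
        }

        public void update(String location, String stats)
        {
            delegate.update(location, stats);
            cache.invalidate(location); // the next read sees the new value
        }

        public void delete(String location)
        {
            delegate.delete(location);
            cache.invalidate(location);
        }
    }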
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/{DeltaLakeStatistics.java → ExtendedStatistics.java}
@@ -25,7 +25,7 @@
 
 import static java.util.Objects.requireNonNull;
 
-public class DeltaLakeStatistics
+public class ExtendedStatistics
 {
     public static final long CURRENT_MODEL_VERSION = 4;
 
@@ -34,7 +34,7 @@ public class DeltaLakeStatistics
     private final Map<String, DeltaLakeColumnStatistics> columnStatistics;
     private final Optional<Set<String>> analyzedColumns;
 
-    public DeltaLakeStatistics(
+    public ExtendedStatistics(
             Instant alreadyAnalyzedModifiedTimeMax,
             Map<String, DeltaLakeColumnStatistics> columnStatistics,
             Optional<Set<String>> analyzedColumns)
@@ -43,7 +43,7 @@ public DeltaLakeStatistics(
     }
 
     @JsonCreator
-    public DeltaLakeStatistics(
+    public ExtendedStatistics(
             @JsonProperty("modelVersion") long modelVersion,
             @JsonProperty("alreadyAnalyzedModifiedTimeMax") Instant alreadyAnalyzedModifiedTimeMax,
             @JsonProperty("columnStatistics") Map<String, DeltaLakeColumnStatistics> columnStatistics,
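Only the class name changes here; the model keeps its shape: a model version (CURRENT_MODEL_VERSION, currently 4), the newest file-modification time already covered by a previous analysis, per-column statistics, and the optional set of analyzed columns. A sketch of the three-argument convenience constructor shown above, with illustrative values:

    import java.time.Instant;
    import java.util.Optional;

    import com.google.common.collect.ImmutableMap;

    // Illustrative values only: empty column statistics, and Optional.empty()
    // meaning no explicit column subset was analyzed.
    ExtendedStatistics statistics = new ExtendedStatistics(
            Instant.parse("2022-03-29T00:00:00Z"), // alreadyAnalyzedModifiedTimeMax
            ImmutableMap.of(),                     // columnStatistics
            Optional.empty());                     // analyzedColumns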
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/{DeltaLakeStatisticsAccess.java → ExtendedStatisticsAccess.java}
@@ -17,18 +17,18 @@
 
 import java.util.Optional;
 
-public interface DeltaLakeStatisticsAccess
+public interface ExtendedStatisticsAccess
 {
-    Optional<DeltaLakeStatistics> readDeltaLakeStatistics(
+    Optional<ExtendedStatistics> readExtendedStatistics(
             ConnectorSession session,
             String tableLocation);
 
-    void updateDeltaLakeStatistics(
+    void updateExtendedStatistics(
             ConnectorSession session,
             String tableLocation,
-            DeltaLakeStatistics statistics);
+            ExtendedStatistics statistics);
 
-    void deleteDeltaLakeStatistics(
+    void deleteExtendedStatistics(
             ConnectorSession session,
             String tableLocation);
 }
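The interface is small enough that a test double is trivial. A sketch of an in-memory implementation, as one might write for unit tests (InMemoryStatisticsAccess is a hypothetical name, not part of this commit):

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    import io.trino.spi.connector.ConnectorSession;

    // Hypothetical test double keyed by table location, mirroring the interface above.
    public class InMemoryStatisticsAccess
            implements ExtendedStatisticsAccess
    {
        private final Map<String, ExtendedStatistics> stats = new ConcurrentHashMap<>();

        @Override
        public Optional<ExtendedStatistics> readExtendedStatistics(ConnectorSession session, String tableLocation)
        {
            return Optional.ofNullable(stats.get(tableLocation));
        }

        @Override
        public void updateExtendedStatistics(ConnectorSession session, String tableLocation, ExtendedStatistics statistics)
        {
            stats.put(tableLocation, statistics);
        }

        @Override
        public void deleteExtendedStatistics(ConnectorSession session, String tableLocation)
        {
            stats.remove(tableLocation);
        }
    }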
(4 more changed files not shown)