Skip to content

Commit

Permalink
Rewrite statistics during OPTIMIZE
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks authored and ebyhr committed Mar 11, 2024
1 parent 2967dcc commit 947f972
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = transaction.table();
Optional<Long> beforeWriteSnapshotId = getCurrentSnapshotId(icebergTable);

// files to be deleted
ImmutableSet.Builder<DataFile> scannedDataFilesBuilder = ImmutableSet.builder();
Expand Down Expand Up @@ -1491,6 +1492,30 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
commit(rewriteFiles, session);
transaction.commitTransaction();

// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
transaction = null;

// TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically
beforeWriteSnapshotId.ifPresent(previous ->
verify(previous != newSnapshotId, "Failed to get new snapshot ID"));

try {
beginTransaction(catalog.loadTable(session, executeHandle.getSchemaTableName()));
Table reloadedTable = transaction.table();
StatisticsFile newStatsFile = tableStatisticsWriter.rewriteStatisticsFile(session, reloadedTable, newSnapshotId);

transaction.updateStatistics()
.setStatistics(newSnapshotId, newStatsFile)
.commit();
transaction.commitTransaction();
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
// TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically
log.error(e, "Failed to save table statistics");
}
transaction = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public StatisticsFile writeStatisticsFile(
return writeStatisticsFile(session, table, fileIO, snapshotId, ndvSketches);
}

/**
 * Rewrites the table's statistics file so that it is associated with {@code snapshotId}.
 * No new NDV sketches are supplied, so the existing statistics content is carried over as-is.
 *
 * @param session the connector session used for writing the file
 * @param table the Iceberg table whose statistics are rewritten; must expose {@link HasTableOperations}
 * @param snapshotId the snapshot the rewritten statistics file should reference
 * @return the newly written statistics file
 */
public StatisticsFile rewriteStatisticsFile(ConnectorSession session, Table table, long snapshotId)
{
    // An empty sketch map means no stats are added; the previous statistics are rewritten unchanged
    FileIO fileIO = ((HasTableOperations) table).operations().io();
    return writeStatisticsFile(session, table, fileIO, snapshotId, Map.of());
}

private GenericStatisticsFile writeStatisticsFile(ConnectorSession session, Table table, FileIO fileIO, long snapshotId, Map<Integer, CompactSketch> ndvSketches)
{
Snapshot snapshot = table.snapshot(snapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,68 @@ public void testShowStatsAfterExpiration()
assertUpdate("DROP TABLE show_stats_after_expiration");
}

// Verifies that OPTIMIZE rewrites the statistics file for the new snapshot,
// so NDV statistics survive a subsequent expire_snapshots.
@Test
public void testShowStatsAfterOptimize()
{
    String tableName = "show_stats_after_optimize_" + randomNameSuffix();

    String catalogName = getSession().getCatalog().orElseThrow();
    // Writes with stats-on-write disabled, so NDVs only come from explicit ANALYZE
    Session statsDisabledWrites = withStatsOnWrite(getSession(), false);
    // Allows snapshots to be expired immediately
    Session zeroRetention = Session.builder(getSession())
            .setCatalogSessionProperty(catalogName, EXPIRE_SNAPSHOTS_MIN_RETENTION, "0s")
            .build();

    String expireSnapshotsSql = "ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '0d')";

    assertUpdate(statsDisabledWrites, "CREATE TABLE " + tableName + "(key integer)");
    // Build up several snapshots
    assertUpdate(statsDisabledWrites, "INSERT INTO " + tableName + " VALUES 1", 1);
    assertUpdate(statsDisabledWrites, "INSERT INTO " + tableName + " VALUES 2", 1);
    assertUpdate(statsDisabledWrites, "INSERT INTO " + tableName + " VALUES 3", 1);

    assertUpdate("ANALYZE " + tableName);
    assertUpdate(statsDisabledWrites, "INSERT INTO " + tableName + " VALUES 4", 1);

    assertQuery(
            "SHOW STATS FOR " + tableName,
            """
            VALUES
            ('key', null, 3, 0, null, '1', '4'), -- NDV present, stats "inherited" from previous snapshot
            (null, null, null, null, 4, null, null)""");

    assertUpdate(zeroRetention, expireSnapshotsSql);

    // NDV disappears after expiration: the latest snapshot did not contain stats,
    // and the older snapshots (which did) have been expired
    assertQuery(
            "SHOW STATS FOR " + tableName,
            """
            VALUES
            ('key', null, null, 0, null, '1', '4'), -- NDV not present as expire_snapshot removed stats for previous snapshots
            (null, null, null, null, 4, null, null)""");

    assertUpdate("ANALYZE " + tableName);

    assertQuery(
            "SHOW STATS FOR " + tableName,
            """
            VALUES
            ('key', null, 4, 0, null, '1', '4'), -- NDV present
            (null, null, null, null, 4, null, null)""");

    // OPTIMIZE should rewrite the stats file onto its new snapshot, so expiring
    // the pre-OPTIMIZE snapshots must not lose the NDVs this time
    assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize");
    assertUpdate(zeroRetention, expireSnapshotsSql);

    assertQuery(
            "SHOW STATS FOR " + tableName,
            """
            VALUES
            ('key', null, 4, 0, null, '1', '4'), -- NDV present
            (null, null, null, null, 4, null, null)""");

    assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testStatsAfterDeletingAllRows()
{
Expand Down

0 comments on commit 947f972

Please sign in to comment.