Add $file_modified_time column in Iceberg #13082

Merged 1 commit on Jul 27, 2022
docs/src/main/sphinx/connector/iceberg.rst (12 changes: 10 additions & 2 deletions)
@@ -610,11 +610,13 @@ path metadata as a hidden column in each table:

* ``$path``: Full file system path name of the file for this row

You can use this column in your SQL statements like any other column. This
* ``$file_modified_time``: Timestamp of the last modification of the file for this row

You can use these columns in your SQL statements like any other column. They
can be selected directly, or used in conditional statements. For example, you
can inspect the file path for each record::

SELECT *, "$path"
SELECT *, "$path", "$file_modified_time"
FROM iceberg.web.page_views;

Retrieve all records that belong to a specific file using ``"$path"`` filter::
@@ -623,6 +625,12 @@ Retrieve all records that belong to a specific file using ``"$path"`` filter::
FROM iceberg.web.page_views
WHERE "$path" = '/usr/iceberg/table/web.page_views/data/file_01.parquet'

Retrieve all records from files modified at a specific time using the ``"$file_modified_time"`` filter::

[Review comment on the line above, from a project member: "IMO an example with inequality based filtering would be more practical, but nbd."]

SELECT *
FROM iceberg.web.page_views
WHERE "$file_modified_time" = CAST('2022-07-01 01:02:03.456 UTC' AS timestamp with time zone)

.. _iceberg-metadata-tables:

Metadata tables
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Optional;

import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.IS_DELETED;
@@ -168,6 +169,12 @@ public boolean isIsDeletedColumn()
return id == IS_DELETED.fieldId();
}

@JsonIgnore
public boolean isFileModifiedTimeColumn()
{
return id == FILE_MODIFIED_TIME.getId();
}

@Override
public int hashCode()
{
@@ -216,6 +223,25 @@ public static ColumnMetadata pathColumnMetadata()
.build();
}

public static IcebergColumnHandle fileModifiedTimeColumnHandle()
{
return new IcebergColumnHandle(
columnIdentity(FILE_MODIFIED_TIME),
FILE_MODIFIED_TIME.getType(),
ImmutableList.of(),
FILE_MODIFIED_TIME.getType(),
Optional.empty());
}

public static ColumnMetadata fileModifiedTimeColumnMetadata()
{
return ColumnMetadata.builder()
.setName(FILE_MODIFIED_TIME.getColumnName())
.setType(FILE_MODIFIED_TIME.getType())
.setHidden(true)
.build();
}

private static ColumnIdentity columnIdentity(IcebergMetadataColumn metadata)
{
return new ColumnIdentity(metadata.getId(), metadata.getColumnName(), metadata.getTypeCategory(), ImmutableList.of());
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -163,11 +163,14 @@
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnMetadata;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
@@ -525,6 +528,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
columnHandles.put(columnHandle.getName(), columnHandle);
}
columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle());
columnHandles.put(FILE_MODIFIED_TIME.getColumnName(), fileModifiedTimeColumnHandle());
return columnHandles.buildOrThrow();
}

@@ -1415,6 +1419,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
columns.add(pathColumnMetadata());
columns.add(fileModifiedTimeColumnMetadata());

return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
}
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataColumn.java
@@ -22,11 +22,13 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory;
import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.VarcharType.VARCHAR;

public enum IcebergMetadataColumn
{
FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE),
FILE_MODIFIED_TIME(Integer.MAX_VALUE - 1001, "$file_modified_time", TIMESTAMP_TZ_MILLIS, PRIMITIVE), // https://github.com/apache/iceberg/issues/5240
/**/;

private static final Set<Integer> COLUMNS_ID = Stream.of(values())
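The hard-coded id keeps FILE_MODIFIED_TIME inside the high range Iceberg reserves for metadata columns while staying clear of the ids Iceberg assigns itself; the linked issue #5240 asks Iceberg to expose an official constant. Here is a minimal standalone sketch of the enum's id-registry pattern; the class name is invented, and the FILE_PATH id is an assumption mirroring Iceberg's reserved value:

    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    enum MetadataColumnIdsSketch
    {
        // Assumed: Iceberg's own metadata columns count down from Integer.MAX_VALUE,
        // so the Trino-private id is offset by 1000+ to stay clear of them.
        FILE_PATH(Integer.MAX_VALUE - 1),
        FILE_MODIFIED_TIME(Integer.MAX_VALUE - 1001);

        // Collect every metadata-column id once, then answer membership queries in O(1)
        private static final Set<Integer> IDS = Stream.of(values())
                .map(column -> column.id)
                .collect(Collectors.toUnmodifiableSet());

        private final int id;

        MetadataColumnIdsSketch(int id)
        {
            this.id = id;
        }

        // Analogous to IcebergMetadataColumn.isMetadataColumnId in the diff above
        static boolean isMetadataColumnId(int id)
        {
            return IDS.contains(id);
        }
    }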
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -85,6 +85,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
@@ -120,6 +121,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -142,7 +144,9 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcLazyReadSmallRanges;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcMaxBufferSize;
@@ -169,6 +173,8 @@
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -297,6 +303,18 @@ public ConnectorPageSource createPageSource(
fileSize = fileIoProvider.createFileIo(hdfsContext, session.getQueryId())
.newInputFile(split.getPath()).getLength();
}
OptionalLong fileModifiedTime = OptionalLong.empty();
if (requiredColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
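// Only pay for the extra file-status lookup when the hidden $file_modified_time column is actually projected; the call runs as the session user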
try {
FileStatus fileStatus = hdfsEnvironment.doAs(
session.getIdentity(),
() -> hdfsEnvironment.getFileSystem(hdfsContext, new Path(split.getPath())).getFileStatus(new Path(split.getPath())));
fileModifiedTime = OptionalLong.of(fileStatus.getModificationTime());
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, e);
}
}

ReaderPageSource dataPageSource = createDataPageSource(
session,
@@ -305,6 +323,7 @@
split.getStart(),
split.getLength(),
fileSize,
fileModifiedTime,
split.getFileFormat(),
split.getSchemaAsJson().map(SchemaParser::fromJson),
requiredColumns,
@@ -433,6 +452,7 @@ private ConnectorPageSource openDeletes(
0,
delete.fileSizeInBytes(),
delete.fileSizeInBytes(),
OptionalLong.empty(),
IcebergFileFormat.fromIceberg(delete.format()),
Optional.of(schemaFromHandles(columns)),
columns,
@@ -449,6 +469,7 @@ private ReaderPageSource createDataPageSource(
long start,
long length,
long fileSize,
OptionalLong fileModifiedTime,
IcebergFileFormat fileFormat,
Optional<Schema> fileSchema,
List<IcebergColumnHandle> dataColumns,
@@ -466,6 +487,7 @@
start,
length,
fileSize,
fileModifiedTime,
dataColumns,
predicate,
orcReaderOptions
@@ -490,6 +512,7 @@
start,
length,
fileSize,
fileModifiedTime,
dataColumns,
parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
@@ -503,6 +526,7 @@
path,
start,
length,
fileModifiedTime,
fileSchema.orElseThrow(),
nameMapping,
dataColumns);
@@ -519,6 +543,7 @@ private static ReaderPageSource createOrcPageSource(
long start,
long length,
long fileSize,
OptionalLong fileModifiedTime,
List<IcebergColumnHandle> columns,
TupleDomain<IcebergColumnHandle> effectivePredicate,
OrcReaderOptions options,
@@ -585,6 +610,9 @@ else if (partitionKeys.containsKey(column.getId())) {
else if (column.isPathColumn()) {
columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString()))));
}
else if (column.isFileModifiedTimeColumn()) {
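// packDateTimeWithZone combines the epoch-millis modification time with the UTC zone key into Trino's packed TIMESTAMP WITH TIME ZONE value; every row read from this file shares the constant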
columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY))));
}
else if (column.isUpdateRowIdColumn()) {
// $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource
columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
@@ -862,6 +890,7 @@ private static ReaderPageSource createParquetPageSource(
long start,
long length,
long fileSize,
OptionalLong fileModifiedTime,
List<IcebergColumnHandle> regularColumns,
ParquetReaderOptions options,
TupleDomain<IcebergColumnHandle> effectivePredicate,
@@ -948,6 +977,9 @@ else if (partitionKeys.containsKey(column.getId())) {
else if (column.isPathColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())));
}
else if (column.isFileModifiedTimeColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
}
else if (column.isUpdateRowIdColumn()) {
// $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource
trinoTypes.add(column.getType());
@@ -1019,6 +1051,7 @@ private ReaderPageSource createAvroPageSource(
Path path,
long start,
long length,
OptionalLong fileModifiedTime,
Schema fileSchema,
Optional<NameMapping> nameMapping,
List<IcebergColumnHandle> columns)
@@ -1055,6 +1088,9 @@
if (column.isPathColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())));
}
else if (column.isFileModifiedTimeColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
}
// For delete
else if (column.isRowPositionColumn()) {
rowIndexChannels.add(true);
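For readers outside the Trino codebase, a minimal standalone sketch of the lookup pattern used in createPageSource above, assuming plain java.nio in place of the Hadoop FileSystem API; the class and method names are invented for illustration:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.OptionalLong;

    final class FileModifiedTimeSketch
    {
        // Resolve the modification time only when "$file_modified_time" is among
        // the projected columns, so unrelated scans pay no extra filesystem call.
        static OptionalLong fileModifiedTimeIfRequested(List<String> projectedColumns, Path file)
        {
            if (!projectedColumns.contains("$file_modified_time")) {
                return OptionalLong.empty(); // column not requested: skip the lookup
            }
            try {
                // Epoch millis, matching the column's millisecond-precision type
                return OptionalLong.of(Files.getLastModifiedTime(file).toMillis());
            }
            catch (IOException e) {
                // The PR wraps the equivalent failure in TrinoException(ICEBERG_FILESYSTEM_ERROR, e)
                throw new UncheckedIOException(e);
            }
        }

        private FileModifiedTimeSketch() {}
    }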
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -88,6 +88,7 @@
import static com.google.common.collect.Iterables.getLast;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS;
import static io.trino.SystemSessionProperties.SCALE_WRITERS;
import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
@@ -116,8 +117,10 @@
import static java.lang.String.format;
import static java.lang.String.join;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
@@ -4505,6 +4508,38 @@ public void testPathHiddenColumn()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testFileModifiedTimeHiddenColumn()
{
ZonedDateTime beforeTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_file_modified_time_", "(col) AS VALUES (1)")) {
MaterializedResult expectedColumns = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("col", "integer", "", "")
.build();
MaterializedResult actualColumns = computeActual("DESCRIBE " + table.getName());
// Describe output should not have the $file_modified_time hidden column
assertEquals(actualColumns, expectedColumns);

ZonedDateTime fileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName());
ZonedDateTime afterTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
assertThat(fileModifiedTime).isBetween(beforeTime, afterTime);
}
}

@Test
public void testDeleteWithFileModifiedTimeColumn()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_with_file_modified_time_", "(key int)")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1)", 1);
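// A short pause so the second INSERT produces a file with a strictly later modification time (the hidden column has millisecond precision)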
sleepUninterruptibly(1, MILLISECONDS);
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2)", 1);

ZonedDateTime oldModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName() + " WHERE key = 1");
assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + oldModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')", 1);
assertQuery("SELECT * FROM " + table.getName(), "VALUES 2");
}
}

@Test
public void testExpireSnapshots()
throws Exception