Skip to content

Commit

Permalink
Push down version filter in the Delta $history metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
findinpath committed Mar 9, 2023
1 parent a9905fc commit 0760302
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
Expand All @@ -42,6 +44,7 @@
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Streams.stream;
import static io.trino.spi.type.BigintType.BIGINT;
Expand All @@ -55,6 +58,7 @@
public class DeltaLakeHistoryTable
implements SystemTable
{
private static final int VERSION_COLUMN_INDEX = 0;
private final SchemaTableName tableName;
private final ConnectorTableMetadata tableMetadata;
private final List<Type> types;
Expand Down Expand Up @@ -113,14 +117,15 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
VersionRange versionRange = extractVersionRange(constraint);
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Iterable<List<Object>> records = () ->
stream(new DeltaLakeTransactionLogIterator(
fileSystem,
new Path(metastore.getTableLocation(tableName, session)),
Optional.empty(),
Optional.empty()))
versionRange.startVersion,
versionRange.endVersion))
.flatMap(Collection::stream)
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
Expand Down Expand Up @@ -163,4 +168,55 @@ private Object toVarcharVarcharMapBlock(Map<String, String> values)
blockBuilder.closeEntry();
return varcharToVarcharMapType.getObject(blockBuilder, 0);
}

/**
 * Derives the inclusive range of table versions to read from the constraint on the
 * {@code version} column (column index {@link #VERSION_COLUMN_INDEX}).
 * <p>
 * A range is extracted only when the version domain consists of exactly one range.
 * In every other case both bounds are left empty, meaning "no restriction".
 * The same unbounded fallback is used when the single range cannot be expressed as a
 * valid {@code VersionRange}:
 * <ul>
 * <li>turning an exclusive bound into an inclusive one would overflow {@code long};</li>
 * <li>the resulting bounds would be rejected by the {@code VersionRange} constructor,
 * e.g. {@code version < 0} or {@code version > 5 AND version < 6}.</li>
 * </ul>
 *
 * @param constraint the constraint keyed by column index
 * @return the inclusive start/end versions; an empty bound means unbounded on that side
 */
private static VersionRange extractVersionRange(TupleDomain<Integer> constraint)
{
    VersionRange unbounded = new VersionRange(Optional.empty(), Optional.empty());

    // Optional.map yields an empty Optional when the column has no domain entry,
    // so a missing entry is treated the same as "all values".
    Domain versionDomain = constraint.getDomains()
            .map(map -> map.get(VERSION_COLUMN_INDEX))
            .orElse(Domain.all(BIGINT));
    if (versionDomain.isAll() || versionDomain.isNone()) {
        return unbounded;
    }

    List<Range> orderedRanges = versionDomain.getValues().getRanges().getOrderedRanges();
    if (orderedRanges.size() != 1) {
        // Opt for a rather pragmatic choice of extracting the version range
        // only when dealing with a single range
        return unbounded;
    }

    // A single-value range is inclusive on both sides with equal bounds,
    // so it is handled by the general logic below.
    Range range = orderedRanges.get(0);
    Optional<Long> startVersion = Optional.empty();
    Optional<Long> endVersion = Optional.empty();

    if (!range.isLowUnbounded()) {
        long version = (long) range.getLowBoundedValue();
        if (!range.isLowInclusive()) {
            if (version == Long.MAX_VALUE) {
                // Incrementing would wrap around to Long.MIN_VALUE
                return unbounded;
            }
            version++;
        }
        startVersion = Optional.of(version);
    }
    if (!range.isHighUnbounded()) {
        long version = (long) range.getHighBoundedValue();
        if (!range.isHighInclusive()) {
            if (version == Long.MIN_VALUE) {
                // Decrementing would wrap around to Long.MAX_VALUE
                return unbounded;
            }
            version--;
        }
        endVersion = Optional.of(version);
    }

    // Mirror the invariant checked by the VersionRange constructor so that an
    // unsatisfiable predicate does not fail the query with a verification error.
    long effectiveStart = startVersion.orElse(0L);
    if (effectiveStart > endVersion.orElse(effectiveStart)) {
        return unbounded;
    }
    return new VersionRange(startVersion, endVersion);
}

/**
 * Inclusive range of table versions. An empty bound means the range is open on that side.
 * <p>
 * Construction fails when the effective start exceeds the effective end. A missing start
 * is treated as {@code 0} and a missing end as equal to the effective start.
 */
private record VersionRange(Optional<Long> startVersion, Optional<Long> endVersion)
{
    @SuppressWarnings("UnusedVariable") // TODO: Remove once https://github.com/google/error-prone/issues/2713 is fixed
    private VersionRange
    {
        requireNonNull(startVersion, "startVersion is null");
        requireNonNull(endVersion, "endVersion is null");
        long effectiveStart = startVersion.orElse(0L);
        long effectiveEnd = endVersion.orElse(effectiveStart);
        verify(effectiveStart <= effectiveEnd, "startVersion is greater than endVersion");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Map<String, Integer> getOpenCount()
return map.buildOrThrow();
}

/**
 * Discards every entry currently recorded in {@code openedFiles}.
 * NOTE(review): presumably this makes later {@code getOpenCount()} results reflect only
 * files opened after this call; confirm against how that method builds its map.
 */
public void resetOpenCount()
{
openedFiles.clear();
}

private void incrementOpenCount(String path)
{
openedFiles.add(path.substring(path.lastIndexOf('/') + 1));
Expand Down Expand Up @@ -91,37 +96,42 @@ public TrinoInputFile newInputFile(String location, long length)
/**
 * Creates an output file for {@code location} by forwarding the call to {@code delegate}.
 */
@Override
public TrinoOutputFile newOutputFile(String location)
{
    return delegate.newOutputFile(location);
}

/**
 * Deletes the file at {@code location} by forwarding the call to {@code delegate}.
 *
 * @throws IOException if the delegate fails
 */
@Override
public void deleteFile(String location)
        throws IOException
{
    delegate.deleteFile(location);
}

/**
 * Deletes all of the given {@code locations} by forwarding the call to {@code delegate}.
 *
 * @throws IOException if the delegate fails
 */
@Override
public void deleteFiles(Collection<String> locations)
        throws IOException
{
    delegate.deleteFiles(locations);
}

/**
 * Deletes the directory at {@code location} by forwarding the call to {@code delegate}.
 *
 * @throws IOException if the delegate fails
 */
@Override
public void deleteDirectory(String location)
        throws IOException
{
    delegate.deleteDirectory(location);
}

/**
 * Renames {@code source} to {@code target} by forwarding the call to {@code delegate}.
 *
 * @throws IOException if the delegate fails
 */
@Override
public void renameFile(String source, String target)
        throws IOException
{
    delegate.renameFile(source, target);
}

/**
 * Lists the files under {@code location} by forwarding the call to {@code delegate}.
 *
 * @return the iterator produced by the delegate
 * @throws IOException if the delegate fails
 */
@Override
public FileIterator listFiles(String location)
        throws IOException
{
    return delegate.listFiles(location);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ public void testHistoryTableWithLogEntriesRemoved()

assertThat(query("SELECT version, operation, read_version, isolation_level, is_blind_append FROM \"" + tableName + "$history\""))
.matches("""
VALUES
(BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true),
(BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false),
(BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true)
""");
VALUES
(BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true),
(BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false),
(BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true)
""");
}
}
Loading

0 comments on commit 0760302

Please sign in to comment.