Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down version filter in the Delta $history table #16192

Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Use cursor-based implementation for the Delta $history metadata table
findinpath committed Mar 9, 2023
commit 33fb62fec0ae0ffc104545d4918c248c70cfae3d
Original file line number Diff line number Diff line change
@@ -14,26 +14,30 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -44,7 +48,9 @@ public class DeltaLakeHistoryTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final List<CommitInfoEntry> commitInfoEntries;
private final List<Type> types;
private final Iterable<CommitInfoEntry> commitInfoEntries;
private final Type varcharToVarcharMapType;

public DeltaLakeHistoryTable(SchemaTableName tableName, List<CommitInfoEntry> commitInfoEntries, TypeManager typeManager)
{
@@ -68,6 +74,10 @@ public DeltaLakeHistoryTable(SchemaTableName tableName, List<CommitInfoEntry> co
.add(new ColumnMetadata("is_blind_append", BOOLEAN))
//TODO add support for operationMetrics, userMetadata, engineInfo
.build());
types = tableMetadata.getColumns().stream()
.map(ColumnMetadata::getType)
.collect(toImmutableList());
varcharToVarcharMapType = typeManager.getType(mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature()));
}

@Override
@@ -83,51 +93,45 @@ public ConnectorTableMetadata getTableMetadata()
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
if (commitInfoEntries.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
}
return new FixedPageSource(buildPages(session));
}

private List<Page> buildPages(ConnectorSession session)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
Iterable<List<Object>> records = Iterables.transform(commitInfoEntries, commitInfoEntry -> getRecord(commitInfoEntry, timeZoneKey));

commitInfoEntries.forEach(commitInfoEntry -> {
pagesBuilder.beginRow();

pagesBuilder.appendBigint(commitInfoEntry.getVersion());
pagesBuilder.appendTimestampTzMillis(commitInfoEntry.getTimestamp(), timeZoneKey);
write(commitInfoEntry.getUserId(), pagesBuilder);
write(commitInfoEntry.getUserName(), pagesBuilder);
write(commitInfoEntry.getOperation(), pagesBuilder);
if (commitInfoEntry.getOperationParameters() == null) {
pagesBuilder.appendNull();
}
else {
pagesBuilder.appendVarcharVarcharMap(commitInfoEntry.getOperationParameters());
}
write(commitInfoEntry.getClusterId(), pagesBuilder);
pagesBuilder.appendBigint(commitInfoEntry.getReadVersion());
write(commitInfoEntry.getIsolationLevel(), pagesBuilder);
commitInfoEntry.isBlindAppend().ifPresentOrElse(pagesBuilder::appendBoolean, pagesBuilder::appendNull);

pagesBuilder.endRow();
});

return pagesBuilder.build();
return new InMemoryRecordSet(types, records).cursor();
}

private static void write(String value, PageListBuilder pagesBuilder)
private List<Object> getRecord(CommitInfoEntry commitInfoEntry, TimeZoneKey timeZoneKey)
{
if (value == null) {
pagesBuilder.appendNull();
List<Object> columns = new ArrayList<>();
columns.add(commitInfoEntry.getVersion());
columns.add(packDateTimeWithZone(commitInfoEntry.getTimestamp(), timeZoneKey));
columns.add(commitInfoEntry.getUserId());
columns.add(commitInfoEntry.getUserName());
columns.add(commitInfoEntry.getOperation());
if (commitInfoEntry.getOperationParameters() == null) {
columns.add(null);
}
else {
pagesBuilder.appendVarchar(value);
columns.add(toVarcharVarcharMapBlock(commitInfoEntry.getOperationParameters()));
}
columns.add(commitInfoEntry.getClusterId());
columns.add(commitInfoEntry.getReadVersion());
columns.add(commitInfoEntry.getIsolationLevel());
columns.add(commitInfoEntry.isBlindAppend().orElse(null));

return columns;
}

private Object toVarcharVarcharMapBlock(Map<String, String> values)
{
BlockBuilder blockBuilder = varcharToVarcharMapType.createBlockBuilder(null, 1);
BlockBuilder singleMapBlockBuilder = blockBuilder.beginBlockEntry();
values.forEach((key, value) -> {
VARCHAR.writeString(singleMapBlockBuilder, key);
VARCHAR.writeString(singleMapBlockBuilder, value);
});
blockBuilder.closeEntry();
return varcharToVarcharMapType.getObject(blockBuilder, 0);
}
}