Include Column Lineage Info for All Queries #23322

Open: wants to merge 1 commit into base: master

@@ -1596,7 +1596,33 @@ protected Scope visitQuery(Query node, Optional<Scope> scope)
                .withRelationType(RelationId.of(node), queryBodyScope.getRelationType())
                .build();

        // collect output columns info
        ImmutableList.Builder<OutputColumn> outputColumnsBuilder = ImmutableList.builder();
        for (Field field : queryScope.getRelationType().getVisibleFields()) {
            OutputColumn outputColumn = new OutputColumn(new Column(field.getName().orElse(""), field.getType().toString()), analysis.getSourceColumns(field));
            outputColumnsBuilder.add(outputColumn);
        }

        analysis.setScope(node, queryScope);
        ImmutableList<OutputColumn> outputColumns = outputColumnsBuilder.build();
Member

nit: List

        if (!outputColumns.isEmpty()) {
            QualifiedObjectName qualifiedName = new QualifiedObjectName("", "", "");
            CatalogHandle.CatalogVersion version = new CatalogHandle.CatalogVersion("1");
            if (node.getQueryBody() instanceof QuerySpecification querySpecification) {
Member

This check is a bit brittle: it captures the table only for simple queries, and not for queries that begin with a WITH clause.
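To make the concern concrete, here is a minimal sketch using a toy relation tree. None of these types are Trino's AST classes, and every name (`SourceTablesSketch`, `firstTableOnly`, `allTables`) is hypothetical. It models joins and subqueries rather than WITH, but the point is the same: a check that only recognizes a bare table in FROM resolves nothing for any other shape, whereas walking the whole tree collects every table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy relation tree for illustration only; NOT Trino's io.trino.sql.tree classes.
final class SourceTablesSketch
{
    private SourceTablesSketch() {}

    interface Relation {}

    record Table(String name) implements Relation {}

    record Join(Relation left, Relation right) implements Relation {}

    record Subquery(Relation from) implements Relation {}

    // Same shape as the check under review: only a bare table in FROM is recognized.
    static Optional<String> firstTableOnly(Relation from)
    {
        if (from instanceof Table table) {
            return Optional.of(table.name());
        }
        return Optional.empty();
    }

    // Hypothetical alternative: walk the whole tree and collect every table, left to right.
    static List<String> allTables(Relation from)
    {
        List<String> tables = new ArrayList<>();
        collect(from, tables);
        return List.copyOf(tables);
    }

    private static void collect(Relation relation, List<String> tables)
    {
        if (relation instanceof Table table) {
            tables.add(table.name());
        }
        else if (relation instanceof Join join) {
            collect(join.left(), tables);
            collect(join.right(), tables);
        }
        else if (relation instanceof Subquery subquery) {
            collect(subquery.from(), tables);
        }
    }

    public static void main(String[] args)
    {
        Relation from = new Join(new Table("orders"), new Subquery(new Table("lineitem")));
        System.out.println(firstTableOnly(from)); // the join is not a bare table
        System.out.println(allTables(from));
    }
}
```

Whether a full walk is the right fix for this PR is a separate question; the sketch only shows why a single `instanceof Table` test leaves many query shapes with the placeholder name.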

                Optional<Relation> from = querySpecification.getFrom();
                if (from.isPresent() && from.get() instanceof Table) {
                    qualifiedName = createQualifiedObjectName(session, from.get(), ((Table) from.get()).getName());
                    try {
                        CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, node, qualifiedName.catalogName());
                        version = catalogHandle.getVersion();
                    }
                    catch (TrinoException e) {
                        // ignore since catalog might not be available
                    }
                }
            }
            analysis.setUpdateTarget(version, qualifiedName, Optional.empty(), Optional.of(outputColumns));
Member

I'm not sure that using updateTarget is the right approach here: it is used for a different set of operations, and we can't bind it to the first table we encounter in a SELECT query.

Author

I did think about this and first tried to add a dedicated target, but it required quite a bit of plumbing: the changes crept across many classes and ultimately into the EventListener as well, hence the snippet in the PR description. So I wasn't sure whether we want to take on such a major change.
I agree it is somewhat awkward to tie a SELECT to the first table, but binding a SELECT to a table is conceptually irrelevant and should simply be ignored. I am happy to change course on this as well.

        }
        return queryScope;
    }

@@ -46,6 +46,7 @@
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.QueryFailureInfo;
import io.trino.spi.eventlistener.QueryInputMetadata;
import io.trino.spi.eventlistener.QueryOutputMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.eventlistener.RoutineInfo;
import io.trino.spi.eventlistener.TableInfo;
@@ -879,7 +880,10 @@ public void testPrepareAndExecute()
        queryCompletedEvent = queryEvents.getQueryCompletedEvent();
        assertThat(queryCompletedEvent.getContext().getResourceGroupId().isPresent()).isTrue();
        assertThat(queryCompletedEvent.getContext().getResourceGroupId().get()).isEqualTo(createResourceGroupId("global", "user-user"));
        assertThat(queryCompletedEvent.getIoMetadata().getOutput()).isEqualTo(Optional.empty());
        Optional<QueryOutputMetadata> output = queryCompletedEvent.getIoMetadata().getOutput();
        assertThat(output).isNotEmpty();
        assertThat(output.get().getColumns()).isNotEmpty();
        assertThat(output.get().getColumns().get().size()).isEqualTo(1);
        assertThat(queryCompletedEvent.getIoMetadata().getInputs().size()).isEqualTo(1);
        assertThat(queryCompletedEvent.getContext().getClientInfo().get()).isEqualTo("{\"clientVersion\":\"testVersion\"}");
        assertThat(getOnlyElement(queryCompletedEvent.getIoMetadata().getInputs()).getCatalogName()).isEqualTo("tpch");
@@ -1517,6 +1521,7 @@ public void testAllImmediateFailureEventsPresent()
    private void assertLineage(String baseQuery, Set<String> inputTables, OutputColumnMetadata... outputColumnMetadata)
            throws Exception
    {
        assertLineageInternal(baseQuery, inputTables, outputColumnMetadata);
        assertLineageInternal("CREATE TABLE mock.default.create_new_table AS " + baseQuery, inputTables, outputColumnMetadata);
        assertLineageInternal("CREATE VIEW mock.default.create_new_view AS " + baseQuery, inputTables, outputColumnMetadata);
        assertLineageInternal("CREATE VIEW mock.default.create_new_materialized_view AS " + baseQuery, inputTables, outputColumnMetadata);
@@ -28,6 +28,7 @@
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.QueryOutputMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import io.trino.spi.resourcegroups.QueryType;
@@ -122,7 +123,10 @@ public void testSplitsForNormalQuery()
        QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent();
        assertThat(queryCompletedEvent.getContext().getResourceGroupId().isPresent()).isTrue();
        assertThat(queryCompletedEvent.getContext().getResourceGroupId().get()).isEqualTo(createResourceGroupId("global", "user-user"));
        assertThat(queryCompletedEvent.getIoMetadata().getOutput()).isEqualTo(Optional.empty());
        Optional<QueryOutputMetadata> output = queryCompletedEvent.getIoMetadata().getOutput();
Member

Instead of capturing this for SELECT in QueryOutputMetadata, can we add an additional field in QueryIOMetadata, an Optional<List<OutputColumnMetadata>> that is set only for SELECT statements? That way the logic could be simplified.

@martint - Is QueryIOMetadata the right place to add them, or is there a better place to store them?
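
A minimal sketch of what this suggestion could look like, using stand-in records rather than the real Trino SPI classes. The field name `selectColumns`, the factory methods, and every type below are assumptions made for illustration. The idea is that `output` stays empty for a SELECT, and the column lineage travels in its own optional field instead of a placeholder write target.

```java
import java.util.List;
import java.util.Optional;

// Stand-in types for illustration only; these are NOT the Trino SPI classes.
final class SelectLineageSketch
{
    private SelectLineageSketch() {}

    record ColumnDetail(String catalog, String schema, String table, String column) {}

    record OutputColumnMetadata(String name, String type, List<ColumnDetail> sources) {}

    record OutputMetadata(String catalog, String schema, String table) {}

    // Hypothetical shape: 'output' describes a write target; 'selectColumns' is set only for SELECT.
    record IoMetadata(Optional<OutputMetadata> output, Optional<List<OutputColumnMetadata>> selectColumns)
    {
        IoMetadata
        {
            // Defensive copy so the event payload cannot be mutated after construction.
            selectColumns = selectColumns.map(List::copyOf);
        }
    }

    static IoMetadata forSelect(List<OutputColumnMetadata> columns)
    {
        return new IoMetadata(Optional.empty(), Optional.of(columns));
    }

    static IoMetadata forWrite(OutputMetadata target)
    {
        return new IoMetadata(Optional.of(target), Optional.empty());
    }

    public static void main(String[] args)
    {
        IoMetadata select = forSelect(List.of(
                new OutputColumnMetadata("name", "varchar(25)",
                        List.of(new ColumnDetail("tpch", "tiny", "nation", "name")))));
        System.out.println(select.output().isPresent());
        System.out.println(select.selectColumns().orElseThrow().size());
    }
}
```

With a shape like this, a listener would not need to recognize an empty catalog/schema/table name as "this was a SELECT". The cost is the extra plumbing through the classes that carry I/O metadata.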

Author (@behrooz-stripe, Jan 8, 2025)

I did take a stab at storing this info separately from the output and, as I mentioned here, the changes quickly leak into many places (QueryInfo.java, QueryStateMachine.java, QueryMonitor.java, QueryCompletedEvent.java, ...) in order to plumb this info from the analysis all the way to the event listener.
So my idea was to avoid that and reuse the update target to carry the same info. In any case, I put together a gist of the actual diff for holding this in a separate variable in the analysis, CompletedEvent, and QueryInfo. I am happy to update this PR to reflect that if you folks prefer it.
CC: @martint @Praveen2112

        assertThat(output).isNotEmpty();
        assertThat(output.get().getColumns()).isNotEmpty();
        assertThat(output.get().getColumns().get().size()).isEqualTo(1);
        assertThat(queryCompletedEvent.getIoMetadata().getInputs().size()).isEqualTo(1);
        assertThat(queryCompletedEvent.getContext().getClientInfo().get()).isEqualTo("{\"clientVersion\":\"testVersion\"}");
        assertThat(getOnlyElement(queryCompletedEvent.getIoMetadata().getInputs()).getCatalogName()).isEqualTo("tpch");