Skip to content

Commit

Permalink
Simplify OpenLineage configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Jun 3, 2024
1 parent c22a337 commit 365df72
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ public RunEvent getStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent)
{
RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCreatedEvent.getContext());

runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(),
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.asText(),
getTrinoMetadataFacet(queryCreatedEvent.getMetadata()));
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(),
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.asText(),
getTrinoQueryContextFacet(queryCreatedEvent.getContext()));

return openLineage.newRunEventBuilder()
Expand All @@ -203,11 +203,11 @@ public RunEvent getCompletedEvent(UUID runID, QueryCompletedEvent queryCompleted

RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCompletedEvent.getContext());

runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(),
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.asText(),
getTrinoMetadataFacet(queryCompletedEvent.getMetadata()));
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(),
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.asText(),
getTrinoQueryContextFacet(queryCompletedEvent.getContext()));
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS.getText(),
runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS.asText(),
getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics()));

if (failed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private OpenLineageClient getClient(OpenLineageListenerConfig listenerConfig, Op
String[] disabledFacets = listenerConfig
.getDisabledFacets()
.stream()
.map(OpenLineageTrinoFacet::getText)
.map(OpenLineageTrinoFacet::asText)
.toArray(String[]::new);

clientBuilder.disableFacets(disabledFacets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,8 @@ public enum OpenLineageTrinoFacet
TRINO_QUERY_STATISTICS,
TRINO_QUERY_CONTEXT;

final String text;

OpenLineageTrinoFacet()
{
this.text = this.name().toLowerCase(ENGLISH);
}

public String getText()
public String asText()
{
return this.text;
}

public static OpenLineageTrinoFacet fromText(String text)
throws IllegalArgumentException
{
for (OpenLineageTrinoFacet facet : OpenLineageTrinoFacet.values()) {
if (facet.text.equals(text)) {
return facet;
}
}

throw new IllegalArgumentException(text);
return name().toLowerCase(ENGLISH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.openlineage.config;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
Expand All @@ -27,14 +26,14 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Locale.ENGLISH;

public class OpenLineageListenerConfig
{
private OpenLineageTransport transport = OpenLineageTransport.CONSOLE;
private URI trinoURI;
private List<OpenLineageTrinoFacet> disabledFacets = ImmutableList.of();
private Set<OpenLineageTrinoFacet> disabledFacets = ImmutableSet.of();
private Optional<String> namespace = Optional.empty();

private Set<QueryType> includeQueryTypes = ImmutableSet.<QueryType>builder()
Expand Down Expand Up @@ -83,13 +82,13 @@ public Set<QueryType> getIncludeQueryTypes()
public OpenLineageListenerConfig setIncludeQueryTypes(List<String> includeQueryTypes)
{
this.includeQueryTypes = includeQueryTypes.stream()
.map(String::trim)
.map(value -> value.toUpperCase(ENGLISH))
.map(QueryType::valueOf)
.collect(toImmutableSet());
return this;
}

public List<OpenLineageTrinoFacet> getDisabledFacets()
public Set<OpenLineageTrinoFacet> getDisabledFacets()
{
return disabledFacets;
}
Expand All @@ -100,9 +99,9 @@ public OpenLineageListenerConfig setDisabledFacets(List<String> disabledFacets)
throws RuntimeException
{
this.disabledFacets = disabledFacets.stream()
.map(String::trim)
.map(OpenLineageTrinoFacet::fromText)
.collect(toImmutableList());
.map(value -> value.toUpperCase(ENGLISH))
.map(OpenLineageTrinoFacet::valueOf)
.collect(toImmutableSet());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.Arrays;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.plugin.openlineage.OpenLineageTrinoFacet.TRINO_METADATA;
import static io.trino.plugin.openlineage.OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS;
import static io.trino.spi.resourcegroups.QueryType.ALTER_TABLE_EXECUTE;
import static io.trino.spi.resourcegroups.QueryType.DATA_DEFINITION;
import static io.trino.spi.resourcegroups.QueryType.DELETE;
import static io.trino.spi.resourcegroups.QueryType.INSERT;
import static io.trino.spi.resourcegroups.QueryType.MERGE;
import static io.trino.spi.resourcegroups.QueryType.SELECT;
import static io.trino.spi.resourcegroups.QueryType.UPDATE;

final class TestOpenLineageListenerConfig
{
Expand All @@ -36,7 +44,13 @@ void testDefaults()
.setTrinoURI(null)
.setNamespace(null)
.setDisabledFacets(ImmutableList.of())
.setIncludeQueryTypes(Arrays.stream("ALTER_TABLE_EXECUTE,DELETE,INSERT,MERGE,UPDATE,DATA_DEFINITION".split(",")).toList()));
.setIncludeQueryTypes(ImmutableList.of(
ALTER_TABLE_EXECUTE.name(),
DELETE.name(),
INSERT.name(),
MERGE.name(),
UPDATE.name(),
DATA_DEFINITION.name())));
}

@Test
Expand All @@ -54,8 +68,8 @@ void testExplicitPropertyMappings()
OpenLineageListenerConfig expected = new OpenLineageListenerConfig()
.setTransport(OpenLineageTransport.HTTP)
.setTrinoURI(new URI("http://testtrino"))
.setIncludeQueryTypes(ImmutableList.of("SELECT", "DELETE"))
.setDisabledFacets(ImmutableList.of("trino_metadata", "trino_query_statistics"))
.setIncludeQueryTypes(ImmutableList.of(SELECT.name(), DELETE.name()))
.setDisabledFacets(ImmutableList.of(TRINO_METADATA.name(), TRINO_QUERY_STATISTICS.name()))
.setNamespace("testnamespace");

assertFullMapping(properties, expected);
Expand Down

0 comments on commit 365df72

Please sign in to comment.