From 365df72d9c5d245de19b36134ad55ef9a6d67c83 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Mon, 3 Jun 2024 08:33:42 +0200 Subject: [PATCH] Simplify OpenLineage configuration --- .../openlineage/OpenLineageListener.java | 10 ++++---- .../OpenLineageListenerModule.java | 2 +- .../openlineage/OpenLineageTrinoFacet.java | 23 ++----------------- .../config/OpenLineageListenerConfig.java | 15 ++++++------ .../TestOpenLineageListenerConfig.java | 22 ++++++++++++++---- 5 files changed, 33 insertions(+), 39 deletions(-) diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java index 57825f3db31e..c3dc9d2b9596 100644 --- a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java @@ -184,9 +184,9 @@ public RunEvent getStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) { RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCreatedEvent.getContext()); - runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(), + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.asText(), getTrinoMetadataFacet(queryCreatedEvent.getMetadata())); - runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(), + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.asText(), getTrinoQueryContextFacet(queryCreatedEvent.getContext())); return openLineage.newRunEventBuilder() @@ -203,11 +203,11 @@ public RunEvent getCompletedEvent(UUID runID, QueryCompletedEvent queryCompleted RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCompletedEvent.getContext()); - runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(), + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.asText(), getTrinoMetadataFacet(queryCompletedEvent.getMetadata())); - runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(), + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.asText(), getTrinoQueryContextFacet(queryCompletedEvent.getContext())); - runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS.getText(), + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS.asText(), getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics())); if (failed) { diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java index 0b6cdc045e49..024e7f2f16ad 100644 --- a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java @@ -65,7 +65,7 @@ private OpenLineageClient getClient(OpenLineageListenerConfig listenerConfig, Op String[] disabledFacets = listenerConfig .getDisabledFacets() .stream() - .map(OpenLineageTrinoFacet::getText) + .map(OpenLineageTrinoFacet::asText) .toArray(String[]::new); clientBuilder.disableFacets(disabledFacets); diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java index a7084bd6c314..1c8f0015761d 100644 --- a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java @@ -21,27 +21,8 @@ public enum OpenLineageTrinoFacet TRINO_QUERY_STATISTICS, TRINO_QUERY_CONTEXT; - final String text; - - OpenLineageTrinoFacet() - { - this.text = this.name().toLowerCase(ENGLISH); - } - - public String getText() + public String asText() { - return this.text; - } - - public static OpenLineageTrinoFacet fromText(String text) - throws IllegalArgumentException - { - for (OpenLineageTrinoFacet facet : OpenLineageTrinoFacet.values()) { - if (facet.text.equals(text)) { - return facet; - } - } - - throw new IllegalArgumentException(text); + return name().toLowerCase(ENGLISH); } } diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java index 2926a14df3ca..5d1b2bda9b33 100644 --- a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.openlineage.config; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; @@ -27,14 +26,14 @@ import java.util.Optional; import java.util.Set; -import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.util.Locale.ENGLISH; public class OpenLineageListenerConfig { private OpenLineageTransport transport = OpenLineageTransport.CONSOLE; private URI trinoURI; - private List disabledFacets = ImmutableList.of(); + private Set disabledFacets = ImmutableSet.of(); private Optional namespace = Optional.empty(); private Set includeQueryTypes = ImmutableSet.builder() @@ -83,13 +82,13 @@ public Set getIncludeQueryTypes() public OpenLineageListenerConfig setIncludeQueryTypes(List includeQueryTypes) { this.includeQueryTypes = includeQueryTypes.stream() - .map(String::trim) + .map(value -> value.toUpperCase(ENGLISH)) .map(QueryType::valueOf) .collect(toImmutableSet()); return this; } - public List getDisabledFacets() + public Set getDisabledFacets() { return disabledFacets; } @@ -100,9 +99,9 @@ public OpenLineageListenerConfig setDisabledFacets(List disabledFacets) throws RuntimeException { this.disabledFacets = disabledFacets.stream() - .map(String::trim) - .map(OpenLineageTrinoFacet::fromText) - .collect(toImmutableList()); + .map(value -> value.toUpperCase(ENGLISH)) + .map(OpenLineageTrinoFacet::valueOf) + .collect(toImmutableSet()); return this; } diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java index c6bfaaef54b7..5ccf49c99d15 100644 --- a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java @@ -19,12 +19,20 @@ import org.junit.jupiter.api.Test; import java.net.URI; -import java.util.Arrays; import java.util.Map; import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.trino.plugin.openlineage.OpenLineageTrinoFacet.TRINO_METADATA; +import static io.trino.plugin.openlineage.OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS; +import static io.trino.spi.resourcegroups.QueryType.ALTER_TABLE_EXECUTE; +import static io.trino.spi.resourcegroups.QueryType.DATA_DEFINITION; +import static io.trino.spi.resourcegroups.QueryType.DELETE; +import static io.trino.spi.resourcegroups.QueryType.INSERT; +import static io.trino.spi.resourcegroups.QueryType.MERGE; +import static io.trino.spi.resourcegroups.QueryType.SELECT; +import static io.trino.spi.resourcegroups.QueryType.UPDATE; final class TestOpenLineageListenerConfig { @@ -36,7 +44,13 @@ void testDefaults() .setTrinoURI(null) .setNamespace(null) .setDisabledFacets(ImmutableList.of()) - .setIncludeQueryTypes(Arrays.stream("ALTER_TABLE_EXECUTE,DELETE,INSERT,MERGE,UPDATE,DATA_DEFINITION".split(",")).toList())); + .setIncludeQueryTypes(ImmutableList.of( + ALTER_TABLE_EXECUTE.name(), + DELETE.name(), + INSERT.name(), + MERGE.name(), + UPDATE.name(), + DATA_DEFINITION.name()))); } @Test @@ -54,8 +68,8 @@ void testExplicitPropertyMappings() OpenLineageListenerConfig expected = new OpenLineageListenerConfig() .setTransport(OpenLineageTransport.HTTP) .setTrinoURI(new URI("http://testtrino")) - .setIncludeQueryTypes(ImmutableList.of("SELECT", "DELETE")) - .setDisabledFacets(ImmutableList.of("trino_metadata", "trino_query_statistics")) + .setIncludeQueryTypes(ImmutableList.of(SELECT.name(), DELETE.name())) + .setDisabledFacets(ImmutableList.of(TRINO_METADATA.name(), TRINO_QUERY_STATISTICS.name())) .setNamespace("testnamespace"); assertFullMapping(properties, expected);