From 99b9b37a9a0c4676e8b43f3012ec7b7f5315dd55 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 31 May 2024 18:30:04 +0200 Subject: [PATCH] Add OpenLineage event listener plugin Co-authored-by: Naoki Takezoe --- .github/workflows/ci.yml | 1 + core/trino-server/src/main/provisio/trino.xml | 6 + docs/src/main/sphinx/admin.md | 1 + .../admin/event-listeners-openlineage.md | 245 ++++++++++++ plugin/trino-openlineage/pom.xml | 179 +++++++++ .../openlineage/OpenLineageListener.java | 356 ++++++++++++++++++ .../OpenLineageListenerFactory.java | 45 +++ .../OpenLineageListenerModule.java | 77 ++++ .../plugin/openlineage/OpenLineagePlugin.java | 28 ++ .../openlineage/OpenLineageTransport.java | 21 ++ .../openlineage/OpenLineageTrinoFacet.java | 45 +++ .../config/OpenLineageListenerConfig.java | 128 +++++++ .../OpenLineageClientHttpTransportConfig.java | 139 +++++++ .../OpenLineageConsoleTransport.java | 26 ++ .../transport/OpenLineageTransport.java | 22 ++ .../http/OpenLineageHttpTransport.java | 81 ++++ .../plugin/openlineage/MarquezServer.java | 161 ++++++++ .../OpenLineageListenerQueryRunner.java | 87 +++++ ...tOpenLineageClientHttpTransportConfig.java | 65 ++++ ...ineageEventListenerMarquezIntegration.java | 135 +++++++ .../openlineage/TestOpenLineageListener.java | 104 +++++ .../TestOpenLineageListenerConfig.java | 63 ++++ .../plugin/openlineage/TrinoEventData.java | 156 ++++++++ .../src/test/resources/marquez.yaml | 30 ++ pom.xml | 1 + 25 files changed, 2202 insertions(+) create mode 100644 docs/src/main/sphinx/admin/event-listeners-openlineage.md create mode 100644 plugin/trino-openlineage/pom.xml create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerFactory.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineagePlugin.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTransport.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/http/OpenLineageClientHttpTransportConfig.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageConsoleTransport.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageTransport.java create mode 100644 plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransport.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/MarquezServer.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/OpenLineageListenerQueryRunner.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageClientHttpTransportConfig.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListener.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java create mode 100644 plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java create mode 100644 plugin/trino-openlineage/src/test/resources/marquez.yaml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f5eec5ee2ce..2ffb10efff53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -481,6 +481,7 @@ jobs: - { modules: plugin/trino-mariadb } - { modules: plugin/trino-mongodb } - { modules: plugin/trino-mysql } + - { modules: plugin/trino-openlineage } - { modules: plugin/trino-opensearch } - { modules: plugin/trino-oracle } - { modules: plugin/trino-phoenix5 } diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 7a2e0a6fe8f2..c0f920271f81 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -218,6 +218,12 @@ + + + + + + diff --git a/docs/src/main/sphinx/admin.md b/docs/src/main/sphinx/admin.md index cac3a4cffd4b..2edc1f666826 100644 --- a/docs/src/main/sphinx/admin.md +++ b/docs/src/main/sphinx/admin.md @@ -24,4 +24,5 @@ admin/fault-tolerant-execution admin/event-listeners-http admin/event-listeners-mysql +admin/event-listeners-openlineage ``` diff --git a/docs/src/main/sphinx/admin/event-listeners-openlineage.md b/docs/src/main/sphinx/admin/event-listeners-openlineage.md new file mode 100644 index 000000000000..288e2317e359 --- /dev/null +++ b/docs/src/main/sphinx/admin/event-listeners-openlineage.md @@ -0,0 +1,245 @@ +# OpenLineage event listener + +The OpenLineage event listener plugin allows streaming of lineage information, +encoded in +JSON format aligned with OpenLineage specification, to an external, OpenLineage +copmpatible API, by POSTing them +to a specified URI. + +## Rationale + +This event listener is aiming to capture every query that creates or modifies +trino tables and transform it into lineage +information. Linage can be understood as relationship/flow between data/tables. +OpenLineage is a widely used open-source +standard for capturing lineage information from variety of system including (but +not limited to) Spark, Airflow, Flink. + +:::{list-table} Trino Query attributes mapping to OpenLineage attributes +:widths: 40, 40 +:header-rows: 1 + +* + - Trino + - OpenLineage +* + - `{UUID(Query Id)}` + - Run ID +* + - `{queryCreatedEvent.getCreateTime()} or {queryCompletedEvent.getEndTime()} ` + - Run Event Time +* + - Query Id + - Job Facet Name +* + - `trino:// + {openlineage-event-listener.trino.uri.getHost()} + ":" + {openlineage-event-listener.trino.uri.getPort()}` + - Job Facet Namespace (default, can be overriden) +* + - `{schema}.{table}` + - Dataset Name +* + - `trino:// + {openlineage-event-listener.trino.uri.getHost()} + ":" + {openlineage-event-listener.trino.uri.getPort()}` + - Dataset Namespace + +::: + +(trino-facets)= + +### Available Trino Facets + +#### Trino Metadata + +Facet containing properties (if present): + +- `queryPlan` +- `transactionId` - transaction id used for query processing + +related to query based on which OpenLineage Run Event was generated. + +Available in both `Start` and `Complete/Fail` OpenLineage events. + +If you want to disable this facet, add `trino_metadata` to +`openlineage-event-listener.disabled-facets`. + +#### Trino Query Context + +Facet containing properties: + +- `serverVersion` - version of Trino server that was used to process the query +- `environment` - inherited from `node.environment` of [](node-properties) +- `queryType` - one of query types configured via + `openlineage-event-listener.trino.include-query-types` + +related to query based on which OpenLineage Run Event was generated. + +Available in both `Start` and `Complete/Fail` OpenLineage events. + +If you want to disable this facet, add `trino_query_context` to +`openlineage-event-listener.disabled-facets`. + +#### Trino Query Statistics + +Facet containing full contents of query statistics of completed. Available only +in OpenLineage `Complete/Fail` events. + +If you want to disable this facet, add `trino_query_statistics` to +`openlineage-event-listener.disabled-facets`. + +(openlineage-event-listener-requirements)= + +## Requirements + +You need to perform the following steps: + +- Provide an HTTP/S service that accepts POST events with a JSON body and is + compatible with the OpenLineage API format. +- Configure `openlineage-event-listener.transport.url` in the event listener + properties file with the URI of the service +- Configure `openlineage-event-listener.trino.uri` so proper OpenLineage job + namespace is render within produced events. Needs to be proper uri with scheme, + host and port (otherwise plugin will fail to start). +- Configure what events to send as detailed + in [](openlineage-event-listener-configuration) + +(openlineage-event-listener-configuration)= + +## Configuration + +To configure the OpenLineage event listener, create an event listener properties +file in `etc` named `openlineage-event-listener.properties` with the following +contents as an example of minimal required configuration: + +```properties +event-listener.name=openlineage +openlineage-event-listener.trino.uri=
+``` + +Add `etc/openlineage-event-listener.properties` to `event-listener.config-files` +in [](config-properties): + +```properties +event-listener.config-files=etc/openlineage-event-listener.properties,... +``` + +:::{list-table} OpenLineage event listener configuration properties +:widths: 40, 40, 20 +:header-rows: 1 + +* + - Property name + - Description + - Default +* + - openlineage-event-listener.transport + - Type of transport to use when emitting lineage information. + See [](supported-transport-types) for list of available options with + descriptions. + - `CONSOLE` +* + - openlineage-event-listener.trino.host + - Trino hostname. Used to render Job Namespace in OpenLineage. Required. + - None. +* + - openlineage-event-listener.trino.port + - Trino port. Used to render Job Namespace in OpenLineage. Required. + - None. +* + - openlineage-event-listener.trino.include-query-types + - Which types of queries should be taken into account when emitting lineage + information. List of values split by comma. Each value must be + matching `io.trino.spi.resourcegroups.QueryType` enum. Query types not + included here are filtered out. + - `DELETE,INSERT,MERGE,UPDATE,ALTER_TABLE_EXECUTE` +* + - openlineage-event-listener.disabled-facets + - Which [](trino-facets) should be not included in final OpenLineage event. + Allowed values: `trino_metadata`, `trino_query_context`, + `trino_query_statistics`. + - None. +* + - openlineage-event-listener.namespace + - Custom namespace to be used for Job `namespace` attribute. If blank will + default to Dataset Namespace. + - None. + +::: + +(supported-transport-types)= +### Supported Transport Types + +- `CONSOLE` - sends OpenLineage JSON event to Trino coordinator standard output. +- `HTTP` - sends OpenLineage JSON event to OpenLineage compatible HTTP endpoint. + +:::{list-table} OpenLineage `HTTP` Transport Configuration properties +:widths: 40, 40, 20 +:header-rows: 1 + +* + - Property name + - Description + - Default +* + - openlineage-event-listener.transport.url + - URL of OpenLineage . Required if `HTTP` transport is configured. + - None. +* + - openlineage-event-listener.transport.endpoint + - Custom path for OpenLineage compatible endpoint. If configured, there + cannot be any custom path within + `openlineage-event-listener.transport.url`. + - `/api/v1`. +* + - openlineage-event-listener.transport.api-key + - API key (string value) used to authenticate with the service. + at `openlineage-event-listener.transport.url`. + - None. +* + - openlineage-event-listener.transport.timeout + - [Timeout](prop-type-duration) when making HTTP Requests. + - `5000ms` +* + - openlineage-event-listener.transport.headers + - List of custom HTTP headers to be sent along with the events. See + [](openlineage-event-listener-custom-headers) for more details. + - Empty +* + - openlineage-event-listener.transport.url-params + - List of custom url params to be added to final HTTP Request. See + [](openlineage-event-listener-custom-url-params) for more details. + - Empty + +::: + +(openlineage-event-listener-custom-headers)= + +### Custom HTTP headers + +Providing custom HTTP headers is a useful mechanism for sending metadata along +with event messages. + +Providing headers follows the pattern of `key:value` pairs separated by commas: + +```text +openlineage-event-listener.transport.headers="Header-Name-1:header value 1,Header-Value-2:header value 2,..." +``` + +If you need to use a comma(`,`) or colon(`:`) in a header name or value, +escape it using a backslash (`\`). + +Keep in mind that these are static, so they can not carry information +taken from the event itself. + +(openlineage-event-listener-custom-url-params)= + +### Custom URL Params + +Providing additional URL Params included in final HTTP Request. + +Providing url params follows the pattern of `key:value` pairs separated by commas: + +```text +openlineage-event-listener.transport.url-params="Param-Name-1:param value 1,Param-Value-2:param value 2,..." +``` + +Keep in mind that these are static, so they can not carry information +taken from the event itself. diff --git a/plugin/trino-openlineage/pom.xml b/plugin/trino-openlineage/pom.xml new file mode 100644 index 000000000000..596b16607650 --- /dev/null +++ b/plugin/trino-openlineage/pom.xml @@ -0,0 +1,179 @@ + + + 4.0.0 + + + io.trino + trino-root + 449-SNAPSHOT + ../../pom.xml + + + trino-openlineage + trino-plugin + Trino - OpenLineage Event Listener + + + ${project.parent.basedir} + 1.12.0 + + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + io.airlift + bootstrap + + + + io.airlift + configuration + + + + io.airlift + json + + + + io.airlift + log + + + + io.airlift + units + + + + io.openlineage + openlineage-java + ${openlineage.version} + + + + + + + commons-logging + commons-logging + + + + + + jakarta.validation + jakarta.validation-api + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + io.airlift + slice + provided + + + + io.opentelemetry + opentelemetry-api + provided + + + + io.opentelemetry + opentelemetry-context + provided + + + + io.trino + trino-spi + provided + + + + org.openjdk.jol + jol-core + provided + + + + io.airlift + testing + test + + + + io.trino + trino-main + test + + + + io.trino + trino-memory + test + + + + io.trino + trino-testing + test + + + + io.trino + trino-testing-services + test + + + + io.trino + trino-tpch + test + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.testcontainers + postgresql + test + + + + org.testcontainers + testcontainers + test + + + diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java new file mode 100644 index 000000000000..3ed6d3d1f1ad --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java @@ -0,0 +1,356 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Inject; +import io.airlift.json.ObjectMapperProvider; +import io.airlift.log.Logger; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.DatasetFacetsBuilder; +import io.openlineage.client.OpenLineage.InputDataset; +import io.openlineage.client.OpenLineage.InputDatasetBuilder; +import io.openlineage.client.OpenLineage.JobBuilder; +import io.openlineage.client.OpenLineage.OutputDataset; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.client.OpenLineage.RunFacetsBuilder; +import io.openlineage.client.OpenLineageClient; +import io.trino.plugin.openlineage.config.OpenLineageListenerConfig; +import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.OutputColumnMetadata; +import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; +import io.trino.spi.eventlistener.QueryCreatedEvent; +import io.trino.spi.eventlistener.QueryFailureInfo; +import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; +import io.trino.spi.eventlistener.QueryOutputMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import io.trino.spi.eventlistener.TableInfo; +import io.trino.spi.resourcegroups.QueryType; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class OpenLineageListener + implements EventListener +{ + private static final Logger logger = Logger.get(OpenLineageListener.class); + + private final OpenLineage openLineage = new OpenLineage(URI.create("https://github.com/trinodb/trino/plugin/trino-openlineage")); + private final OpenLineageClient client; + private final String jobNamespace; + private final String datasetNamespace; + private final Set includeQueryTypes; + private final ObjectMapper queryStatisticsMapper; + + @Inject + public OpenLineageListener(OpenLineageClient client, OpenLineageListenerConfig listenerConfig) + { + this.client = requireNonNull(client); + + String defaultNamespace = listenerConfig.getTrinoURI().toString(); + + if (!listenerConfig.getTrinoURI().getScheme().isEmpty()) { + defaultNamespace = defaultNamespace.replace(listenerConfig.getTrinoURI().getScheme(), "trino"); + } + else { + defaultNamespace = String.format("trino://%s", defaultNamespace); + } + + this.jobNamespace = listenerConfig.getNamespace().orElse(defaultNamespace); + this.datasetNamespace = defaultNamespace; + + this.includeQueryTypes = ImmutableSet.copyOf(listenerConfig.getIncludeQueryTypes()); + + this.queryStatisticsMapper = new ObjectMapperProvider().get(); + } + + @Override + public void queryCreated(QueryCreatedEvent queryCreatedEvent) + { + if (queryTypeSupported(queryCreatedEvent.getContext())) { + UUID runID = getQueryId(queryCreatedEvent.getMetadata()); + + RunEvent event = getStartEvent(runID, queryCreatedEvent); + client.emit(event); + return; + } + logger.debug(format("Query type %s not supported. Supported query types %s", + queryCreatedEvent.getContext().getQueryType().toString(), + this.includeQueryTypes)); + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + if (queryTypeSupported(queryCompletedEvent.getContext())) { + UUID runID = getQueryId(queryCompletedEvent.getMetadata()); + + RunEvent event = getCompletedEvent(runID, queryCompletedEvent); + client.emit(event); + return; + } + logger.debug(format("Query type %s not supported. Supported query types %s", + queryCompletedEvent.getContext().getQueryType().toString(), + this.includeQueryTypes)); + } + + private boolean queryTypeSupported(QueryContext queryContext) + { + return queryContext + .getQueryType() + .map(this.includeQueryTypes::contains) + .orElse(false); + } + + private UUID getQueryId(QueryMetadata queryMetadata) + { + return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(StandardCharsets.UTF_8)); + } + + private RunFacet getTrinoQueryContextFacet(QueryContext queryContext) + { + RunFacet queryContextFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + properties.put("server_address", queryContext.getServerAddress()); + properties.put("environment", queryContext.getEnvironment()); + + queryContext.getQueryType().ifPresent(queryType -> + properties.put("query_type", queryType.toString())); + + queryContextFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return queryContextFacet; + } + + private RunFacet getTrinoMetadataFacet(QueryMetadata queryMetadata) + { + RunFacet trinoMetadataFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + queryMetadata.getPlan().ifPresent( + queryPlan -> properties.put("query_plan", queryPlan)); + + queryMetadata.getTransactionId().ifPresent( + transactionId -> properties.put("transaction_id", transactionId)); + + trinoMetadataFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return trinoMetadataFacet; + } + + private RunFacet getTrinoQueryStatisticsFacet(QueryStatistics queryStatistics) + { + RunFacet trinoQueryStatisticsFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + this.queryStatisticsMapper.convertValue(queryStatistics, HashMap.class).forEach( + (key, value) -> { + if (key != null && value != null) { + properties.put(key.toString(), value.toString()); + } + }); + + trinoQueryStatisticsFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return trinoQueryStatisticsFacet; + } + + public RunEvent getStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) + { + RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCreatedEvent.getContext()); + + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(), + getTrinoMetadataFacet(queryCreatedEvent.getMetadata())); + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(), + getTrinoQueryContextFacet(queryCreatedEvent.getContext())); + + return openLineage.newRunEventBuilder() + .eventType(RunEvent.EventType.START) + .eventTime(queryCreatedEvent.getCreateTime().atZone(ZoneId.of("UTC"))) + .run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job(getBaseJobBuilder(queryCreatedEvent.getMetadata()).build()) + .build(); + } + + public RunEvent getCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + { + boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); + + RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCompletedEvent.getContext()); + + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_METADATA.getText(), + getTrinoMetadataFacet(queryCompletedEvent.getMetadata())); + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_CONTEXT.getText(), + getTrinoQueryContextFacet(queryCompletedEvent.getContext())); + runFacetsBuilder.put(OpenLineageTrinoFacet.TRINO_QUERY_STATISTICS.getText(), + getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics())); + + if (failed) { + queryCompletedEvent + .getFailureInfo() + .flatMap(QueryFailureInfo::getFailureMessage) + .ifPresent(failureMessage -> runFacetsBuilder + .errorMessage(openLineage + .newErrorMessageRunFacetBuilder() + .message(failureMessage) + .build())); + } + + return openLineage.newRunEventBuilder() + .eventType( + failed + ? RunEvent.EventType.FAIL + : RunEvent.EventType.COMPLETE) + .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) + .run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job(getBaseJobBuilder(queryCompletedEvent.getMetadata()).build()) + .inputs(buildInputs(queryCompletedEvent.getMetadata())) + .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) + .build(); + } + + private RunFacetsBuilder getBaseRunFacetsBuilder(QueryContext queryContext) + { + return openLineage.newRunFacetsBuilder() + .processing_engine(openLineage.newProcessingEngineRunFacetBuilder() + .name("trino") + .version(queryContext.getServerVersion()) + .build()); + } + + private JobBuilder getBaseJobBuilder(QueryMetadata queryMetadata) + { + return openLineage.newJobBuilder() + .namespace(this.jobNamespace) + .name(queryMetadata.getQueryId()) + .facets(openLineage.newJobFacetsBuilder() + .jobType(openLineage.newJobTypeJobFacet("BATCH", "TRINO", "QUERY")) + .sql(openLineage.newSQLJobFacet(queryMetadata.getQuery())) + .build()); + } + + private List buildInputs(QueryMetadata queryMetadata) + { + return queryMetadata + .getTables() + .stream() + .filter(TableInfo::isDirectlyReferenced) + .map(table -> { + String datasetName = getDatasetName(table); + InputDatasetBuilder inputDatasetBuilder = openLineage + .newInputDatasetBuilder() + .namespace(this.datasetNamespace) + .name(datasetName); + + DatasetFacetsBuilder datasetFacetsBuilder = openLineage.newDatasetFacetsBuilder() + .schema(openLineage.newSchemaDatasetFacetBuilder() + .fields( + table + .getColumns() + .stream() + .map(field -> openLineage.newSchemaDatasetFacetFieldsBuilder() + .name(field.getColumn()) + .build() + ).toList()) + .build()); + + return inputDatasetBuilder + .facets(datasetFacetsBuilder.build()) + .build(); + }) + .collect(toImmutableList()); + } + + private List buildOutputs(QueryIOMetadata ioMetadata) + { + Optional outputs = ioMetadata.getOutput(); + if (outputs.isPresent()) { + QueryOutputMetadata outputMetadata = outputs.get(); + List outputColumns = outputMetadata.getColumns().orElse(new ArrayList<>()); + + OpenLineage.ColumnLineageDatasetFacetFieldsBuilder columnLineageDatasetFacetFieldsBuilder = openLineage.newColumnLineageDatasetFacetFieldsBuilder(); + + outputColumns.forEach(column -> + columnLineageDatasetFacetFieldsBuilder.put(column.getColumnName(), + openLineage.newColumnLineageDatasetFacetFieldsAdditionalBuilder() + .inputFields(column + .getSourceColumns() + .stream() + .map(inputColumn -> openLineage.newColumnLineageDatasetFacetFieldsAdditionalInputFieldsBuilder() + .field(inputColumn.getColumnName()) + .namespace(this.datasetNamespace) + .name(getDatasetName(inputColumn.getCatalog(), inputColumn.getSchema(), inputColumn.getTable())) + .build()) + .toList() + ).build())); + + return ImmutableList.of( + openLineage.newOutputDatasetBuilder() + .namespace(this.datasetNamespace) + .name(getDatasetName(outputMetadata.getCatalogName(), outputMetadata.getSchema(), outputMetadata.getTable())) + .facets(openLineage.newDatasetFacetsBuilder() + .columnLineage(openLineage.newColumnLineageDatasetFacet(columnLineageDatasetFacetFieldsBuilder.build())) + .schema(openLineage.newSchemaDatasetFacetBuilder() + .fields( + outputColumns.stream() + .map(column -> openLineage.newSchemaDatasetFacetFieldsBuilder() + .name(column.getColumnName()) + .type(column.getColumnType()) + .build()) + .toList() + ).build() + ).build() + ).build()); + } + + return ImmutableList.of(); + } + + private String getDatasetName(TableInfo tableInfo) + { + return getDatasetName(tableInfo.getCatalog(), tableInfo.getSchema(), tableInfo.getTable()); + } + + private String getDatasetName(String catalogName, String schemaName, String tableName) + { + return format("%s.%s.%s", catalogName, schemaName, tableName); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerFactory.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerFactory.java new file mode 100644 index 000000000000..d952bb7e7e8a --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.EventListenerFactory; + +import java.util.Map; + +public class OpenLineageListenerFactory + implements EventListenerFactory +{ + @Override + public String getName() + { + return "openlineage"; + } + + @Override + public EventListener create(Map config) + { + Bootstrap app = new Bootstrap( + new OpenLineageListenerModule()); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + return injector.getInstance(OpenLineageListener.class); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java new file mode 100644 index 000000000000..743397948cf1 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerModule.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.openlineage.client.OpenLineageClient; +import io.trino.plugin.openlineage.config.OpenLineageListenerConfig; +import io.trino.plugin.openlineage.config.http.OpenLineageClientHttpTransportConfig; +import io.trino.plugin.openlineage.transport.OpenLineageConsoleTransport; +import io.trino.plugin.openlineage.transport.OpenLineageTransport; +import io.trino.plugin.openlineage.transport.http.OpenLineageHttpTransport; + +import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.trino.plugin.openlineage.OpenLineageTransport.CONSOLE; +import static io.trino.plugin.openlineage.OpenLineageTransport.HTTP; + +public class OpenLineageListenerModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(OpenLineageListener.class).in(Scopes.SINGLETON); + + configBinder(binder).bindConfig(OpenLineageListenerConfig.class); + + install(conditionalModule( + OpenLineageListenerConfig.class, + config -> config.getTransport().equals(CONSOLE), + internalBinder -> { + internalBinder.bind(OpenLineageTransport.class).to(OpenLineageConsoleTransport.class); + })); + + install(conditionalModule( + OpenLineageListenerConfig.class, + config -> config.getTransport().equals(HTTP), + internalBinder -> { + configBinder(internalBinder).bindConfig(OpenLineageClientHttpTransportConfig.class); + internalBinder.bind(OpenLineageTransport.class).to(OpenLineageHttpTransport.class); + })); + } + + @Provides + @Singleton + private OpenLineageClient getClient(OpenLineageListenerConfig listenerConfig, OpenLineageTransport openLineageTransport) + throws Exception + { + OpenLineageClient.Builder clientBuilder = OpenLineageClient.builder(); + clientBuilder.transport(openLineageTransport.buildTransport()); + + String[] disabledFacets = listenerConfig + .getDisabledFacets() + .stream() + .map(OpenLineageTrinoFacet::getText) + .toArray(String[]::new); + + clientBuilder.disableFacets(disabledFacets); + + return clientBuilder.build(); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineagePlugin.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineagePlugin.java new file mode 100644 index 000000000000..5765f66c149e --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineagePlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.collect.ImmutableList; +import io.trino.spi.Plugin; +import io.trino.spi.eventlistener.EventListenerFactory; + +public class OpenLineagePlugin + implements Plugin +{ + @Override + public Iterable getEventListenerFactories() + { + return ImmutableList.of(new OpenLineageListenerFactory()); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTransport.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTransport.java new file mode 100644 index 000000000000..07de51568bc0 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTransport.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +public enum OpenLineageTransport +{ + CONSOLE, + HTTP, + /**/ +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java new file mode 100644 index 000000000000..984a6f60bc01 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageTrinoFacet.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +public enum OpenLineageTrinoFacet +{ + TRINO_METADATA("trino_metadata"), + TRINO_QUERY_STATISTICS("trino_query_statistics"), + TRINO_QUERY_CONTEXT("trino_query_context"); + + final String text; + + OpenLineageTrinoFacet(String text) + { + this.text = text; + } + + public String getText() + { + return this.text; + } + + public static OpenLineageTrinoFacet fromText(String text) + throws IllegalArgumentException + { + for (OpenLineageTrinoFacet facet : OpenLineageTrinoFacet.values()) { + if (facet.text.equals(text)) { + return facet; + } + } + + throw new IllegalArgumentException(text); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java new file mode 100644 index 000000000000..2e34e4ddcf9e --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/OpenLineageListenerConfig.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage.config; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.trino.plugin.openlineage.OpenLineageTransport; +import io.trino.plugin.openlineage.OpenLineageTrinoFacet; +import io.trino.spi.resourcegroups.QueryType; +import jakarta.validation.constraints.NotNull; + +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class OpenLineageListenerConfig +{ + private OpenLineageTransport transport = OpenLineageTransport.CONSOLE; + private URI trinoURI; + private List disabledFacets = ImmutableList.of(); + private Optional namespace = Optional.empty(); + + private Set includeQueryTypes = ImmutableSet.builder() + .add(QueryType.ALTER_TABLE_EXECUTE) + .add(QueryType.DELETE) + .add(QueryType.INSERT) + .add(QueryType.MERGE) + .add(QueryType.UPDATE) + .add(QueryType.DATA_DEFINITION) + .build(); + + @Config("openlineage-event-listener.transport.type") + @ConfigDescription("Type of transport used to emit lineage information.") + public OpenLineageListenerConfig setTransport(OpenLineageTransport transport) + { + this.transport = transport; + return this; + } + + public OpenLineageTransport getTransport() + { + return transport; + } + + @Config("openlineage-event-listener.trino.uri") + @ConfigDescription("URI of trino server. Used for namespace rendering.") + public OpenLineageListenerConfig setTrinoURI(URI trinoURI) + { + this.trinoURI = trinoURI; + return this; + } + + @NotNull + public URI getTrinoURI() + { + return trinoURI; + } + + @Config("openlineage-event-listener.trino.include-query-types") + @ConfigDescription("Which query types emitted by Trino should generate OpenLineage events. Other query types will be filtered out.") + public OpenLineageListenerConfig setIncludeQueryTypes(List includeQueryTypes) + { + this.includeQueryTypes = new HashSet<>(includeQueryTypes.stream() + .map(String::trim) + .map(QueryType::valueOf) + .toList()); + + return this; + } + + public Set getIncludeQueryTypes() + { + return includeQueryTypes; + } + + @Config("openlineage-event-listener.disabled-facets") + @ConfigDescription("Which facets should be removed from OpenLineage events.") + public OpenLineageListenerConfig setDisabledFacets(List disabledFacets) + throws RuntimeException + { + this.disabledFacets = disabledFacets.stream() + .map(String::trim) + .map(text -> { + try { + return OpenLineageTrinoFacet.fromText(text); + } + catch (IllegalArgumentException e) { + throw new RuntimeException(e); + } + }) + .toList(); + + return this; + } + + public List getDisabledFacets() + { + return disabledFacets; + } + + @Config("openlineage-event-listener.namespace") + @ConfigDescription("Override default namespace for job facet.") + public OpenLineageListenerConfig setNamespace(String namespace) + { + this.namespace = Optional.ofNullable(namespace); + return this; + } + + public Optional getNamespace() + { + return namespace; + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/http/OpenLineageClientHttpTransportConfig.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/http/OpenLineageClientHttpTransportConfig.java new file mode 100644 index 000000000000..d7d1e94fde51 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/config/http/OpenLineageClientHttpTransportConfig.java @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage.config.http; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; +import io.airlift.units.Duration; +import io.airlift.units.MaxDuration; +import io.airlift.units.MinDuration; +import jakarta.validation.constraints.NotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +public class OpenLineageClientHttpTransportConfig +{ + private String url; + private String endpoint; + private Optional apiKey = Optional.empty(); + private Duration timeout = new Duration(5000, TimeUnit.MILLISECONDS); + private Map headers = new HashMap<>(); + private Map urlParams = new HashMap<>(); + + @NotNull + public String getUrl() + { + return url; + } + + @Config("openlineage-event-listener.transport.url") + @ConfigDescription("URL of receiving server. Explicitly set the scheme https:// to use symmetric encryption") + public OpenLineageClientHttpTransportConfig setUrl(String url) + { + this.url = url; + return this; + } + + public String getEndpoint() + { + return endpoint; + } + + @Config("openlineage-event-listener.transport.endpoint") + @ConfigDescription("Custom path for API receiving the events.") + public OpenLineageClientHttpTransportConfig setEndpoint(String endpoint) + { + this.endpoint = endpoint; + return this; + } + + public Optional getApiKey() + { + return apiKey; + } + + @Config("openlineage-event-listener.transport.api-key") + @ConfigDescription("API Key to use when authenticating against OpenLineage API") + @ConfigSecuritySensitive + public OpenLineageClientHttpTransportConfig setApiKey(String apiKey) + { + this.apiKey = Optional.ofNullable(apiKey); + return this; + } + + @MinDuration("1ms") + @MaxDuration("1h") + public Duration getTimeout() + { + return timeout; + } + + @Config("openlineage-event-listener.transport.timeout") + @ConfigDescription("Timeout when making HTTP Requests.") + public OpenLineageClientHttpTransportConfig setTimeout(Duration timeout) + { + this.timeout = timeout; + return this; + } + + public Map getHeaders() + { + return headers; + } + + @Config("openlineage-event-listener.transport.headers") + @ConfigDescription("List of custom custom HTTP headers provided as: \"Header-Name-1: header value 1, Header-Value-2: header value 2, ...\" ") + public OpenLineageClientHttpTransportConfig setHeaders(List headers) + { + try { + this.headers = headers + .stream() + .collect(Collectors.toMap(keyValue -> keyValue.split(":", 2)[0], keyValue -> keyValue.split(":", 2)[1])); + } + catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException(format("Cannot parse http headers from property openlineage-event-listener.transport.headers; value provided was %s, " + + "expected format is \"Header-Name-1: header value 1, Header-Value-2: header value 2, ...\"", String.join(", ", headers)), e); + } + return this; + } + + public Map getUrlParams() + { + return urlParams; + } + + @Config("openlineage-event-listener.transport.url-params") + @ConfigDescription("List of custom custom url params provided as: \"url-param-1: url param value 1, ...\" ") + public OpenLineageClientHttpTransportConfig setUrlParams(List urlParas) + { + try { + this.urlParams = urlParas + .stream() + .collect(Collectors.toMap(kvs -> kvs.split(":", 2)[0], kvs -> kvs.split(":", 2)[1])); + } + catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException(format("Cannot parse url params from property openlineage-event-listener.transport.url-params; value provided was %s, " + + "expected format is \"url-param-1: url param value 1, ...\"", String.join(", ", urlParas)), e); + } + return this; + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageConsoleTransport.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageConsoleTransport.java new file mode 100644 index 000000000000..ec4d0ee9b8f0 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageConsoleTransport.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage.transport; + +import io.openlineage.client.transports.ConsoleTransport; + +public class OpenLineageConsoleTransport + implements OpenLineageTransport +{ + @Override + public ConsoleTransport buildTransport() + { + return new ConsoleTransport(); + } +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageTransport.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageTransport.java new file mode 100644 index 000000000000..0e7c936c56a3 --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/OpenLineageTransport.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage.transport; + +import io.openlineage.client.transports.Transport; + +public interface OpenLineageTransport +{ + Transport buildTransport() + throws Exception; +} diff --git a/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransport.java b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransport.java new file mode 100644 index 000000000000..e32b3afe931a --- /dev/null +++ b/plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransport.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage.transport.http; + +import com.google.inject.Inject; +import io.openlineage.client.transports.HttpConfig; +import io.openlineage.client.transports.HttpTransport; +import io.openlineage.client.transports.TokenProvider; +import io.trino.plugin.openlineage.config.http.OpenLineageClientHttpTransportConfig; +import io.trino.plugin.openlineage.transport.OpenLineageTransport; + +import java.net.URI; +import java.util.Map; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class OpenLineageHttpTransport + implements OpenLineageTransport +{ + private final String url; + private final String endpoint; + private final int timeout; + private final ApiKeyTokenProvider apiKey; + private final Map urlParams; + private final Map headers; + + private static class ApiKeyTokenProvider + implements TokenProvider + { + private final String token; + + public ApiKeyTokenProvider(String token) + { + this.token = requireNonNull(token); + } + + @Override + public String getToken() + { + return format("Bearer %s", this.token); + } + } + + @Inject + public OpenLineageHttpTransport(OpenLineageClientHttpTransportConfig config) + { + this.url = config.getUrl(); + this.endpoint = config.getEndpoint(); + this.timeout = (int) config.getTimeout().toMillis(); + this.apiKey = config.getApiKey().map(ApiKeyTokenProvider::new).orElse(null); + this.urlParams = config.getUrlParams(); + this.headers = config.getHeaders(); + } + + @Override + public HttpTransport buildTransport() + throws Exception + { + return new HttpTransport( + new HttpConfig( + new URI(this.url), + this.endpoint, + null, + this.timeout, + this.apiKey, + this.urlParams, + this.headers)); + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/MarquezServer.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/MarquezServer.java new file mode 100644 index 000000000000..757a741112fb --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/MarquezServer.java @@ -0,0 +1,161 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.io.Closer; +import io.trino.testing.ResourcePresence; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.MountableFile; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Optional; + +import static java.lang.String.format; + +public class MarquezServer + implements Closeable +{ + private static final String MARQUEZ_CONFIG; + private static final String MARQUEZ_HOST; + private static final int MARQUEZ_PORT; + private static final int MARQUEZ_UI_PORT; + private static final int MARQUEZ_ADMIN_PORT; + private static final String MARQUEZ_HEALTCHECK_API; + private static final String POSTGRES_HOST; + private static final String POSTGRES_DB; + private static final int POSTGRES_PORT; + private static final String POSTGRES_USER; + private static final String POSTGRES_PASSWORD; + private static final String DEFAULT_VERSION; + + private Network network; + private final Closer closer = Closer.create(); + + static { + MARQUEZ_CONFIG = "/opt/marquez/marquez.test.yaml"; + MARQUEZ_HOST = "marquez"; + MARQUEZ_PORT = 5000; + MARQUEZ_UI_PORT = 3000; + MARQUEZ_ADMIN_PORT = 5001; + MARQUEZ_HEALTCHECK_API = "/ping"; + POSTGRES_HOST = "postgres"; + POSTGRES_DB = "marquez"; + POSTGRES_PORT = 5432; + POSTGRES_USER = "marquez"; + POSTGRES_PASSWORD = "marquez"; + DEFAULT_VERSION = "0.46.0"; + } + + private final GenericContainer dockerContainerAPI; + private final PostgreSQLContainer dockerContainerPostgres; + private final Optional> dockerWebUIContainerAPI; + + public MarquezServer() + { + this(DEFAULT_VERSION); + } + + public MarquezServer(String version) + { + network = Network.newNetwork(); + closer.register(this.network::close); + + this.dockerContainerPostgres = new PostgreSQLContainer<>("postgres:14") + .withNetwork(network) + .withNetworkAliases(POSTGRES_HOST) + .withDatabaseName(POSTGRES_DB) + .withUsername(POSTGRES_USER) + .withPassword(POSTGRES_PASSWORD) + .withStartupTimeout(Duration.ofSeconds(360)); + + this.dockerContainerPostgres.start(); + closer.register(this.dockerContainerPostgres::close); + + this.dockerContainerAPI = new GenericContainer<>("marquezproject/marquez:" + version) + .withNetwork(network) + .withNetworkAliases(MARQUEZ_HOST) + .withExposedPorts(MARQUEZ_PORT, MARQUEZ_ADMIN_PORT) + .dependsOn(this.dockerContainerPostgres) + .withEnv("MARQUEZ_PORT", String.valueOf(MARQUEZ_PORT)) + .withEnv("MARQUEZ_ADMIN_PORT", String.valueOf(MARQUEZ_ADMIN_PORT)) + .withEnv("POSTGRES_URL", getPostgresUri()) + .withEnv("POSTGRES_USER", "marquez") + .withEnv("POSTGRES_PASSWORD", "marquez") + .withEnv("MARQUEZ_CONFIG", MARQUEZ_CONFIG) + .withCopyFileToContainer( + MountableFile.forClasspathResource("marquez.yaml"), + MARQUEZ_CONFIG) + .withCommand("./entrypoint.sh") + .waitingFor(Wait.forHttp(MARQUEZ_HEALTCHECK_API) + .forPort(MARQUEZ_ADMIN_PORT) + .forStatusCode(200)) + .withStartupTimeout(Duration.ofSeconds(360)); + + this.dockerContainerAPI.start(); + closer.register(this.dockerContainerAPI::close); + + this.dockerWebUIContainerAPI = Optional.of( + new GenericContainer<>("marquezproject/marquez-web:" + version) + .withNetwork(network) + .withExposedPorts(MARQUEZ_UI_PORT) + .dependsOn(this.dockerContainerAPI) + .withEnv("MARQUEZ_HOST", MARQUEZ_HOST) + .withEnv("MARQUEZ_PORT", String.valueOf(MARQUEZ_PORT)) + .withStartupTimeout(Duration.ofSeconds(360))); + + this.dockerWebUIContainerAPI.ifPresent(container -> + { + container.start(); + closer.register(container::close); + }); + } + + private String getPostgresUri() + { + return format("jdbc:postgresql://%s:%s/%s", POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB); + } + + public URI getMarquezUri() + { + return URI.create(format("http://%s:%s", dockerContainerAPI.getHost(), dockerContainerAPI.getMappedPort(MARQUEZ_PORT))); + } + + public Optional getMarquezWebUIUri() + { + if (this.dockerWebUIContainerAPI.isPresent()) { + return Optional.of( + URI.create(format("http://%s:%s", dockerWebUIContainerAPI.get().getHost(), dockerWebUIContainerAPI.get().getMappedPort(MARQUEZ_UI_PORT)))); + } + return Optional.empty(); + } + + @Override + public void close() + throws IOException + { + closer.close(); + } + + @ResourcePresence + public boolean isRunning() + { + return dockerContainerAPI.getContainerId() != null; + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/OpenLineageListenerQueryRunner.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/OpenLineageListenerQueryRunner.java new file mode 100644 index 000000000000..ba094b857a95 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/OpenLineageListenerQueryRunner.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.Session; +import io.trino.plugin.memory.MemoryPlugin; +import io.trino.plugin.tpch.TpchPlugin; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; + +import java.util.Map; + +import static io.airlift.testing.Closeables.closeAllSuppress; +import static io.trino.testing.TestingSession.testSessionBuilder; + +public final class OpenLineageListenerQueryRunner +{ + public static final String CATALOG = "marquez"; + public static final String SCHEMA = "default"; + + private OpenLineageListenerQueryRunner() {} + + public static QueryRunner createOpenLineageRunner(Map listenerProperties) + throws Exception + { + QueryRunner queryRunner = null; + try { + queryRunner = DistributedQueryRunner + .builder(createSession()) + .setEventListener(new OpenLineageListenerFactory().create(listenerProperties)) + .build(); + // catalog used for output data + queryRunner.installPlugin(new MemoryPlugin()); + queryRunner.createCatalog(CATALOG, "memory"); + + // catalog used for input data + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + return queryRunner; + } + catch (Throwable e) { + closeAllSuppress(e, queryRunner); + throw e; + } + } + + private static Session createSession() + { + return testSessionBuilder() + .setCatalog(CATALOG) + .setSchema(SCHEMA) + .build(); + } + + public static void main(String[] args) + throws Exception + { + MarquezServer server = new MarquezServer(); + + Map config = ImmutableMap.of( + "openlineage-event-listener.transport.type", "HTTP", + "openlineage-event-listener.transport.url", server.getMarquezUri().toString(), + "openlineage-event-listener.trino.uri", "http://trino-query-runner:1337"); + + QueryRunner queryRunner = createOpenLineageRunner(config); + Logger log = Logger.get(OpenLineageListenerQueryRunner.class); + log.info("======== SERVER RUNNING: %s ========", queryRunner.getCoordinator().getBaseUrl()); + + if (server.getMarquezWebUIUri().isPresent()) { + log.info("======== MARQUEZ UI RUNNING: %s ========", server.getMarquezWebUIUri().get()); + } + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageClientHttpTransportConfig.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageClientHttpTransportConfig.java new file mode 100644 index 000000000000..10fe101ffa66 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageClientHttpTransportConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import io.trino.plugin.openlineage.config.http.OpenLineageClientHttpTransportConfig; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestOpenLineageClientHttpTransportConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(OpenLineageClientHttpTransportConfig.class) + .setUrl(null) + .setEndpoint(null) + .setTimeout(Duration.valueOf("5s")) + .setApiKey(null) + .setHeaders(ImmutableList.of()) + .setUrlParams(ImmutableList.of())); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("openlineage-event-listener.transport.url", "http://testurl") + .put("openlineage-event-listener.transport.endpoint", "/test/endpoint") + .put("openlineage-event-listener.transport.api-key", "dummy") + .put("openlineage-event-listener.transport.timeout", "30s") + .put("openlineage-event-listener.transport.headers", "header1:value1,header2:value2") + .put("openlineage-event-listener.transport.url-params", "urlParam1:urlVal1,urlParam2:urlVal2") + + .buildOrThrow(); + + OpenLineageClientHttpTransportConfig expected = new OpenLineageClientHttpTransportConfig() + .setUrl("http://testurl") + .setEndpoint("/test/endpoint") + .setApiKey("dummy") + .setTimeout(Duration.valueOf("30s")) + .setHeaders(ImmutableList.of("header1:value1", "header2:value2")) + .setUrlParams(ImmutableList.of("urlParam1:urlVal1", "urlParam2:urlVal2")); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java new file mode 100644 index 000000000000..7ec8294de579 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java @@ -0,0 +1,135 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.Locale; + +import static io.trino.plugin.openlineage.OpenLineageListenerQueryRunner.createOpenLineageRunner; +import static io.trino.testing.assertions.Assert.assertEventually; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestOpenLineageEventListenerMarquezIntegration + extends AbstractTestQueryFramework +{ + private static final Logger logger = Logger.get(TestOpenLineageEventListenerMarquezIntegration.class); + + private static MarquezServer server; + private static String marquezURI; + private static final String trinoURI = "http://trino-integration-test:1337"; + private static final HttpClient client = HttpClient.newHttpClient(); + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + server = closeAfterClass(new MarquezServer()); + marquezURI = server.getMarquezUri().toString(); + + return createOpenLineageRunner(ImmutableMap.builder() + .put("openlineage-event-listener.transport.type", "HTTP") + .put("openlineage-event-listener.transport.url", server.getMarquezUri().toString()) + .put("openlineage-event-listener.trino.uri", trinoURI) + .buildOrThrow()); + } + + @Test + void testCreateTableAsSelectFromTable() + throws Exception + { + String outputTable = "test_create_table_as_select_from_table"; + + @Language("SQL") String createTableQuery = format( + "CREATE TABLE %s AS SELECT * FROM tpch.tiny.nation", + outputTable); + + String queryId = this.getQueryRunner() + .executeWithPlan(this.getSession(), createTableQuery) + .queryId() + .toString(); + + assertEventually(Duration.valueOf("10s"), () -> { + URI trino = new URI(trinoURI); + + String expectedNamespace = URLEncoder.encode(format("trino://%s:%s", trino.getHost(), trino.getPort()), StandardCharsets.UTF_8); + String expectedQueryId = URLEncoder.encode(queryId, StandardCharsets.UTF_8); + + checkJobRegistration(client, expectedNamespace, expectedQueryId); + }); + } + + @Test + void testCreateTableAsSelectFromView() + throws Exception + { + String viewName = "test_view"; + String outputTable = "test_create_table_as_select_from_view"; + + @Language("SQL") String createViewQuery = format( + "CREATE VIEW %s AS SELECT * FROM tpch.tiny.nation", + viewName); + + assertQuerySucceeds(createViewQuery); + + @Language("SQL") String createTableQuery = format( + "CREATE TABLE %s AS SELECT * FROM %s", + outputTable, viewName); + + String queryId = this.getQueryRunner() + .executeWithPlan(this.getSession(), createTableQuery) + .queryId() + .toString(); + + assertEventually(Duration.valueOf("10s"), () -> { + URI trino = new URI(trinoURI); + + String expectedNamespace = URLEncoder.encode(format("trino://%s:%s", trino.getHost(), trino.getPort()), StandardCharsets.UTF_8); + String expectedQueryId = URLEncoder.encode(queryId, StandardCharsets.UTF_8); + + checkJobRegistration(client, expectedNamespace, expectedQueryId); + }); + } + + private void checkJobRegistration(HttpClient client, String expectedNamespace, String expectedQueryId) + throws URISyntaxException, IOException, InterruptedException + { + HttpRequest requestJob = HttpRequest.newBuilder() + .uri(new URI(marquezURI + "/api/v1/namespaces/" + expectedNamespace + "/jobs/" + expectedQueryId)) + .GET() + .build(); + + HttpResponse responseJob = client.send(requestJob, HttpResponse.BodyHandlers.ofString()); + + logger.info(responseJob.body()); + + assertThat(responseJob.statusCode()).isEqualTo(200); + assertThat(responseJob.body().toLowerCase(Locale.ROOT)).contains("complete"); + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListener.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListener.java new file mode 100644 index 000000000000..6d2d8f9e1e69 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListener.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; +import com.google.common.collect.ImmutableMap; +import io.openlineage.client.OpenLineage.Job; +import io.openlineage.client.OpenLineage.Run; +import io.openlineage.client.OpenLineage.RunEvent; +import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.EventListenerFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.nio.charset.StandardCharsets; +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; + +@SuppressWarnings("FieldNamingConvention") +@TestInstance(PER_METHOD) +class TestOpenLineageListener +{ + private final EventListenerFactory factory = new OpenLineageListenerFactory(); + + @Test + void testGetCompleteEvent() + throws IllegalAccessException + { + OpenLineageListener listener = (OpenLineageListener) createEventListener(Map.of( + "openlineage-event-listener.transport.type", "CONSOLE", + "openlineage-event-listener.trino.uri", "http://testhost")); + + UUID runID = UUID.nameUUIDFromBytes("testGetCompleteEvent".getBytes(StandardCharsets.UTF_8)); + RunEvent result = listener.getCompletedEvent(runID, TrinoEventData.queryCompleteEvent); + + assertThat(result) + .extracting(RunEvent::getEventType) + .isEqualTo(RunEvent.EventType.COMPLETE); + + assertThat(result) + .extracting(RunEvent::getEventTime) + .extracting(ZonedDateTime::toInstant) + .isEqualTo(TrinoEventData.queryCompleteEvent.getEndTime()); + + assertThat(result) + .extracting(RunEvent::getRun) + .extracting(Run::getRunId) + .isEqualTo(runID); + + assertThat(result) + .extracting(RunEvent::getJob) + .extracting(Job::getNamespace) + .isEqualTo("trino://testhost"); + } + + @Test + void testGetStartEvent() + throws IllegalAccessException + { + OpenLineageListener listener = (OpenLineageListener) createEventListener(Map.of( + "openlineage-event-listener.transport.type", OpenLineageTransport.CONSOLE.toString(), + "openlineage-event-listener.trino.uri", "http://testhost:8080")); + + UUID runID = UUID.nameUUIDFromBytes("testGetStartEvent".getBytes(StandardCharsets.UTF_8)); + RunEvent result = listener.getStartEvent(runID, TrinoEventData.queryCreatedEvent); + + assertThat(result) + .extracting(RunEvent::getEventType) + .isEqualTo(RunEvent.EventType.START); + + assertThat(result) + .extracting(RunEvent::getEventTime) + .extracting(ZonedDateTime::toInstant) + .isEqualTo(TrinoEventData.queryCreatedEvent.getCreateTime()); + + assertThat(result) + .extracting(RunEvent::getRun) + .extracting(Run::getRunId) + .isEqualTo(runID); + + assertThat(result) + .extracting(RunEvent::getJob) + .extracting(Job::getNamespace) + .isEqualTo("trino://testhost:8080"); + } + + private EventListener createEventListener(Map config) + { + return factory.create(ImmutableMap.copyOf(config)); + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java new file mode 100644 index 000000000000..526fc57bb904 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageListenerConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.openlineage.config.OpenLineageListenerConfig; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.Arrays; +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestOpenLineageListenerConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(OpenLineageListenerConfig.class) + .setTransport(OpenLineageTransport.CONSOLE) + .setTrinoURI(null) + .setNamespace(null) + .setDisabledFacets(ImmutableList.of()) + .setIncludeQueryTypes(Arrays.stream("ALTER_TABLE_EXECUTE,DELETE,INSERT,MERGE,UPDATE,DATA_DEFINITION".split(",")).toList())); + } + + @Test + public void testExplicitPropertyMappings() + throws Exception + { + Map properties = ImmutableMap.builder() + .put("openlineage-event-listener.transport.type", "HTTP") + .put("openlineage-event-listener.trino.uri", "http://testtrino") + .put("openlineage-event-listener.trino.include-query-types", "SELECT,DELETE") + .put("openlineage-event-listener.disabled-facets", "trino_metadata,trino_query_statistics") + .put("openlineage-event-listener.namespace", "testnamespace") + .buildOrThrow(); + + OpenLineageListenerConfig expected = new OpenLineageListenerConfig() + .setTransport(OpenLineageTransport.HTTP) + .setTrinoURI(new URI("http://testtrino")) + .setIncludeQueryTypes(ImmutableList.of("SELECT", "DELETE")) + .setDisabledFacets(ImmutableList.of("trino_metadata", "trino_query_statistics")) + .setNamespace("testnamespace"); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java new file mode 100644 index 000000000000..f4d847a1c918 --- /dev/null +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java @@ -0,0 +1,156 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.openlineage; + +import io.trino.operator.RetryPolicy; +import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; +import io.trino.spi.eventlistener.QueryCreatedEvent; +import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import io.trino.spi.eventlistener.StageOutputBufferUtilization; +import io.trino.spi.resourcegroups.QueryType; +import io.trino.spi.resourcegroups.ResourceGroupId; +import io.trino.spi.session.ResourceEstimates; + +import java.net.URI; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static java.time.Duration.ofSeconds; + +public class TrinoEventData +{ + public static final QueryIOMetadata queryIOMetadata; + public static final QueryContext queryContext; + public static final QueryMetadata queryMetadata; + public static final QueryStatistics queryStatistics; + public static final QueryCompletedEvent queryCompleteEvent; + public static final QueryCreatedEvent queryCreatedEvent; + + private TrinoEventData() + { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + static + { + queryIOMetadata = new QueryIOMetadata(Collections.emptyList(), Optional.empty()); + + queryContext = new QueryContext( + "user", + "originalUser", + Optional.of("principal"), + Set.of(), // enabledRoles + Set.of(), // groups + Optional.empty(), // traceToken + Optional.empty(), // remoteClientAddress + Optional.empty(), // userAgent + Optional.empty(), // clientInfo + new HashSet<>(), // clientTags + new HashSet<>(), // clientCapabilities + Optional.of("source"), + UTC_KEY.getId(), + Optional.of("catalog"), + Optional.of("schema"), + Optional.of(new ResourceGroupId("name")), + new HashMap<>(), // sessionProperties + new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.of(1000L)), + "serverAddress", "serverVersion", "environment", + Optional.of(QueryType.INSERT), + RetryPolicy.QUERY.toString()); + + queryMetadata = new QueryMetadata( + "queryId", + Optional.empty(), + "create table b.c as select * from y.z", + Optional.of("updateType"), + Optional.of("preparedQuery"), + "COMPLETED", + List.of(), + List.of(), + URI.create("http://localhost"), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + queryStatistics = new QueryStatistics( + ofSeconds(1), + ofSeconds(1), + ofSeconds(1), + ofSeconds(1), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0.0f, + Collections.emptyList(), + 0, + true, + Collections.emptyList(), + List.of(new StageOutputBufferUtilization(0, 10, 0.1, 0.5, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99, 0.0, 1.0, ofSeconds(1234))), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Optional.empty()); + + queryCompleteEvent = new QueryCompletedEvent( + queryMetadata, + queryStatistics, + queryContext, + queryIOMetadata, + Optional.empty(), + Collections.emptyList(), + Instant.now(), + Instant.now(), + Instant.now()); + + queryCreatedEvent = new QueryCreatedEvent( + Instant.now(), + queryContext, + queryMetadata); + } +} diff --git a/plugin/trino-openlineage/src/test/resources/marquez.yaml b/plugin/trino-openlineage/src/test/resources/marquez.yaml new file mode 100644 index 000000000000..4af409919339 --- /dev/null +++ b/plugin/trino-openlineage/src/test/resources/marquez.yaml @@ -0,0 +1,30 @@ +server: + applicationConnectors: + - type: http + port: ${MARQUEZ_PORT:-5000} + httpCompliance: RFC7230_LEGACY + adminConnectors: + - type: http + port: ${MARQUEZ_ADMIN_PORT:-5001} + +db: + driverClass: org.postgresql.Driver + url: ${POSTGRES_URL:-jdbc:postgresql://postgres:5432/marquez} + user: ${POSTGRESQL_USER:-marquez} + password: ${POSTGRESQL_PASSWORD:-marquez} + +migrateOnStartup: true + +graphql: + enabled: true + +logging: + level: INFO + appenders: + - type: console + +tags: + - name: PII + description: Personally identifiable information + - name: SENSITIVE + description: Contains sensitive information diff --git a/pom.xml b/pom.xml index 1923e8c06e8b..2a5ab2ece29f 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ plugin/trino-mysql plugin/trino-mysql-event-listener plugin/trino-opa + plugin/trino-openlineage plugin/trino-opensearch plugin/trino-oracle plugin/trino-password-authenticators