Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes in OpenLineage #22223

Merged
merged 4 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugin/trino-openlineage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
<openlineage.version>1.12.0</openlineage.version>
</properties>

Expand Down Expand Up @@ -116,6 +117,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>junit-extensions</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.trino.spi.resourcegroups.QueryType;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -56,6 +55,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

public class OpenLineageListener
Expand All @@ -81,7 +81,7 @@ public OpenLineageListener(OpenLineageClient client, OpenLineageListenerConfig l
defaultNamespace = defaultNamespace.replace(listenerConfig.getTrinoURI().getScheme(), "trino");
}
else {
defaultNamespace = String.format("trino://%s", defaultNamespace);
defaultNamespace = "trino://%s" + defaultNamespace;
}

this.jobNamespace = listenerConfig.getNamespace().orElse(defaultNamespace);
Expand All @@ -102,9 +102,9 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent)
client.emit(event);
return;
}
logger.debug(format("Query type %s not supported. Supported query types %s",
logger.debug("Query type %s not supported. Supported query types %s",
queryCreatedEvent.getContext().getQueryType().toString(),
this.includeQueryTypes));
this.includeQueryTypes);
}

@Override
Expand All @@ -117,9 +117,9 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
client.emit(event);
return;
}
logger.debug(format("Query type %s not supported. Supported query types %s",
logger.debug("Query type %s not supported. Supported query types %s",
queryCompletedEvent.getContext().getQueryType().toString(),
this.includeQueryTypes));
this.includeQueryTypes);
}

private boolean queryTypeSupported(QueryContext queryContext)
Expand All @@ -132,7 +132,7 @@ private boolean queryTypeSupported(QueryContext queryContext)

private UUID getQueryId(QueryMetadata queryMetadata)
{
return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(StandardCharsets.UTF_8));
return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(UTF_8));
}

private RunFacet getTrinoQueryContextFacet(QueryContext queryContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import jakarta.validation.constraints.NotNull;

import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;

public class OpenLineageListenerConfig
{
private OpenLineageTransport transport = OpenLineageTransport.CONSOLE;
Expand All @@ -44,6 +46,11 @@ public class OpenLineageListenerConfig
.add(QueryType.DATA_DEFINITION)
.build();

public OpenLineageTransport getTransport()
{
return transport;
}

@Config("openlineage-event-listener.transport.type")
@ConfigDescription("Type of transport used to emit lineage information.")
public OpenLineageListenerConfig setTransport(OpenLineageTransport transport)
Expand All @@ -52,9 +59,10 @@ public OpenLineageListenerConfig setTransport(OpenLineageTransport transport)
return this;
}

public OpenLineageTransport getTransport()
@NotNull
public URI getTrinoURI()
{
return transport;
return trinoURI;
}

@Config("openlineage-event-listener.trino.uri")
Expand All @@ -65,27 +73,25 @@ public OpenLineageListenerConfig setTrinoURI(URI trinoURI)
return this;
}

@NotNull
public URI getTrinoURI()
public Set<QueryType> getIncludeQueryTypes()
{
return trinoURI;
return includeQueryTypes;
}

@Config("openlineage-event-listener.trino.include-query-types")
@ConfigDescription("Which query types emitted by Trino should generate OpenLineage events. Other query types will be filtered out.")
public OpenLineageListenerConfig setIncludeQueryTypes(List<String> includeQueryTypes)
{
this.includeQueryTypes = new HashSet<>(includeQueryTypes.stream()
this.includeQueryTypes = includeQueryTypes.stream()
.map(String::trim)
.map(QueryType::valueOf)
.toList());

.collect(toImmutableSet());
return this;
}

public Set<QueryType> getIncludeQueryTypes()
public List<OpenLineageTrinoFacet> getDisabledFacets()
{
return includeQueryTypes;
return disabledFacets;
}

@Config("openlineage-event-listener.disabled-facets")
Expand All @@ -95,22 +101,14 @@ public OpenLineageListenerConfig setDisabledFacets(List<String> disabledFacets)
{
this.disabledFacets = disabledFacets.stream()
.map(String::trim)
.map(text -> {
try {
return OpenLineageTrinoFacet.fromText(text);
}
catch (IllegalArgumentException e) {
throw new RuntimeException(e);
}
})
.toList();

.map(OpenLineageTrinoFacet::fromText)
.collect(toImmutableList());
return this;
}

public List<OpenLineageTrinoFacet> getDisabledFacets()
public Optional<String> getNamespace()
{
return disabledFacets;
return namespace;
}

@Config("openlineage-event-listener.namespace")
Expand All @@ -120,9 +118,4 @@ public OpenLineageListenerConfig setNamespace(String namespace)
this.namespace = Optional.ofNullable(namespace);
return this;
}

public Optional<String> getNamespace()
{
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Optional;

import static java.lang.String.format;

Expand Down Expand Up @@ -65,7 +64,7 @@ public class MarquezServer

private final GenericContainer<?> dockerContainerAPI;
private final PostgreSQLContainer<?> dockerContainerPostgres;
private final Optional<GenericContainer<?>> dockerWebUIContainerAPI;
private final GenericContainer<?> dockerWebUIContainerAPI;

public MarquezServer()
{
Expand Down Expand Up @@ -111,20 +110,15 @@ public MarquezServer(String version)
this.dockerContainerAPI.start();
closer.register(this.dockerContainerAPI::close);

this.dockerWebUIContainerAPI = Optional.of(
new GenericContainer<>("marquezproject/marquez-web:" + version)
this.dockerWebUIContainerAPI = new GenericContainer<>("marquezproject/marquez-web:" + version)
.withNetwork(network)
.withExposedPorts(MARQUEZ_UI_PORT)
.dependsOn(this.dockerContainerAPI)
.withEnv("MARQUEZ_HOST", MARQUEZ_HOST)
.withEnv("MARQUEZ_PORT", String.valueOf(MARQUEZ_PORT))
.withStartupTimeout(Duration.ofSeconds(360)));

this.dockerWebUIContainerAPI.ifPresent(container ->
{
container.start();
closer.register(container::close);
});
.withStartupTimeout(Duration.ofSeconds(360));
this.dockerWebUIContainerAPI.start();
closer.register(this.dockerWebUIContainerAPI::close);
}

private String getPostgresUri()
Expand All @@ -137,13 +131,9 @@ public URI getMarquezUri()
return URI.create(format("http://%s:%s", dockerContainerAPI.getHost(), dockerContainerAPI.getMappedPort(MARQUEZ_PORT)));
}

public Optional<URI> getMarquezWebUIUri()
public URI getMarquezWebUIUri()
{
if (this.dockerWebUIContainerAPI.isPresent()) {
return Optional.of(
URI.create(format("http://%s:%s", dockerWebUIContainerAPI.get().getHost(), dockerWebUIContainerAPI.get().getMappedPort(MARQUEZ_UI_PORT))));
}
return Optional.empty();
return URI.create(format("http://%s:%s", dockerWebUIContainerAPI.getHost(), dockerWebUIContainerAPI.getMappedPort(MARQUEZ_UI_PORT)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.plugin.openlineage;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.log.Logger;
import io.trino.Session;
import io.trino.plugin.memory.MemoryPlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;

import java.util.HashMap;
import java.util.Map;

import static io.airlift.testing.Closeables.closeAllSuppress;
Expand All @@ -33,55 +33,69 @@ public final class OpenLineageListenerQueryRunner

private OpenLineageListenerQueryRunner() {}

public static QueryRunner createOpenLineageRunner(Map<String, String> listenerProperties)
throws Exception
public static Builder builder()
{
QueryRunner queryRunner = null;
try {
queryRunner = DistributedQueryRunner
.builder(createSession())
.setEventListener(new OpenLineageListenerFactory().create(listenerProperties))
.build();
// catalog used for output data
queryRunner.installPlugin(new MemoryPlugin());
queryRunner.createCatalog(CATALOG, "memory");
return new Builder();
}

// catalog used for input data
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
public static final class Builder
extends DistributedQueryRunner.Builder<Builder>
{
private final Map<String, String> listenerProperties = new HashMap<>();

return queryRunner;
private Builder()
{
super(testSessionBuilder()
.setCatalog(CATALOG)
.setSchema(SCHEMA)
.build());
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;

@CanIgnoreReturnValue
public Builder addListenerProperty(String key, String value)
{
this.listenerProperties.put(key, value);
return this;
}
}

private static Session createSession()
{
return testSessionBuilder()
.setCatalog(CATALOG)
.setSchema(SCHEMA)
.build();
@Override
public DistributedQueryRunner build()
throws Exception
{
super.setEventListener(new OpenLineageListenerFactory().create(listenerProperties));
DistributedQueryRunner queryRunner = super.build();
try {
// catalog used for output data
queryRunner.installPlugin(new MemoryPlugin());
queryRunner.createCatalog(CATALOG, "memory");

// catalog used for input data
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

return queryRunner;
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;
}
}
}

public static void main(String[] args)
throws Exception
{
MarquezServer server = new MarquezServer();

Map<String, String> config = ImmutableMap.of(
"openlineage-event-listener.transport.type", "HTTP",
"openlineage-event-listener.transport.url", server.getMarquezUri().toString(),
"openlineage-event-listener.trino.uri", "http://trino-query-runner:1337");

QueryRunner queryRunner = createOpenLineageRunner(config);
QueryRunner queryRunner = builder()
.addCoordinatorProperty("http-server.http.port", "8080")
.addListenerProperty("openlineage-event-listener.transport.type", "HTTP")
.addListenerProperty("openlineage-event-listener.transport.url", server.getMarquezUri().toString())
.addListenerProperty("openlineage-event-listener.trino.uri", "http://localhost:8080")
.build();
Logger log = Logger.get(OpenLineageListenerQueryRunner.class);
log.info("======== SERVER RUNNING: %s ========", queryRunner.getCoordinator().getBaseUrl());

if (server.getMarquezWebUIUri().isPresent()) {
log.info("======== MARQUEZ UI RUNNING: %s ========", server.getMarquezWebUIUri().get());
}
log.info("======== MARQUEZ UI RUNNING: %s ========", server.getMarquezWebUIUri());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;

public class TestOpenLineageClientHttpTransportConfig
final class TestOpenLineageClientHttpTransportConfig
{
@Test
public void testDefaults()
void testDefaults()
{
assertRecordedDefaults(recordDefaults(OpenLineageClientHttpTransportConfig.class)
.setUrl(null)
Expand All @@ -40,7 +40,7 @@ public void testDefaults()
}

@Test
public void testExplicitPropertyMappings()
void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("openlineage-event-listener.transport.url", "http://testurl")
Expand Down
Loading