
Commit

feat(spark/openlineage): Use Openlineage 1.13.1 in Spark Plugin (datahub-project#10433)

- Use Openlineage 1.13.1 in Spark Plugin
- Add retry option to datahub client and Spark Plugin
- Add OpenLineage integration doc
treff7es authored May 7, 2024
1 parent 71759f9 commit d08f36f
Showing 25 changed files with 785 additions and 1,195 deletions.
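
For illustration, the retry option added to the DataHub client in this commit can be configured through the emitter builder. Here is a minimal Java sketch based on the `RestEmitterConfig` changes below; the server URL and retry values are placeholders:

```java
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;

public class RetryConfigSketch {
  public static void main(String[] args) throws Exception {
    RestEmitterConfig config =
        RestEmitterConfig.builder()
            .server("http://localhost:8080") // assumed local GMS endpoint
            .maxRetries(3) // new in this change: retry failed requests up to 3 times
            .retryIntervalSec(1) // new in this change: wait 1 second between retries
            .build();
    RestEmitter emitter = new RestEmitter(config);
    // emitter.emit(...) as usual; failed requests are now retried per the settings above.
    emitter.close();
  }
}
```
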
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.5.0'
ext.openLineageVersion = '1.13.1'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
15 changes: 13 additions & 2 deletions docs-website/filterTagIndexes.json
@@ -77,6 +77,17 @@
"Features": ""
}
},
{
"Path": "docs/lineage/dagster",
"imgPath": "img/logos/platforms/dagster.svg",
"Title": "Dagster",
"Description": "Dagster is a next-generation open source orchestration platform for the development, production, and observation of data assets..",
"tags": {
"Platform Type": "Orchestrator",
"Connection Type": "Pull",
"Features": "Stateful Ingestion, UI Ingestion, Status Aspect"
}
},
{
"Path": "docs/generated/ingestion/sources/databricks",
"imgPath": "img/logos/platforms/databricks.png",
@@ -433,7 +444,7 @@
"Path": "docs/generated/ingestion/sources/hive-metastore",
"imgPath": "img/logos/platforms/presto.svg",
"Title": "Hive Metastore",
"Description": "Presto on Hive is a data tool that allows users to query and analyze large datasets stored in Hive using SQL-like syntax.",
"Description": "Hive Metastore (HMS) is a service that stores metadata that is related to Hive, Presto, Trino and other services in a backend Relational Database Management System (RDBMS) ",
"tags": {
"Platform Type": "Datastore",
"Connection Type": "Pull",
@@ -551,7 +562,7 @@
}
},
{
"Path": "docs/metadata-integration/java/spark-lineage",
"Path": "docs/metadata-integration/java/spark-lineage-beta",
"imgPath": "img/logos/platforms/spark.svg",
"Title": "Spark",
"Description": "Spark is a data processing tool that enables fast and efficient processing of large-scale data sets using distributed computing.",
5 changes: 5 additions & 0 deletions docs-website/sidebars.js
@@ -317,6 +317,11 @@ module.exports = {
id: "docs/lineage/dagster",
label: "Dagster",
},
{
type: "doc",
id: "docs/lineage/openlineage",
label: "OpenLineage",
},
{
type: "doc",
id: "metadata-integration/java/spark-lineage/README",
11 changes: 11 additions & 0 deletions docs-website/static/img/logos/platforms/dagster.svg
92 changes: 92 additions & 0 deletions docs/lineage/openlineage.md
@@ -0,0 +1,92 @@
# OpenLineage

DataHub now supports [OpenLineage](https://openlineage.io/) integration. With this support, DataHub can ingest and display lineage information from various data processing frameworks, giving users a comprehensive view of their data pipelines.

## Features

- **REST Endpoint Support**: DataHub now includes a REST endpoint that can understand OpenLineage events. This allows users to send lineage information directly to DataHub, enabling easy integration with various data processing frameworks.

- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates with OpenLineage's Spark plugin. It enhances DataHub's OpenLineage support with additional features such as PathSpec support, column-level lineage, patch support, and more.

## OpenLineage Support with DataHub

### 1. REST Endpoint Support

DataHub's REST endpoint allows users to send OpenLineage events directly to DataHub. This enables easy integration with various data processing frameworks, providing users with a centralized location for viewing and managing data lineage information.

For Spark and Airflow, we recommend using the Spark Lineage plugin or DataHub's Airflow plugin for tighter integration with DataHub.

#### How to Use

To send OpenLineage messages to DataHub using the REST endpoint, make a POST request to the following endpoint:

```
POST GMS_SERVER_HOST:GMS_PORT/api/v2/lineage
```

Include the OpenLineage message in the request body in JSON format.

Example:

```json
{
  "eventType": "START",
  "eventTime": "2020-12-28T19:52:00.001+10:00",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
  },
  "job": {
    "namespace": "workshop",
    "name": "process_taxes"
  },
  "inputs": [
    {
      "namespace": "postgres://workshop-db:None",
      "name": "workshop.public.taxes",
      "facets": {
        "dataSource": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
          "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",
          "name": "postgres://workshop-db:None",
          "uri": "workshop-db"
        }
      }
    }
  ],
  "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
}
```
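
For illustration, the event above can be sent with any HTTP client. Below is a minimal Java sketch using `java.net.http`; the host, port, and authorization header are assumptions, and the path is the endpoint documented above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Paths;

public class OpenLineageEventPoster {
  public static void main(String[] args) throws Exception {
    // Assumption: the OpenLineage event above is stored locally as event.json.
    String eventJson = Files.readString(Paths.get("event.json"));

    HttpRequest request =
        HttpRequest.newBuilder()
            // Assumption: GMS is reachable at localhost:8080; path as documented above.
            .uri(URI.create("http://localhost:8080/api/v2/lineage"))
            .header("Content-Type", "application/json")
            // Optional: personal access token, if your DataHub instance requires auth.
            .header("Authorization", "Bearer <your-datahub-api-key>")
            .POST(HttpRequest.BodyPublishers.ofString(eventJson))
            .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```
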
##### How to set up Airflow

Follow the Airflow guide to set up your Airflow DAGs to send lineage information to DataHub. The guide can be found [here](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html).
The transport should look like this:
```json
{
  "type": "http",
  "url": "https://GMS_SERVER_HOST:GMS_PORT/openapi/openlineage/",
  "endpoint": "api/v1/lineage",
  "auth": {
    "type": "api_key",
    "api_key": "your-datahub-api-key"
  }
}
```
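
Depending on your Airflow deployment, this transport JSON is typically supplied through the OpenLineage provider's configuration, for example via the `transport` option in the `[openlineage]` section of `airflow.cfg` or the `AIRFLOW__OPENLINEAGE__TRANSPORT` environment variable; see the linked Airflow guide for the authoritative details.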

#### Known Limitations

For Spark and Airflow, we recommend using the Spark Lineage plugin or DataHub's Airflow plugin for tighter integration with DataHub.

- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full PathSpec support is not yet available.

- **Column-level Lineage**: DataHub's current OpenLineage support does not provide full column-level lineage tracking.

### 2. Spark Event Listener Plugin

DataHub's Spark Event Listener plugin enhances OpenLineage support by providing additional features such as PathSpec support, column-level lineage, and more.

#### How to Use

Follow the Spark Lineage plugin guide for instructions on setting up the plugin. The guide can be found [here](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta).

## References

- [OpenLineage](https://openlineage.io/)
- [DataHub OpenAPI Guide](../api/openapi/openapi-usage-guide.md)
- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)
4 changes: 2 additions & 2 deletions metadata-integration/java/datahub-client/build.gradle
@@ -27,7 +27,7 @@ dependencies {
}
}

compileOnly externalDependency.httpAsyncClient
compileOnly externalDependency.httpClient
implementation externalDependency.jacksonDataBind
runtimeOnly externalDependency.jna

@@ -41,7 +41,7 @@ dependencies {
testImplementation externalDependency.mockServer
testImplementation externalDependency.mockServerClient
testImplementation externalDependency.testContainers
testImplementation externalDependency.httpAsyncClient
testImplementation externalDependency.httpClient
testRuntimeOnly externalDependency.logbackClassic
}

Original file line number Diff line number Diff line change
@@ -7,16 +7,16 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.apache.http.HttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;

public class MetadataResponseFuture implements Future<MetadataWriteResponse> {
private final Future<HttpResponse> requestFuture;
private final Future<SimpleHttpResponse> requestFuture;
private final AtomicReference<MetadataWriteResponse> responseReference;
private final CountDownLatch responseLatch;
private final ResponseMapper mapper;

public MetadataResponseFuture(
Future<HttpResponse> underlyingFuture,
Future<SimpleHttpResponse> underlyingFuture,
AtomicReference<MetadataWriteResponse> responseAtomicReference,
CountDownLatch responseLatch) {
this.requestFuture = underlyingFuture;
@@ -25,7 +25,8 @@ public MetadataResponseFuture(
this.mapper = null;
}

public MetadataResponseFuture(Future<HttpResponse> underlyingFuture, ResponseMapper mapper) {
public MetadataResponseFuture(
Future<SimpleHttpResponse> underlyingFuture, ResponseMapper mapper) {
this.requestFuture = underlyingFuture;
this.responseReference = null;
this.responseLatch = null;
@@ -50,7 +51,7 @@ public boolean isDone() {
@SneakyThrows
@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
HttpResponse response = requestFuture.get();
SimpleHttpResponse response = requestFuture.get();
if (mapper != null) {
return mapper.map(response);
} else {
@@ -63,7 +64,7 @@ public MetadataWriteResponse get() throws InterruptedException, ExecutionExcepti
@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
HttpResponse response = requestFuture.get(timeout, unit);
SimpleHttpResponse response = requestFuture.get(timeout, unit);
if (mapper != null) {
return mapper.map(response);
} else {
@@ -75,6 +76,6 @@ public MetadataWriteResponse get(long timeout, TimeUnit unit)

@FunctionalInterface
public interface ResponseMapper {
MetadataWriteResponse map(HttpResponse httpResponse);
MetadataWriteResponse map(SimpleHttpResponse httpResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package datahub.client.rest;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
import java.util.Arrays;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;

@Slf4j
public class DatahubHttpRequestRetryStrategy extends DefaultHttpRequestRetryStrategy {
public DatahubHttpRequestRetryStrategy() {
this(1, TimeValue.ofSeconds(10));
}

public DatahubHttpRequestRetryStrategy(int maxRetries, TimeValue retryInterval) {
super(
maxRetries,
retryInterval,
Arrays.asList(
InterruptedIOException.class,
UnknownHostException.class,
ConnectException.class,
ConnectionClosedException.class,
NoRouteToHostException.class,
SSLException.class),
Arrays.asList(
HttpStatus.SC_TOO_MANY_REQUESTS,
HttpStatus.SC_SERVICE_UNAVAILABLE,
HttpStatus.SC_INTERNAL_SERVER_ERROR));
}

@Override
public boolean retryRequest(
HttpRequest request, IOException exception, int execCount, HttpContext context) {
log.warn("Checking if retry is needed: {}", execCount);
return super.retryRequest(request, exception, execCount, context);
}

@Override
public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
log.warn("Retrying request due to error: {}", response);
return super.retryRequest(response, execCount, context);
}
}
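
For context, this retry strategy is attached to the asynchronous HTTP client builder (as the `RestEmitter` changes below do). A minimal standalone sketch with Apache HttpClient 5, using illustrative retry values:

```java
import datahub.client.rest.DatahubHttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.util.TimeValue;

public class RetryStrategyWiringSketch {
  public static void main(String[] args) {
    // Retry up to 3 times, waiting 10 seconds between attempts (illustrative values).
    CloseableHttpAsyncClient client =
        HttpAsyncClients.custom()
            .setRetryStrategy(new DatahubHttpRequestRetryStrategy(3, TimeValue.ofSeconds(10)))
            .build();
    client.start();
    // ... execute requests ...
    client.close(CloseMode.GRACEFUL);
  }
}
```
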
Original file line number Diff line number Diff line change
@@ -18,31 +18,35 @@
import datahub.event.UpsertAspectRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.TrustAllStrategy;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.TimeValue;

@ThreadSafe
@Slf4j
@@ -89,57 +93,63 @@ public RestEmitter(RestEmitterConfig config) {
dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());

this.config = config;
HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
httpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy());

// Override httpClient settings with RestEmitter configs if present
if (config.getTimeoutSec() != null) {
HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
httpClientBuilder.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(config.getTimeoutSec() * 1000)
.setSocketTimeout(config.getTimeoutSec() * 1000)
.setConnectionRequestTimeout(
config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.setResponseTimeout(
config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.build());
}
if (config.isDisableSslVerification()) {
HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
try {
httpClientBuilder
.setSSLContext(
new SSLContextBuilder().loadTrustMaterial(null, TrustAllStrategy.INSTANCE).build())
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
SSLContext sslcontext =
SSLContexts.custom().loadTrustMaterial(TrustAllStrategy.INSTANCE).build();
TlsStrategy tlsStrategy =
ClientTlsStrategyBuilder.create()
.setSslContext(sslcontext)
.setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.build();

httpClientBuilder.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(tlsStrategy)
.build());
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
throw new RuntimeException("Error while creating insecure http client", e);
}
}

this.httpClient = this.config.getAsyncHttpClientBuilder().build();
httpClientBuilder.setRetryStrategy(
new DatahubHttpRequestRetryStrategy(
config.getMaxRetries(), TimeValue.ofSeconds(config.getRetryIntervalSec())));

this.httpClient = httpClientBuilder.build();
this.httpClient.start();
this.ingestProposalUrl = this.config.getServer() + "/aspects?action=ingestProposal";
this.ingestOpenApiUrl = config.getServer() + "/openapi/entities/v1/";
this.configUrl = this.config.getServer() + "/config";
this.eventFormatter = this.config.getEventFormatter();
}

private static MetadataWriteResponse mapResponse(HttpResponse response) {
private static MetadataWriteResponse mapResponse(SimpleHttpResponse response) {
MetadataWriteResponse.MetadataWriteResponseBuilder builder =
MetadataWriteResponse.builder().underlyingResponse(response);
if ((response != null)
&& (response.getStatusLine() != null)
&& (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK
|| response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED)) {
if ((response != null) && (response.getCode()) == HttpStatus.SC_OK
|| Objects.requireNonNull(response).getCode() == HttpStatus.SC_CREATED) {
builder.success(true);
} else {
builder.success(false);
}
// Read response content
try {
ByteArrayOutputStream result = new ByteArrayOutputStream();
InputStream contentStream = response.getEntity().getContent();
byte[] buffer = new byte[1024];
int length = contentStream.read(buffer);
while (length > 0) {
result.write(buffer, 0, length);
length = contentStream.read(buffer);
}
builder.responseContent(result.toString("UTF-8"));
builder.responseContent(response.getBody().getBodyText());
} catch (Exception e) {
// Catch all exceptions and still return a valid response object
log.warn("Wasn't able to convert response into a string", e);
@@ -198,21 +208,22 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback c
private Future<MetadataWriteResponse> postGeneric(
String urlStr, String payloadJson, Object originalRequest, Callback callback)
throws IOException {
HttpPost httpPost = new HttpPost(urlStr);
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("X-RestLi-Protocol-Version", "2.0.0");
httpPost.setHeader("Accept", "application/json");
this.config.getExtraHeaders().forEach((k, v) -> httpPost.setHeader(k, v));
SimpleRequestBuilder simpleRequestBuilder = SimpleRequestBuilder.post(urlStr);
simpleRequestBuilder.setHeader("Content-Type", "application/json");
simpleRequestBuilder.setHeader("X-RestLi-Protocol-Version", "2.0.0");
simpleRequestBuilder.setHeader("Accept", "application/json");
this.config.getExtraHeaders().forEach(simpleRequestBuilder::setHeader);
if (this.config.getToken() != null) {
httpPost.setHeader("Authorization", "Bearer " + this.config.getToken());
simpleRequestBuilder.setHeader("Authorization", "Bearer " + this.config.getToken());
}
httpPost.setEntity(new StringEntity(payloadJson));

simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
AtomicReference<MetadataWriteResponse> responseAtomicReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
FutureCallback<HttpResponse> httpCallback =
new FutureCallback<HttpResponse>() {
FutureCallback<SimpleHttpResponse> httpCallback =
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(HttpResponse response) {
public void completed(SimpleHttpResponse response) {
MetadataWriteResponse writeResponse = null;
try {
writeResponse = mapResponse(response);
@@ -252,16 +263,20 @@ public void cancelled() {
}
}
};
Future<HttpResponse> requestFuture = httpClient.execute(httpPost, httpCallback);
Future<SimpleHttpResponse> requestFuture =
httpClient.execute(simpleRequestBuilder.build(), httpCallback);
return new MetadataResponseFuture(requestFuture, responseAtomicReference, responseLatch);
}

private Future<MetadataWriteResponse> getGeneric(String urlStr) throws IOException {
HttpGet httpGet = new HttpGet(urlStr);
httpGet.setHeader("Content-Type", "application/json");
httpGet.setHeader("X-RestLi-Protocol-Version", "2.0.0");
httpGet.setHeader("Accept", "application/json");
Future<HttpResponse> response = this.httpClient.execute(httpGet, null);
SimpleHttpRequest simpleHttpRequest =
SimpleRequestBuilder.get(urlStr)
.addHeader("Content-Type", "application/json")
.addHeader("X-RestLi-Protocol-Version", "2.0.0")
.addHeader("Accept", "application/json")
.build();

Future<SimpleHttpResponse> response = this.httpClient.execute(simpleHttpRequest, null);
return new MetadataResponseFuture(response, RestEmitter::mapResponse);
}

@@ -284,20 +299,25 @@ public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> request, Cal

private Future<MetadataWriteResponse> postOpenAPI(
List<UpsertAspectRequest> payload, Callback callback) throws IOException {
HttpPost httpPost = new HttpPost(ingestOpenApiUrl);
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("Accept", "application/json");
this.config.getExtraHeaders().forEach((k, v) -> httpPost.setHeader(k, v));
SimpleRequestBuilder simpleRequestBuilder =
SimpleRequestBuilder.post(ingestOpenApiUrl)
.addHeader("Content-Type", "application/json")
.addHeader("Accept", "application/json")
.addHeader("X-RestLi-Protocol-Version", "2.0.0");

this.config.getExtraHeaders().forEach(simpleRequestBuilder::addHeader);

if (this.config.getToken() != null) {
httpPost.setHeader("Authorization", "Bearer " + this.config.getToken());
simpleRequestBuilder.addHeader("Authorization", "Bearer " + this.config.getToken());
}
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(payload)));
simpleRequestBuilder.setBody(
objectMapper.writeValueAsString(payload), ContentType.APPLICATION_JSON);
AtomicReference<MetadataWriteResponse> responseAtomicReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
FutureCallback<HttpResponse> httpCallback =
new FutureCallback<HttpResponse>() {
FutureCallback<SimpleHttpResponse> httpCallback =
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(HttpResponse response) {
public void completed(SimpleHttpResponse response) {
MetadataWriteResponse writeResponse = null;
try {
writeResponse = mapResponse(response);
@@ -337,12 +357,13 @@ public void cancelled() {
}
}
};
Future<HttpResponse> requestFuture = httpClient.execute(httpPost, httpCallback);
Future<SimpleHttpResponse> requestFuture =
httpClient.execute(simpleRequestBuilder.build(), httpCallback);
return new MetadataResponseFuture(requestFuture, responseAtomicReference, responseLatch);
}

@VisibleForTesting
HttpAsyncClient getHttpClient() {
CloseableHttpAsyncClient getHttpClient() {
return this.httpClient;
}
}
Original file line number Diff line number Diff line change
@@ -10,8 +10,10 @@
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.util.TimeValue;

@Value
@Builder
@@ -23,20 +25,23 @@ public class RestEmitterConfig {
public static final String DEFAULT_AUTH_TOKEN = null;
public static final String CLIENT_VERSION_PROPERTY = "clientVersion";

@Builder.Default private final String server = "http://localhost:8080";
@Builder.Default String server = "http://localhost:8080";

private final Integer timeoutSec;
@Builder.Default private final boolean disableSslVerification = false;
Integer timeoutSec;
@Builder.Default boolean disableSslVerification = false;

@Builder.Default private final String token = DEFAULT_AUTH_TOKEN;
@Builder.Default int maxRetries = 0;

@Builder.Default @NonNull private final Map<String, String> extraHeaders = Collections.EMPTY_MAP;
@Builder.Default int retryIntervalSec = 10;

private final HttpAsyncClientBuilder asyncHttpClientBuilder;
@Builder.Default String token = DEFAULT_AUTH_TOKEN;

@Builder.Default @NonNull Map<String, String> extraHeaders = Collections.EMPTY_MAP;

@Builder.Default
private final EventFormatter eventFormatter =
new EventFormatter(EventFormatter.Format.PEGASUS_JSON);
EventFormatter eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON);

HttpAsyncClientBuilder asyncHttpClientBuilder;

public static class RestEmitterConfigBuilder {

@@ -53,13 +58,19 @@ private String getVersion() {
}

private HttpAsyncClientBuilder asyncHttpClientBuilder =
HttpAsyncClientBuilder.create()
HttpAsyncClients.custom()
.setUserAgent("DataHub-RestClient/" + getVersion())
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_SEC * 1000)
.setSocketTimeout(DEFAULT_READ_TIMEOUT_SEC * 1000)
.setConnectionRequestTimeout(
DEFAULT_CONNECT_TIMEOUT_SEC * 1000,
java.util.concurrent.TimeUnit.MILLISECONDS)
.setResponseTimeout(
DEFAULT_READ_TIMEOUT_SEC * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.build())
.setUserAgent("DataHub-RestClient/" + getVersion());
.setRetryStrategy(
new DatahubHttpRequestRetryStrategy(
maxRetries$value, TimeValue.ofSeconds(retryIntervalSec$value)));

public RestEmitterConfigBuilder with(Consumer<RestEmitterConfigBuilder> builderFunction) {
builderFunction.accept(this);
Original file line number Diff line number Diff line change
@@ -11,8 +11,8 @@
import datahub.event.MetadataChangeProposalWrapper;
import datahub.server.TestDataHubServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -32,117 +32,148 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.net.ssl.SSLHandshakeException;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.Method;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockserver.matchers.Times;
import org.mockserver.model.HttpError;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.HttpStatusCode;
import org.mockserver.model.RequestDefinition;
import org.mockserver.verify.VerificationTimes;

@RunWith(MockitoJUnitRunner.class)
public class RestEmitterTest {

@Mock HttpAsyncClientBuilder mockHttpClientFactory;

@Mock CloseableHttpAsyncClient mockClient;

@Captor ArgumentCaptor<HttpPost> postArgumentCaptor;

@Captor ArgumentCaptor<FutureCallback> callbackCaptor;
@Test
public void testPost()
throws URISyntaxException, IOException, ExecutionException, InterruptedException {
TestDataHubServer testDataHubServer = new TestDataHubServer();
Integer port = testDataHubServer.getMockServer().getPort();
RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:" + port));

@Before
public void setupMocks() {
Mockito.when(mockHttpClientFactory.build()).thenReturn(mockClient);
MetadataChangeProposalWrapper mcp =
getMetadataChangeProposalWrapper(
"Test Dataset", "urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)");
Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
MetadataWriteResponse response = future.get();
String expectedContent =
"{\"proposal\":{\"aspectName\":\"datasetProperties\","
+ "\"entityUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)\","
+ "\"entityType\":\"dataset\",\"changeType\":\"UPSERT\",\"aspect\":{\"contentType\":\"application/json\""
+ ",\"value\":\"{\\\"description\\\":\\\"Test Dataset\\\"}\"}}}";
testDataHubServer
.getMockServer()
.verify(
request().withHeader("X-RestLi-Protocol-Version", "2.0.0").withBody(expectedContent));
}

@Test
public void testPost() throws URISyntaxException, IOException {
public void testPostWithRetry()
throws URISyntaxException, IOException, ExecutionException, InterruptedException {
TestDataHubServer testDataHubServer = new TestDataHubServer();
Integer port = testDataHubServer.getMockServer().getPort();
RestEmitterConfig config =
RestEmitterConfig.builder()
.server("http://localhost:" + port)
.maxRetries(3)
.retryIntervalSec(1)
.build();
RestEmitter emitter = new RestEmitter(config);

RestEmitter emitter = RestEmitter.create(b -> b.asyncHttpClientBuilder(mockHttpClientFactory));
MetadataChangeProposalWrapper mcp =
getMetadataChangeProposalWrapper(
"Test Dataset", "urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)");
emitter.emit(mcp, null);
Mockito.verify(mockClient).execute(postArgumentCaptor.capture(), callbackCaptor.capture());
FutureCallback callback = callbackCaptor.getValue();
Assert.assertNotNull(callback);
HttpPost testPost = postArgumentCaptor.getValue();
Assert.assertEquals("2.0.0", testPost.getFirstHeader("X-RestLi-Protocol-Version").getValue());
InputStream is = testPost.getEntity().getContent();
byte[] contentBytes = new byte[(int) testPost.getEntity().getContentLength()];
is.read(contentBytes);
String contentString = new String(contentBytes, StandardCharsets.UTF_8);
Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
MetadataWriteResponse response = future.get();
String expectedContent =
"{\"proposal\":{\"aspectName\":\"datasetProperties\","
+ "\"entityUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)\","
+ "\"entityType\":\"dataset\",\"changeType\":\"UPSERT\",\"aspect\":{\"contentType\":\"application/json\""
+ ",\"value\":\"{\\\"description\\\":\\\"Test Dataset\\\"}\"}}}";
Assert.assertEquals(expectedContent, contentString);
testDataHubServer
.getMockServer()
.verify(
request().withHeader("X-RestLi-Protocol-Version", "2.0.0").withBody(expectedContent),
VerificationTimes.exactly(1))
.when(
request()
.withPath("/aspect")
.withHeader("X-RestLi-Protocol-Version", "2.0.0")
.withBody(expectedContent),
Times.exactly(4))
.respond(HttpResponse.response().withStatusCode(500).withBody("exception"));
}

@Test
public void testExceptions()
throws URISyntaxException, IOException, ExecutionException, InterruptedException {

RestEmitter emitter = RestEmitter.create($ -> $.asyncHttpClientBuilder(mockHttpClientFactory));
TestDataHubServer testDataHubServer = new TestDataHubServer();
Integer port = testDataHubServer.getMockServer().getPort();
RestEmitter emitter =
RestEmitter.create(
b ->
b.server("http://localhost:" + port)
.extraHeaders(Collections.singletonMap("Test-Header", "Test-Value")));

MetadataChangeProposalWrapper mcp =
MetadataChangeProposalWrapper.create(
b ->
b.entityType("dataset")
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)")
.upsert()
.aspect(new DatasetProperties().setDescription("Test Dataset")));

Future<HttpResponse> mockFuture = Mockito.mock(Future.class);
Mockito.when(mockClient.execute(Mockito.any(), Mockito.any())).thenReturn(mockFuture);
Mockito.when(mockFuture.get())
.thenThrow(new ExecutionException("Test execution exception", null));
getMetadataChangeProposalWrapper(
"Test Dataset", "urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)");
Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
MetadataWriteResponse response = future.get();
String expectedContent =
"{\"proposal\":{\"aspectName\":\"datasetProperties\","
+ "\"entityUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)\","
+ "\"entityType\":\"dataset\",\"changeType\":\"UPSERT\",\"aspect\":{\"contentType\":\"application/json\""
+ ",\"value\":\"{\\\"description\\\":\\\"Test Dataset\\\"}\"}}}";
testDataHubServer
.getMockServer()
.when(request(), Times.once())
.error(HttpError.error().withDropConnection(true));

try {
emitter.emit(mcp, null).get();
Assert.fail("should not be here");
} catch (ExecutionException e) {
Assert.assertEquals(e.getMessage(), "Test execution exception");
Assert.assertEquals(
e.getMessage(),
"org.apache.hc.core5.http.ConnectionClosedException: Connection closed by peer");
}
}

@Test
public void testExtraHeaders() throws Exception {
public void testExtraHeaders()
throws URISyntaxException, IOException, ExecutionException, InterruptedException {
TestDataHubServer testDataHubServer = new TestDataHubServer();
Integer port = testDataHubServer.getMockServer().getPort();
RestEmitter emitter =
RestEmitter.create(
b ->
b.asyncHttpClientBuilder(mockHttpClientFactory)
b.server("http://localhost:" + port)
.extraHeaders(Collections.singletonMap("Test-Header", "Test-Value")));
MetadataChangeProposalWrapper mcpw =
MetadataChangeProposalWrapper.create(
b ->
b.entityType("dataset")
.entityUrn("urn:li:dataset:foo")
.upsert()
.aspect(new DatasetProperties()));
Future<HttpResponse> mockFuture = Mockito.mock(Future.class);
Mockito.when(mockClient.execute(Mockito.any(), Mockito.any())).thenReturn(mockFuture);
emitter.emit(mcpw, null);
Mockito.verify(mockClient).execute(postArgumentCaptor.capture(), callbackCaptor.capture());
FutureCallback callback = callbackCaptor.getValue();
Assert.assertNotNull(callback);
HttpPost testPost = postArgumentCaptor.getValue();
// old headers are not modified
Assert.assertEquals("2.0.0", testPost.getFirstHeader("X-RestLi-Protocol-Version").getValue());
// new headers are added
Assert.assertEquals("Test-Value", testPost.getFirstHeader("Test-Header").getValue());

MetadataChangeProposalWrapper mcp =
getMetadataChangeProposalWrapper(
"Test Dataset", "urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)");
Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
MetadataWriteResponse response = future.get();
String expectedContent =
"{\"proposal\":{\"aspectName\":\"datasetProperties\","
+ "\"entityUrn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)\","
+ "\"entityType\":\"dataset\",\"changeType\":\"UPSERT\",\"aspect\":{\"contentType\":\"application/json\""
+ ",\"value\":\"{\\\"description\\\":\\\"Test Dataset\\\"}\"}}}";
testDataHubServer
.getMockServer()
.verify(
request()
.withHeader("Test-Header", "Test-Value")
.withHeader("X-RestLi-Protocol-Version", "2.0.0")
.withBody(expectedContent));
}

@Test
@@ -168,7 +199,7 @@ public void multithreadedTestExecutors() throws Exception {
.withQueryStringParameter("action", "ingestProposal")
.withHeader("Content-type", "application/json"),
Times.unlimited())
.respond(org.mockserver.model.HttpResponse.response().withStatusCode(200));
.respond(HttpResponse.response().withStatusCode(200));
ExecutorService executor = Executors.newFixedThreadPool(10);
ArrayList<Future> results = new ArrayList();
Random random = new Random();
@@ -476,26 +507,27 @@ public void testUserAgentHeader() throws IOException, ExecutionException, Interr

@Test
public void testDisableSslVerification()
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException, URISyntaxException {
RestEmitter restEmitter =
new RestEmitter(RestEmitterConfig.builder().disableSslVerification(true).build());
final String hostWithSsl = "https://self-signed.badssl.com";
final HttpGet request = new HttpGet(hostWithSsl);
final SimpleHttpRequest request = SimpleHttpRequest.create(Method.GET, new URI(hostWithSsl));

final HttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
final SimpleHttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
restEmitter.close();
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
Assert.assertEquals(HttpStatusCode.OK_200.code(), response.getCode());
}

@Test
public void testSslVerificationException()
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException, URISyntaxException {
RestEmitter restEmitter =
new RestEmitter(RestEmitterConfig.builder().disableSslVerification(false).build());
final String hostWithSsl = "https://self-signed.badssl.com";
final HttpGet request = new HttpGet(hostWithSsl);
final SimpleHttpRequest request = SimpleHttpRequest.create(Method.GET, new URI(hostWithSsl));

try {
HttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
SimpleHttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e instanceof ExecutionException);
2 changes: 1 addition & 1 deletion metadata-integration/java/datahub-event/build.gradle
@@ -25,7 +25,7 @@ dependencies {
testImplementation externalDependency.testng
testImplementation externalDependency.mockito
testImplementation externalDependency.testContainers
testImplementation externalDependency.httpAsyncClient
testImplementation externalDependency.httpClient
testRuntimeOnly externalDependency.logbackClassicJava8
}

Original file line number Diff line number Diff line change
@@ -621,6 +621,11 @@ private static void processParentJob(

private static void processJobInputs(
DatahubJob datahubJob, OpenLineage.RunEvent event, DatahubOpenlineageConfig datahubConf) {

if (event.getInputs() == null) {
return;
}

for (OpenLineage.InputDataset input :
event.getInputs().stream()
.filter(input -> input.getFacets() != null)
@@ -646,6 +651,11 @@ private static void processJobInputs(

private static void processJobOutputs(
DatahubJob datahubJob, OpenLineage.RunEvent event, DatahubOpenlineageConfig datahubConf) {

if (event.getOutputs() == null) {
return;
}

for (OpenLineage.OutputDataset output :
event.getOutputs().stream()
.filter(input -> input.getFacets() != null)
14 changes: 8 additions & 6 deletions metadata-integration/java/spark-lineage-beta/README.md
@@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.1
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.1 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.3 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.1
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.1.0")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.3")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.1")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.3")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -164,6 +164,8 @@ information like tokens.
| spark.datahub.rest.server || | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.rest.max_retries | | 0 | Number of times a failed request is retried |
| spark.datahub.rest.retry_interval_in_sec | | 10 | Number of seconds to wait between retries |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
@@ -180,7 +182,7 @@ information like tokens.
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks. Enable this on Databricks if you want coalesced runs. |
| spark.datahub.patch.enabled | | | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled.
|
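
For illustration, the new retry settings can be supplied like any other listener option when building the Spark configuration programmatically. A minimal Java sketch using `SparkConf`; the server URL and values are placeholders:

```java
import org.apache.spark.SparkConf;

public class DatahubListenerConfSketch {
  public static SparkConf datahubConf() {
    // Illustrative only: wire the DataHub listener and the new retry options.
    return new SparkConf()
        .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .set("spark.datahub.rest.server", "http://localhost:8080")
        .set("spark.datahub.rest.max_retries", "3")
        .set("spark.datahub.rest.retry_interval_in_sec", "5");
  }
}
```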

## What to Expect: The Metadata Model
6 changes: 3 additions & 3 deletions metadata-integration/java/spark-lineage-beta/build.gradle
@@ -37,7 +37,7 @@ dependencies {
provided(externalDependency.sparkSql)
provided(externalDependency.sparkHive)
implementation 'org.slf4j:slf4j-log4j12:2.0.7'
implementation externalDependency.httpAsyncClient
implementation externalDependency.httpClient
implementation externalDependency.logbackClassicJava8
implementation externalDependency.typesafeConfig
implementation externalDependency.commonsLang
@@ -53,7 +53,7 @@ dependencies {
implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow')

//implementation "io.acryl:datahub-client:0.10.2"
implementation "io.openlineage:openlineage-spark:$openLineageVersion"
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"

@@ -123,7 +123,7 @@ shadowJar {
relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson'
relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j'
//
relocate 'org.apache.http', 'io.acryl.shaded.http'
relocate 'org.apache.hc', 'io.acryl.shaded.http'
relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress'
relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3'
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
@@ -44,7 +44,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

@Slf4j
@@ -55,10 +54,11 @@ public class DatahubEventEmitter extends EventEmitter {
private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
private SparkLineageConf datahubConf;

private EventFormatter eventFormatter = new EventFormatter();
private final EventFormatter eventFormatter = new EventFormatter();

public DatahubEventEmitter() throws URISyntaxException {
super(ArgumentParser.parse(new SparkConf()));
public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName)
throws URISyntaxException {
super(config, applicationJobName);
}

private Optional<Emitter> getEmitter() {
@@ -167,7 +167,7 @@ public List<MetadataChangeProposal> generateCoalescedMcps() {
List<MetadataChangeProposal> mcps = new ArrayList<>();

if (_datahubJobs.isEmpty()) {
log.warn("No lineage events to emit. Maybe the spark job finished premaraturely?");
log.warn("No lineage events to emit. Maybe the spark job finished prematurely?");
return mcps;
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datahub.spark;

import static datahub.spark.conf.SparkConfigParser.*;
import static io.openlineage.spark.agent.util.ScalaConversionUtils.*;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -10,16 +11,33 @@
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import datahub.spark.conf.SparkLineageConf;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.OpenLineageConfig;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.circuitBreaker.NoOpCircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
@@ -30,20 +48,28 @@
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Option;

public class DatahubSparkListener extends SparkListener {
private static final Logger log = LoggerFactory.getLogger(DatahubSparkListener.class);
private final Map<String, Instant> batchLastUpdated = new HashMap<String, Instant>();
private final OpenLineageSparkListener listener;
private final DatahubEventEmitter emitter;
private DatahubEventEmitter emitter;
private Config datahubConf = ConfigFactory.empty();
private SparkAppContext appContext;
private static ContextFactory contextFactory;
private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();
private static final String sparkVersion = package$.MODULE$.SPARK_VERSION();

private final Function0<Option<SparkContext>> activeSparkContext =
ScalaConversionUtils.toScalaFn(SparkContext$.MODULE$::getActive);

private static MeterRegistry meterRegistry;
private boolean isDisabled;

public DatahubSparkListener() throws URISyntaxException {
listener = new OpenLineageSparkListener();
emitter = new DatahubEventEmitter();
ContextFactory contextFactory = new ContextFactory(emitter);
OpenLineageSparkListener.init(contextFactory);
}

private static SparkAppContext getSparkAppContext(
@@ -61,13 +87,14 @@ private static SparkAppContext getSparkAppContext(

public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();

log.debug("Application start called");
log.info("Application start called");
this.appContext = getSparkAppContext(applicationStart);

listener.onApplicationStart(applicationStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onApplicationStart completed successfully in {} ms", elapsedTime);
log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
}

public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
@@ -87,21 +114,36 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
boolean disableSslVerification =
sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
&& sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);

int retry_interval_in_sec =
sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
: 5;

int max_retries =
sparkConf.hasPath(SparkConfigParser.MAX_RETRIES)
? sparkConf.getInt(SparkConfigParser.MAX_RETRIES)
: 0;

log.info(
"REST Emitter Configuration: GMS url {}{}",
gmsUrl,
(sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)"));
if (token != null) {
log.info("REST Emitter Configuration: Token {}", "XXXXX");
}

if (disableSslVerification) {
log.warn("REST Emitter Configuration: ssl verification will be disabled.");
}

RestEmitterConfig restEmitterConf =
RestEmitterConfig.builder()
.server(gmsUrl)
.token(token)
.disableSslVerification(disableSslVerification)
.maxRetries(max_retries)
.retryIntervalSec(retry_interval_in_sec)
.build();
return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
} else {
@@ -145,7 +187,12 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
if (datahubConf.hasPath(STREAMING_JOB) && (datahubConf.getBoolean(STREAMING_JOB))) {
return;
}
emitter.emitCoalesced();
if (emitter != null) {
emitter.emitCoalesced();
} else {
log.warn("Emitter is not initialized, unable to emit coalesced events");
}

long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onApplicationEnd completed successfully in {} ms", elapsedTime);
}
@@ -170,6 +217,8 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {

public void onJobStart(SparkListenerJobStart jobStart) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();

log.debug("Job start called");
loadDatahubConfig(this.appContext, jobStart.properties());
listener.onJobStart(jobStart);
@@ -227,4 +276,72 @@ public void onOtherEvent(SparkListenerEvent event) {
log.debug("onOtherEvent completed successfully in {} ms", elapsedTime);
}
}

private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
meterRegistry =
MicrometerProvider.addMeterRegistryFromConfig(openLineageConfig.getMetricsConfig());
String disabledFacets;
if (openLineageConfig.getFacetsConfig() != null
&& openLineageConfig.getFacetsConfig().getDisabledFacets() != null) {
disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets());
} else {
disabledFacets = "";
}
meterRegistry
.config()
.commonTags(
Tags.of(
Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
Tag.of("openlineage.spark.version", sparkVersion),
Tag.of("openlineage.spark.disabled.facets", disabledFacets)));
((CompositeMeterRegistry) meterRegistry)
.getRegistries()
.forEach(
r ->
r.config()
.commonTags(
Tags.of(
Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
Tag.of("openlineage.spark.version", sparkVersion),
Tag.of("openlineage.spark.disabled.facets", disabledFacets))));
}

private void initializeContextFactoryIfNotInitialized() {
if (contextFactory != null || isDisabled) {
return;
}
asJavaOptional(activeSparkContext.apply())
.ifPresent(context -> initializeContextFactoryIfNotInitialized(context.appName()));
}

private void initializeContextFactoryIfNotInitialized(String appName) {
if (contextFactory != null || isDisabled) {
return;
}
SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
if (sparkEnv == null) {
log.warn(
"OpenLineage listener instantiated, but no configuration could be found. "
+ "Lineage events will not be collected");
return;
}
initializeContextFactoryIfNotInitialized(sparkEnv.conf(), appName);
}

private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String appName) {
if (contextFactory != null || isDisabled) {
return;
}
try {
SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
// Needs to be done before initializing OpenLineageClient
initializeMetrics(config);
emitter = new DatahubEventEmitter(config, appName);
contextFactory = new ContextFactory(emitter, meterRegistry, config);
circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
OpenLineageSparkListener.init(contextFactory);
} catch (URISyntaxException e) {
log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -29,6 +29,9 @@ public class SparkConfigParser {
public static final String GMS_URL_KEY = "rest.server";
public static final String GMS_AUTH_TOKEN = "rest.token";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";

public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";

@@ -304,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) {

public static boolean isPatchEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(PATCH_ENABLED)) {
return true;
return false;
}
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,88 +1,64 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark.agent.util;

import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.spark.conf.SparkLineageConf;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.Versions;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.package$;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.PartialFunction;
import scala.runtime.AbstractPartialFunction;
import scala.PartialFunction$;

/**
* Utility functions for traversing a {@link
* org.apache.spark.sql.catalyst.plans.logical.LogicalPlan}.
*/
@Slf4j
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
public class PlanUtils {

public static final String SLASH_DELIMITER_USER_PASSWORD_REGEX =
"[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@";
public static final String COLON_DELIMITER_USER_PASSWORD_REGEX =
"([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@";

/**
* Merge a list of {@link PartialFunction}s and return the first value where the function is
* defined or empty list if no function matches the input.
*
* @param fns
* @param arg
* @param <T>
* @param <R>
* @return
*/
public static <T, R> Collection<R> applyAll(
List<? extends PartialFunction<T, ? extends Collection<R>>> fns, T arg) {
PartialFunction<T, Collection<R>> fn = merge(fns);
if (fn.isDefinedAt(arg)) {
return fn.apply(arg);
}
return Collections.emptyList();
}

/**
* Given a list of {@link PartialFunction}s merge to produce a single function that will test the
* input against each function one by one until a match is found or empty() is returned.
* input against each function one by one until a match is found or {@link
* PartialFunction$#empty()} is returned.
*
* @param fns
* @param <T>
* @param <D>
* @return
*/
public static <T, D> PartialFunction<T, Collection<D>> merge(
public static <T, D> OpenLineageAbstractPartialFunction<T, Collection<D>> merge(
Collection<? extends PartialFunction<T, ? extends Collection<D>>> fns) {
return new AbstractPartialFunction<T, Collection<D>>() {
return new OpenLineageAbstractPartialFunction<T, Collection<D>>() {
String appliedClassName;

@Override
public boolean isDefinedAt(T x) {
return fns.stream()
@@ -110,6 +86,7 @@ public Collection<D> apply(T x) {
x.getClass().getCanonicalName(),
collection);
}
appliedClassName = x.getClass().getName();
return collection;
} catch (RuntimeException | NoClassDefFoundError | NoSuchMethodError e) {
log.error("Apply failed:", e);
@@ -120,6 +97,11 @@ public Collection<D> apply(T x) {
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

@Override
String appliedName() {
return appliedClassName;
}
};
}

@@ -204,12 +186,26 @@ public static OpenLineage.ParentRunFacet parentRunFacet(
.run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
.job(
new OpenLineage.ParentRunFacetJobBuilder()
.name(parentJob)
.name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT))
.namespace(parentJobNamespace)
.build())
.build();
}

public static Path getDirectoryPathOl(Path p, Configuration hadoopConf) {
try {
if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) {
return p.getParent();
} else {
return p;
}
} catch (IOException e) {
log.warn("Unable to get file system for path ", e);
return p;
}
}

// This method was replaced to support Datahub PathSpecs
public static Path getDirectoryPath(Path p, Configuration hadoopConf) {
SparkConf conf = SparkEnv.get().conf();
String propertiesString =
@@ -229,17 +225,6 @@ public static Path getDirectoryPath(Path p, Configuration hadoopConf) {
log.warn("Unable to convert path to hdfs path {} the exception was {}", p, e.getMessage());
return p;
}

// try {
// if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) {
// return p.getParent();
// } else {
// return p;
// }
// } catch (IOException e) {
// log.warn("Unable to get file system for path ", e);
// return p;
// }
}

/**
@@ -251,36 +236,7 @@ public static Path getDirectoryPath(Path p, Configuration hadoopConf) {
*/
public static List<Path> findRDDPaths(List<RDD<?>> fileRdds) {
return fileRdds.stream()
.flatMap(
rdd -> {
if (rdd instanceof HadoopRDD) {
HadoopRDD hadoopRDD = (HadoopRDD) rdd;
Path[] inputPaths = FileInputFormat.getInputPaths(hadoopRDD.getJobConf());
Configuration hadoopConf = hadoopRDD.getConf();
return Arrays.stream(inputPaths)
.map(p -> PlanUtils.getDirectoryPath(p, hadoopConf));
} else if (rdd instanceof FileScanRDD) {
FileScanRDD fileScanRDD = (FileScanRDD) rdd;
return ScalaConversionUtils.fromSeq(fileScanRDD.filePartitions()).stream()
.flatMap(fp -> Arrays.stream(fp.files()))
.map(
f -> {
if (package$.MODULE$.SPARK_VERSION().compareTo("3.4") > 0) {
// filePath returns SparkPath for Spark 3.4
return ReflectionUtils.tryExecuteMethod(f, "filePath")
.map(o -> ReflectionUtils.tryExecuteMethod(o, "toPath"))
.map(o -> (Path) o.get())
.get()
.getParent();
} else {
return new Path(f.filePath()).getParent();
}
});
} else {
log.warn("Unknown RDD class {}", rdd.getClass().getCanonicalName());
return Stream.empty();
}
})
.flatMap(RddPathUtils::findRDDPaths)
.distinct()
.collect(Collectors.toList());
}
@@ -316,11 +272,11 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
return false;
} catch (Exception e) {
if (e != null) {
log.debug("isDefinedAt method failed on {}", e);
log.info("isDefinedAt method failed on {}", e);
}
return false;
} catch (NoClassDefFoundError e) {
log.debug("isDefinedAt method failed on {}", e.getMessage());
log.info("isDefinedAt method failed on {}", e.getMessage());
return false;
}
}
@@ -331,6 +287,8 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
* @param pfn
* @param x
* @return
* @param <T>
* @param <D>
*/
public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x) {
try {
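
One behavioural change worth calling out in the `parentRunFacet` hunk above: the parent job name is now normalized to lowercase snake_case before being attached to the facet. A rough sketch of the effect, using an illustrative camelCase-splitting regex rather than the actual `CAMEL_TO_SNAKE_CASE` constant imported from `ExecutionContext`:

```java
import java.util.Locale;

public class ParentJobNameExample {
  // Assumption: a simple camelCase boundary pattern; the real constant may differ.
  private static final String CAMEL_TO_SNAKE_CASE = "(?<=[a-z0-9])(?=[A-Z])";

  public static void main(String[] args) {
    String parentJob = "mySparkParentJob";
    // Mirrors the replaceAll(...).toLowerCase(Locale.ROOT) call shown in the diff.
    String normalized = parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_").toLowerCase(Locale.ROOT);
    System.out.println(normalized); // prints: my_spark_parent_job
  }
}
```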
@@ -0,0 +1,182 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark.agent.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.openlineage.client.OpenLineage.InputDataset;
import io.openlineage.client.OpenLineage.OutputDataset;
import io.openlineage.spark.api.OpenLineageContext;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/**
* Utility class to handle removing path patterns in dataset names. Given a configured regex pattern
* with "remove" group defined, class methods run regex replacements on all the datasets available
* within the event
*/
@Slf4j
public class RemovePathPatternUtils {
public static final String REMOVE_PATTERN_GROUP = "remove";
public static final String SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN =
"spark.openlineage.dataset.removePath.pattern";

private static Optional<SparkConf> sparkConf = Optional.empty();

public static List<OutputDataset> removeOutputsPathPattern_ol(
OpenLineageContext context, List<OutputDataset> outputs) {
return getPattern(context)
.map(
pattern ->
outputs.stream()
.map(
dataset -> {
String newName = removePath(pattern, dataset.getName());
if (newName != dataset.getName()) {

Check warning (GitHub Actions / qodana) on line 51 in metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java: string comparison using '!=' instead of 'equals()'.
return context
.getOpenLineage()
.newOutputDatasetBuilder()
.name(removePath(pattern, dataset.getName()))
.namespace(dataset.getNamespace())
.facets(dataset.getFacets())
.outputFacets(dataset.getOutputFacets())
.build();
} else {
return dataset;
}
})
.collect(Collectors.toList()))
.orElse(outputs);
}

// This method was replaced to support Datahub PathSpecs
public static List<OutputDataset> removeOutputsPathPattern(
OpenLineageContext context, List<OutputDataset> outputs) {
return outputs.stream()
.map(
dataset -> {
String newName = removePathPattern(dataset.getName());
if (newName != dataset.getName()) {

Check warning (GitHub Actions / qodana) on line 75 in metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java: string comparison using '!=' instead of 'equals()'.
return context
.getOpenLineage()
.newOutputDatasetBuilder()
.name(newName)
.namespace(dataset.getNamespace())
.facets(dataset.getFacets())
.outputFacets(dataset.getOutputFacets())
.build();
} else {
return dataset;
}
})
.collect(Collectors.toList());
}

// This method was replaced to support Datahub PathSpecs
public static List<InputDataset> removeInputsPathPattern(
OpenLineageContext context, List<InputDataset> inputs) {
return inputs.stream()
.map(
dataset -> {
String newName = removePathPattern(dataset.getName());
if (newName != dataset.getName()) {

Check warning (GitHub Actions / qodana) on line 98 in metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java: string comparison using '!=' instead of 'equals()'.
return context
.getOpenLineage()
.newInputDatasetBuilder()
.name(newName)
.namespace(dataset.getNamespace())
.facets(dataset.getFacets())
.inputFacets(dataset.getInputFacets())
.build();
} else {
return dataset;
}
})
.collect(Collectors.toList());
}

private static Optional<Pattern> getPattern(OpenLineageContext context) {
return Optional.ofNullable(context.getSparkContext())
.map(sparkContext -> sparkContext.conf())
.filter(conf -> conf.contains(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN))
.map(conf -> conf.get(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN))
.map(pattern -> Pattern.compile(pattern));
}

private static String removePath(Pattern pattern, String name) {
return Optional.ofNullable(pattern.matcher(name))
.filter(matcher -> matcher.find())
.filter(
matcher -> {
try {
matcher.group(REMOVE_PATTERN_GROUP);
return true;
} catch (IllegalStateException | IllegalArgumentException e) {
return false;
}
})
.filter(matcher -> StringUtils.isNotEmpty(matcher.group(REMOVE_PATTERN_GROUP)))
.map(
matcher ->
name.substring(0, matcher.start(REMOVE_PATTERN_GROUP))
+ name.substring(matcher.end(REMOVE_PATTERN_GROUP), name.length()))
.orElse(name);
}

/**
* SparkConf does not change through job lifetime but it can get lost once session is closed. It's
* good to have it set in case of SPARK-29046
*/
private static Optional<SparkConf> loadSparkConf() {
if (!sparkConf.isPresent() && SparkSession.getDefaultSession().isDefined()) {
sparkConf = Optional.of(SparkSession.getDefaultSession().get().sparkContext().getConf());
}
return sparkConf;
}

private static String removePathPattern(String datasetName) {
// TODO: The reliance on global-mutable state here should be changed
// this led to problems in the PathUtilsTest class, where some tests interfered with others
log.info("Removing path pattern from dataset name {}", datasetName);
Optional<SparkConf> conf = loadSparkConf();
if (!conf.isPresent()) {
return datasetName;
}
try {
String propertiesString =
Arrays.stream(conf.get().getAllWithPrefix("spark.datahub."))
.map(tup -> tup._1 + "= \"" + tup._2 + "\"")
.collect(Collectors.joining("\n"));
Config datahubConfig = ConfigFactory.parseString(propertiesString);
DatahubOpenlineageConfig datahubOpenlineageConfig =
SparkConfigParser.sparkConfigToDatahubOpenlineageConf(
datahubConfig, new SparkAppContext());
HdfsPathDataset hdfsPath =
HdfsPathDataset.create(new URI(datasetName), datahubOpenlineageConfig);
log.debug("Transformed path is {}", hdfsPath.getDatasetPath());
return hdfsPath.getDatasetPath();
} catch (InstantiationException e) {
log.warn(
"Unable to convert dataset {} to path the exception was {}", datasetName, e.getMessage());
return datasetName;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}
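
For context, a small standalone sketch of what `removePath` does when `spark.openlineage.dataset.removePath.pattern` is configured with a named `remove` group; the pattern and dataset name below are made-up examples:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RemovePathPatternExample {
  public static void main(String[] args) {
    // e.g. spark.openlineage.dataset.removePath.pattern=(?<remove>/partition=\d+)
    Pattern pattern = Pattern.compile("(?<remove>/partition=\\d+)");
    String datasetName = "/warehouse/events/partition=20240507/data.parquet";

    Matcher matcher = pattern.matcher(datasetName);
    String cleaned = datasetName;
    if (matcher.find() && matcher.group("remove") != null && !matcher.group("remove").isEmpty()) {
      // Strip the matched "remove" group, keeping the rest of the name intact.
      cleaned = datasetName.substring(0, matcher.start("remove"))
          + datasetName.substring(matcher.end("remove"));
    }
    System.out.println(cleaned); // prints: /warehouse/events/data.parquet
  }
}
```

The DataHub-specific variant (`removePathPattern`) goes further and maps the resulting name through `HdfsPathDataset`, so any configured PathSpecs are applied as well.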
@@ -40,7 +40,7 @@
"inputs": [
{
"namespace": "file",
"name": "/Users/treff7es/shadow/spark-test/people.json",
"name": "/my_folder/spark-test/people.json",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.2.2/integration/spark",
@@ -69,7 +69,7 @@
"outputs": [
{
"namespace": "file",
"name": "/Users/treff7es/shadow/spark-test/result",
"name": "/my_folder/shadow/spark-test/result",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.2.2/integration/spark",
@@ -95,7 +95,7 @@
"inputFields": [
{
"namespace": "file",
"name": "/Users/treff7es/shadow/spark-test/people.json",
"name": "/my_folder/spark-test/people.json",
"field": "name"
}
]
4 changes: 2 additions & 2 deletions metadata-integration/java/spark-lineage/build.gradle
@@ -48,7 +48,7 @@ dependencies {

provided(externalDependency.sparkSql)
provided(externalDependency.sparkHive)
implementation externalDependency.httpAsyncClient
implementation externalDependency.httpClient

// Tests need a concrete log4j available. Providing it here
testImplementation 'org.apache.logging.log4j:log4j-api:2.17.1'
@@ -106,7 +106,7 @@ shadowJar {

relocate 'com.fasterxml.jackson', 'datahub.shaded.jackson'
relocate 'org.slf4j','datahub.shaded.org.slf4j'
relocate 'org.apache.http','datahub.spark2.shaded.http'
relocate 'org.apache.hc','datahub.spark2.shaded.http'
relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress'
relocate 'org.apache.commons.io', 'datahub.spark2.shaded.o.a.c.io'
