feat(java datahub-client): add Java REST emitter #3781

Merged
merged 31 commits on Jan 2, 2022

Commits (31)
b7c8d52
feat(spark-lineage): add ability to push data lineage from spark to d…
MugdhaHardikar-GSLab Dec 3, 2021
beb7fcb
Merge branch 'linkedin:master' into master
MugdhaHardikar-GSLab Dec 8, 2021
16f7ef7
fix(spark-lineage): Adding missing file in commit
MugdhaHardikar-GSLab Dec 8, 2021
f8054cb
docs(spark lineage): modify to fit under "Lineage" category
MugdhaHardikar-GSLab Dec 8, 2021
11fdcdd
docs(spark lineage): add docs to "Lineage" category
MugdhaHardikar-GSLab Dec 8, 2021
7c36b33
fix(spark-lineage): handle absence of execution context gracefully
MugdhaHardikar-GSLab Dec 13, 2021
38c36e2
fixing lint
shirshanka Dec 13, 2021
b8b12d9
docs(spark-lineage): Update artifact name and version
MugdhaHardikar-GSLab Dec 16, 2021
60031cd
resolved merge conflict sidebar.js
MugdhaHardikar-GSLab Dec 16, 2021
c15a978
Merge branch 'linkedin-master'
MugdhaHardikar-GSLab Dec 16, 2021
cbaa4bd
resolved pull conflict
MugdhaHardikar-GSLab Dec 16, 2021
fea4204
Merge branch 'linkedin:master' into master
MugdhaHardikar-GSLab Dec 20, 2021
890f761
feat(java datahub-client): add Java REST emitter
MugdhaHardikar-GSLab Dec 21, 2021
a508aa1
refactor(spark-lineage): use of datahub client for REST based gms com…
MugdhaHardikar-GSLab Dec 21, 2021
bd67427
fix(spark-lineage): modify gms server url config name
MugdhaHardikar-GSLab Dec 21, 2021
1f30a58
fix(datahub-client) : fix readtimeout setter
MugdhaHardikar-GSLab Dec 21, 2021
515003b
Merge branch 'linkedin:master' into master
MugdhaHardikar-GSLab Dec 21, 2021
0a4b981
fix(datahub-client): remove unused imports causing check fail
MugdhaHardikar-GSLab Dec 21, 2021
251ac84
Merge branch 'master' of https://github.com/MugdhaHardikar-GSLab/datahub
MugdhaHardikar-GSLab Dec 21, 2021
61c0c62
refactor(datahub-client): rename RESTEmitter RestEmitter
MugdhaHardikar-GSLab Dec 22, 2021
a7878f6
simplifying serialization, using http-client, adding one test
shirshanka Dec 23, 2021
752adad
refactoring interfaces, adding async and callback
shirshanka Dec 29, 2021
b6d350b
fix checkstyle
shirshanka Dec 29, 2021
62e3e44
simplify tests. move from wiremock to mockserver
shirshanka Dec 29, 2021
aa5b356
fix checkstyle and timing tests
shirshanka Dec 29, 2021
fb35e0e
fix checkstyle
shirshanka Dec 29, 2021
a2cbb9f
more simplifications
shirshanka Dec 30, 2021
0d53dd1
refactors
shirshanka Dec 31, 2021
6d90828
fix checkstyle
shirshanka Dec 31, 2021
e5bde3b
javadocs
shirshanka Jan 2, 2022
a73866a
removing caCertifatePath, will implement in a future release
shirshanka Jan 2, 2022
3 changes: 3 additions & 0 deletions build.gradle
@@ -72,6 +72,7 @@ project.ext.externalDependency = [
  'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
  'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
  'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
  'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5',
  'iStackCommons': 'com.sun.istack:istack-commons-runtime:4.0.1',
  'jacksonCore': 'com.fasterxml.jackson.core:jackson-core:2.9.10',
  'jacksonDataBind': 'com.fasterxml.jackson.core:jackson-databind:2.9.10.7',
@@ -94,6 +95,8 @@ project.ext.externalDependency = [
  'mavenArtifact': "org.apache.maven:maven-artifact:$mavenVersion",
  'mockito': 'org.mockito:mockito-core:3.0.0',
  'mockitoInline': 'org.mockito:mockito-inline:3.0.0',
  'mockServer': 'org.mock-server:mockserver-netty:5.11.2',
  'mockServerClient': 'org.mock-server:mockserver-client-java:5.11.2',
  'mysqlConnector': 'mysql:mysql-connector-java:8.0.20',
  'neo4jHarness': 'org.neo4j.test:neo4j-harness:3.4.11',
  'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:4.0.1',
6 changes: 6 additions & 0 deletions docs-website/sidebars.js
@@ -85,6 +85,12 @@ module.exports = {
    {
      Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
    },
    {
      "Custom Integrations": [
        "metadata-ingestion/as-a-library",
        "metadata-integration/java/as-a-library",
      ],
    },
    {
      Scheduling: [
        "metadata-ingestion/schedule_docs/intro",
5 changes: 1 addition & 4 deletions metadata-ingestion/README.md
@@ -186,10 +186,7 @@ Check out the [transformers guide](./transformers.md) for more info!

## Using as a library

In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code.

- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`).
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`).
In some cases, you might want to construct Metadata events directly and use programmatic ways to emit that metadata to DataHub. In that case, take a look at the [Python emitter](./as-a-library.md) and the [Java emitter](../metadata-integration/java/as-a-library.md) libraries, which can be called from your own code.

### Programmatic Pipeline
In some cases, you might want to configure and run a pipeline entirely from within your custom Python script. Here is an example of how to do it.
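
For illustration, here is a minimal sketch of the idea. It assumes the `Pipeline` helper from `datahub.ingestion.run.pipeline` and a MySQL source; the exact source `config` keys depend on the connector you use.

```python
from datahub.ingestion.run.pipeline import Pipeline

# The configuration mirrors the recipe YAML files used by the CLI.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mysql",
            "config": {
                "username": "user",
                "password": "pass",
                "host_port": "localhost:3306",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)

# Run the pipeline and raise if any failures were reported.
pipeline.run()
pipeline.raise_from_status()
```
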
128 changes: 128 additions & 0 deletions metadata-ingestion/as-a-library.md
@@ -0,0 +1,128 @@
# Python Emitter

In some cases, you might want to construct Metadata events directly and use programmatic ways to emit that metadata to DataHub. Use cases are typically push-based and include emitting metadata events from CI/CD pipelines, custom orchestrators, etc.

The `acryl-datahub` Python package offers REST and Kafka emitter APIs, which can easily be imported and called from your own code.

## Installation

Follow the installation guide for the main `acryl-datahub` package [here](./README.md#install-from-pypi). Read on for emitter-specific installation instructions.

## REST Emitter

The REST emitter is a thin wrapper on top of the `requests` module and offers a blocking interface for sending metadata events over HTTP. Use this when simplicity and acknowledgement that metadata has been persisted to DataHub's metadata store are more important than the throughput of metadata emission. Also use this for read-after-write scenarios, e.g. writing metadata and then immediately reading it back.

### Installation

```console
pip install -U 'acryl-datahub[datahub-rest]'
```

### Example Usage
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

from datahub.emitter.rest_emitter import DatahubRestEmitter

# Create an emitter to DataHub over REST
emitter = DatahubRestEmitter(gms_server="http://localhost:8080", extra_headers={})

# Test the connection
emitter.test_connection()

# Construct a dataset properties object
dataset_properties = DatasetPropertiesClass(
    description="This table stores the canonical User profile",
    customProperties={"governance": "ENABLED"},
)

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)

# Emit metadata! This is a blocking call
emitter.emit(metadata_event)
```

Other examples:
- [lineage_emitter_mcpw_rest.py](./examples/library/lineage_emitter_mcpw_rest.py) - emits simple BigQuery table-to-table (dataset-to-dataset) lineage via REST as a MetadataChangeProposalWrapper; a minimal sketch of the same pattern is shown below.
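
For illustration, here is a rough sketch of that lineage pattern. It assumes the `UpstreamClass`, `UpstreamLineageClass`, and `DatasetLineageTypeClass` classes generated in `datahub.metadata.schema_classes`; consult the linked example for the authoritative version.

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Declare that the downstream table is transformed from the upstream table.
upstream = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "my-project.my-dataset.upstream-table"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)

lineage_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.downstream-table"),
    aspectName="upstreamLineage",
    aspect=UpstreamLineageClass(upstreams=[upstream]),
)

# Emit the lineage aspect. This is a blocking call.
emitter.emit(lineage_event)
```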

### Emitter Code

If you're interested in looking at the REST emitter code, it is available [here](./src/datahub/emitter/rest_emitter.py).

## Kafka Emitter

The Kafka emitter is a thin wrapper on top of the `SerializingProducer` class from `confluent-kafka` and offers a non-blocking interface for sending metadata events to DataHub. Use this when you want to decouple your metadata producer from the uptime of your DataHub metadata service by using Kafka as a highly available message bus. For example, if your DataHub metadata service is down due to planned or unplanned outages, you can still continue to collect metadata from your mission-critical systems by sending it to Kafka. Also use this emitter when the throughput of metadata emission is more important than acknowledgement that metadata has been persisted to DataHub's backend store.

**_Note_**: The Kafka emitter uses Avro to serialize the Metadata events to Kafka. Changing the serializer will result in unprocessable events as DataHub currently expects the metadata events over Kafka to be serialized in Avro.

### Installation

```console
# For emission over Kafka
pip install -U 'acryl-datahub[datahub-kafka]'
```


### Example Usage
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
# Create an emitter to Kafka
kafka_config = {
    "connection": {
        "bootstrap": "localhost:9092",
        "schema_registry_url": "http://localhost:8081",
        "schema_registry_config": {},  # schema registry configs passed to the underlying schema registry client
        "producer_config": {},  # extra producer configs passed to the underlying kafka producer
    }
}

emitter = DatahubKafkaEmitter(
    KafkaEmitterConfig.parse_obj(kafka_config)
)

# Construct a dataset properties object
dataset_properties = DatasetPropertiesClass(
    description="This table stores the canonical User profile",
    customProperties={"governance": "ENABLED"},
)

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)


# Emit metadata! This is a non-blocking call
emitter.emit(
    metadata_event,
    callback=lambda exc, message: print(
        f"Message sent to topic:{message.topic()}, partition:{message.partition()}, offset:{message.offset()}"
    )
    if message
    else print(f"Failed to send with: {exc}"),
)

# Send all pending events
emitter.flush()
```

### Emitter Code

If you're interested in looking at the Kafka emitter code, it is available [here](./src/datahub/emitter/kafka_emitter.py).

## Other Languages

Emitter APIs are also supported for:
- [Java](../metadata-integration/java/as-a-library.md)

2 changes: 1 addition & 1 deletion metadata-ingestion/transformers.md
@@ -1,4 +1,4 @@
# Using transformers
# Transformers

## What’s a transformer?

112 changes: 112 additions & 0 deletions metadata-integration/java/as-a-library.md
@@ -0,0 +1,112 @@
# Java Emitter

In some cases, you might want to construct Metadata events directly and use programmatic ways to emit that metadata to DataHub. Use cases are typically push-based and include emitting metadata events from CI/CD pipelines, custom orchestrators, etc.

The [`io.acryl:datahub-client`](https://mvnrepository.com/artifact/io.acryl/datahub-client) Java package offers a REST emitter API, which can be easily used to emit metadata from your JVM-based systems. For example, the Spark lineage integration uses the Java emitter to emit metadata events from Spark jobs.


## Installation

Follow the specific instructions for your build system to declare a dependency on the appropriate version of the package.

**_Note_**: Check the [Maven repository](https://mvnrepository.com/artifact/io.acryl/datahub-client) for the latest version of the package before following the instructions below.

### Gradle
Add the following to your `build.gradle`.
```gradle
implementation 'io.acryl:datahub-client:0.0.1'
```
### Maven
Add the following to your `pom.xml`.
```xml
<!-- https://mvnrepository.com/artifact/io.acryl/datahub-client -->
<dependency>
<groupId>io.acryl</groupId>
<artifactId>datahub-client</artifactId>
<!-- replace with the latest version number -->
<version>0.0.1</version>
</dependency>
```

## REST Emitter

The REST emitter is a thin wrapper on top of the [`Apache HttpClient`](https://hc.apache.org/httpcomponents-client-4.5.x/index.html) library. It supports non-blocking emission of metadata and handles the details of JSON serialization of metadata aspects over the wire.

Constructing a REST Emitter follows a lambda-based fluent builder pattern. The config parameters mirror the Python emitter [configuration](../../metadata-ingestion/sink_docs/datahub.md#config-details) for the most part. In addition, you can also customize the HttpClient that is constructed under the hood by passing in customizations to the HttpClient builder.
```java
import datahub.client.rest.RestEmitter;
//...
RestEmitter emitter = RestEmitter.create(b -> b
    .server("http://localhost:8080")
    // Auth token for Managed DataHub:
    //.token(AUTH_TOKEN_IF_NEEDED)
    // Override the default timeout of 10 seconds:
    //.timeoutSec(OVERRIDE_DEFAULT_TIMEOUT_IN_SECONDS)
    // Add additional headers:
    //.extraHeaders(Collections.singletonMap("Session-token", "MY_SESSION"))
    // Customize the HttpClient's connection TTL:
    //.customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
);
```

### Usage

```java
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.client.rest.RestEmitter;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import org.apache.http.HttpResponse;
import java.util.concurrent.Future;
// ... followed by

// Creates the emitter with the default coordinates and settings
RestEmitter emitter = RestEmitter.createWithDefaults();

MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .changeType(ChangeType.UPSERT)
    .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
    .build();

// Blocking call using a Future
Future<MetadataWriteResponse> requestFuture = emitter.emit(mcpw, null);
MetadataWriteResponse syncResponse = requestFuture.get();

// Non-blocking using callback
emitter.emit(mcpw, new Callback() {
  @Override
  public void onCompletion(MetadataWriteResponse response) {
    if (response.isSuccess()) {
      System.out.println(String.format("Successfully emitted metadata event for %s", mcpw.getEntityUrn()));
    } else {
      // Get the underlying http response
      HttpResponse httpResponse = (HttpResponse) response.getUnderlyingResponse();
      System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s with status code: %d",
          mcpw.getEntityUrn(), mcpw.getAspectName(), httpResponse.getStatusLine().getStatusCode()));
      // Print the server side exception if it was captured
      if (response.getServerException() != null) {
        System.out.println(String.format("Server side exception was %s", response.getServerException()));
      }
    }
  }

  @Override
  public void onFailure(Throwable exception) {
    System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s due to %s",
        mcpw.getEntityUrn(), mcpw.getAspectName(), exception.getMessage()));
  }
});
```

### Emitter Code

If you're interested in looking at the REST emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/rest/RestEmitter.java).

## Kafka Emitter

The Java package doesn't currently support a Kafka emitter, but this will be available shortly.


## Other Languages

Emitter APIs are also supported for:
- [Python](../../metadata-ingestion/as-a-library.md)


27 changes: 27 additions & 0 deletions metadata-integration/java/datahub-client/build.gradle
@@ -0,0 +1,27 @@
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'

dependencies {

  compile project(':metadata-models')
  compile externalDependency.httpAsyncClient
  compile externalDependency.jacksonDataBind
  compileOnly externalDependency.lombok
  annotationProcessor externalDependency.lombok
  testCompile externalDependency.mockito
  testCompile externalDependency.mockServer
  testCompile externalDependency.mockServerClient
}

test {
  useJUnit()
}

shadowJar {
  zip64 = true
  classifier = ''
}

assemble {
  dependsOn shadowJar
}
22 changes: 22 additions & 0 deletions metadata-integration/java/datahub-client/src/main/java/datahub/client/Callback.java
@@ -0,0 +1,22 @@
package datahub.client;

import javax.annotation.Nullable;


public interface Callback {

  /**
   * Called when the client request has completed.
   * Completion does not imply success. Inspect the response object to understand if
   * this was a successfully processed request or not.
   * @param response the write response returned by the server; may be null if no response was received
   */
  void onCompletion(@Nullable MetadataWriteResponse response);

  /**
   * Called when the client request has thrown an exception before completion.
   * @param exception the exception encountered before the request could complete
   */
  void onFailure(Throwable exception);

}