Merge branch 'datahub-project:master' into master
anshbansal authored Apr 8, 2024
2 parents a379d35 + b6c4d9c commit 2f29957
Showing 32 changed files with 507 additions and 129 deletions.
92 changes: 57 additions & 35 deletions .github/workflows/docker-unified.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build.gradle
@@ -266,7 +266,7 @@ project.ext.externalDependency = [
'jline':'jline:jline:1.4.1',
'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0',
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
- 'classGraph': 'io.github.classgraph:classgraph:4.8.165',
+ 'classGraph': 'io.github.classgraph:classgraph:4.8.168',
]

allprojects {
@@ -77,10 +77,7 @@ public CompletableFuture<SearchResults> get(final DataFetchingEnvironment enviro
.getFilters()
.forEach(
filter -> {
- criteria.add(
- new Criterion()
- .setField(filter.getField())
- .setValue(filter.getValue()));
+ criteria.add(criterionFromFilter(filter, true));
});
}

@@ -90,7 +90,8 @@ private SearchUtils() {}
EntityType.CORP_USER,
EntityType.CORP_GROUP,
EntityType.NOTEBOOK,
- EntityType.DATA_PRODUCT);
+ EntityType.DATA_PRODUCT,
+ EntityType.DOMAIN);

/** Entities that are part of browse by default */
public static final List<EntityType> BROWSE_ENTITY_TYPES =
6 changes: 5 additions & 1 deletion docs-website/docusaurus.config.js
@@ -51,7 +51,7 @@ module.exports = {
announcementBar: {
id: "announcement",
content:
- '<div><img src="/img/acryl-logo-white-mark.svg" /><p><strong>Managed DataHub</strong><span> &nbsp;Acryl Data delivers an easy to consume DataHub platform for the enterprise</span></p></div> <a href="https://www.acryldata.io/datahub-sign-up?utm_source=datahub&utm_medium=referral&utm_campaign=acryl_signup" target="_blank" class="button button--primary">Sign up for Managed DataHub&nbsp;→</a>',
+ '<div><img src="/img/acryl-logo-white-mark.svg" /><p><strong>Acryl DataHub</strong><span> &nbsp;Acryl Data delivers an easy to consume DataHub platform for the enterprise</span></p></div> <a href="https://www.acryldata.io?utm_source=datahub&utm_medium=referral&utm_campaign=acryl_signup" target="_blank" class="button button--primary">Learn about Acryl DataHub&nbsp;→</a>',
backgroundColor: "#070707",
textColor: "#ffffff",
isCloseable: false,
@@ -86,6 +86,10 @@ module.exports = {
to: "/slack",
label: "Join Slack",
},
+ {
+ href: "https://forum.datahubproject.io/",
+ label: "Community Forum",
+ },
{
to: "/events",
label: "Events",
8 changes: 5 additions & 3 deletions docs-website/src/components/Feedback/styles.module.scss
@@ -45,12 +45,14 @@
}

.feedbackButton {
- width: 2rem;
- height: 2rem;
+ width: 2.25rem;
+ height: 2.25rem;
text-align: center;
font-size: 1.25rem;
- padding: 0.25rem;
+ padding: 0.3rem;
border-radius: 1000em;
+ border: None;
+ background: None;
margin-left: 1rem;
cursor: pointer;
transition: all 0.2s ease-in-out;
@@ -24,6 +24,7 @@
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+ import org.apache.commons.lang3.ArrayUtils;

@Slf4j
public class PluginFactory {
@@ -68,6 +69,7 @@ public PluginFactory(
@Nullable PluginConfiguration pluginConfiguration, @Nonnull List<ClassLoader> classLoaders) {
this.classGraph =
new ClassGraph()
+ .acceptPackages(ArrayUtils.addAll(HOOK_PACKAGES, VALIDATOR_PACKAGES))
.enableRemoteJarScanning()
.enableExternalClasses()
.enableClassInfo()
4 changes: 2 additions & 2 deletions metadata-ingestion-modules/airflow-plugin/build.gradle
@@ -113,10 +113,10 @@ task cleanPythonCache(type: Exec) {
commandLine 'bash', '-c',
"find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete"
}
- task buildWheel(type: Exec, dependsOn: [environmentSetup, cleanPythonCache]) {
+ task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
- 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
+ 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

build.dependsOn install
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

- if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
+ if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew install
@@ -9,12 +9,14 @@
import datahub.emitter.mce_builder as builder
from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
+ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
+ StatusClass,
)
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
from datahub.telemetry import telemetry
@@ -492,6 +494,16 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
)
dataflow.emit(self.emitter, callback=self._make_emit_callback())

+ # emit tags
+ for tag in dataflow.tags:
+ tag_urn = builder.make_tag_urn(tag)
+
+ event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
+ entityUrn=tag_urn, aspect=StatusClass(removed=False)
+ )
+
+ self.emitter.emit(event)

if HAS_AIRFLOW_DAG_LISTENER_API:

@hookimpl
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/dagster-plugin/build.gradle
@@ -111,7 +111,7 @@ task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
}
task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " +
- 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
+ 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

task cleanPythonCache(type: Exec) {
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

- if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
+ if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew install
2 changes: 1 addition & 1 deletion metadata-ingestion/build.gradle
@@ -214,7 +214,7 @@ task cleanPythonCache(type: Exec) {
task buildWheel(type: Exec, dependsOn: [install, codegen, cleanPythonCache]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && " +
- 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
+ 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}

build.dependsOn install
14 changes: 7 additions & 7 deletions metadata-ingestion/docs/sources/datahub/datahub_pre.md
@@ -44,13 +44,13 @@ source:
stateful_ingestion:
enabled: true
ignore_old_state: true
- urn_pattern: # URN pattern to ignore/include in the ingestion
- deny:
- # Ignores all datahub metadata where the urn matches the regex
- - ^denied.urn.*
- allow:
- # Ingests all datahub metadata where the urn matches the regex.
- - ^allowed.urn.*
+ urn_pattern: # URN pattern to ignore/include in the ingestion
+ deny:
+ # Ignores all datahub metadata where the urn matches the regex
+ - ^denied.urn.*
+ allow:
+ # Ingests all datahub metadata where the urn matches the regex.
+ - ^allowed.urn.*
```
#### Limitations
6 changes: 3 additions & 3 deletions metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
@@ -18,15 +18,15 @@ source:
stateful_ingestion:
enabled: true
ignore_old_state: false
- extractor_config:
- set_system_metadata: false # Replicate system metadata
- urn_pattern:
+ urn_pattern:
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*
+ extractor_config:
+ set_system_metadata: false # Replicate system metadata

# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
2 changes: 1 addition & 1 deletion metadata-ingestion/scripts/release.sh
@@ -1,7 +1,7 @@
#!/bin/bash
set -euxo pipefail

- if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then
+ if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../gradlew install
@@ -8,11 +8,7 @@

from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration import config_loader
- from datahub.configuration.common import (
- AllowDenyPattern,
- ConfigModel,
- DynamicTypedConfig,
- )
+ from datahub.configuration.common import ConfigModel, DynamicTypedConfig
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.sink.file import FileSinkConfig

@@ -25,7 +21,6 @@
class SourceConfig(DynamicTypedConfig):
extractor: str = "generic"
extractor_config: dict = Field(default_factory=dict)
- urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())


class ReporterConfig(DynamicTypedConfig):
@@ -92,6 +92,7 @@ def get_table_display_name(self) -> str:
- removes shard suffix (table_yyyymmdd-> table)
- removes wildcard part (table_yyyy* -> table)
- remove time decorator (table@1624046611000 -> table)
+ - removes partition ids (table$20210101 -> table or table$__UNPARTITIONED__ -> table)
"""
# if table name ends in _* or * or _yyyy* or _yyyymm* then we strip it as that represents a query on a sharded table
shortened_table_name = re.sub(self._BIGQUERY_WILDCARD_REGEX, "", self.table)
@@ -103,6 +104,12 @@ def get_table_display_name(self) -> str:
f"Found table snapshot. Using {shortened_table_name} as the table name."
)

if "$" in shortened_table_name:
shortened_table_name = shortened_table_name.split("$", maxsplit=1)[0]
logger.debug(
f"Found partitioned table. Using {shortened_table_name} as the table name."
)

table_name, _ = self.get_table_and_shard(shortened_table_name)
return table_name or self.dataset
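
For context, here is a standalone sketch of the new "$" handling above. The helper name strip_partition_id and the sample table names are made up for illustration; in the source the logic lives inside get_table_display_name:

```python
def strip_partition_id(table: str) -> str:
    """Drop a BigQuery partition decorator, e.g. table$20210101 -> table."""
    if "$" in table:
        table = table.split("$", maxsplit=1)[0]
    return table


assert strip_partition_id("events$20210101") == "events"
assert strip_partition_id("events$__UNPARTITIONED__") == "events"
assert strip_partition_id("events") == "events"  # unpartitioned names pass through unchanged
```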

@@ -58,7 +58,6 @@
UpstreamClass,
UpstreamLineageClass,
)
- from datahub.specific.dataset import DatasetPatchBuilder
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, sqlglot_lineage
from datahub.utilities import memory_footprint
@@ -451,29 +450,15 @@ def gen_lineage(
return

if upstream_lineage is not None:
- if self.config.incremental_lineage:
- patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
- urn=dataset_urn
- )
- for upstream in upstream_lineage.upstreams:
- patch_builder.add_upstream_lineage(upstream)
-
- yield from [
- MetadataWorkUnit(
- id=f"upstreamLineage-for-{dataset_urn}",
- mcp_raw=mcp,
- )
- for mcp in patch_builder.build()
- ]
- else:
- if not self.config.extract_column_lineage:
- upstream_lineage.fineGrainedLineages = None
-
- yield from [
- MetadataChangeProposalWrapper(
- entityUrn=dataset_urn, aspect=upstream_lineage
- ).as_workunit()
- ]
+ if not self.config.extract_column_lineage:
+ upstream_lineage.fineGrainedLineages = None
+
+ # Incremental lineage is handled by the auto_incremental_lineage helper.
+ yield from [
+ MetadataChangeProposalWrapper(
+ entityUrn=dataset_urn, aspect=upstream_lineage
+ ).as_workunit()
+ ]

def lineage_via_catalog_lineage_api(
self, project_id: str
@@ -3,6 +3,7 @@

from pydantic import Field, root_validator

+ from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.ingestion.source.sql.sql_config import SQLAlchemyConnectionConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -80,6 +81,8 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
hidden_from_docs=True,
)

+ urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())

@root_validator(skip_on_failure=True)
def check_ingesting_data(cls, values):
if (
@@ -40,12 +40,9 @@ def __init__(self, config: DataHubSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.config = config

- if (
- ctx.pipeline_config
- and ctx.pipeline_config.source
- and ctx.pipeline_config.source.urn_pattern
- ):
- self.urn_pattern = ctx.pipeline_config.source.urn_pattern
+ if self.config.urn_pattern:
+ self.urn_pattern = self.config.urn_pattern

self.report: DataHubSourceReport = DataHubSourceReport()
self.stateful_ingestion_handler = StatefulDataHubIngestionHandler(self)
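
To illustrate what the new urn_pattern option does, here is a minimal sketch of AllowDenyPattern-style filtering, assuming the standard allow/deny regex semantics from datahub.configuration.common; the sample URNs are invented and the exact call site inside the DataHub source may differ:

```python
from datahub.configuration.common import AllowDenyPattern

# Hypothetical patterns mirroring the recipe examples above.
urn_pattern = AllowDenyPattern(
    deny=[r"^denied\.urn.*"],
    allow=[r"^allowed\.urn.*"],
)

urns = ["allowed.urn:li:dataset:foo", "denied.urn:li:dataset:bar"]
kept = [urn for urn in urns if urn_pattern.allowed(urn)]
print(kept)  # ['allowed.urn:li:dataset:foo']
```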

20 changes: 8 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -1,7 +1,7 @@
import json
import logging
import re
- from datetime import datetime, timezone
+ from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

@@ -208,7 +208,7 @@ def extract_dbt_entities(
max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at")
max_loaded_at = None
if max_loaded_at_str:
- max_loaded_at = dateutil.parser.parse(max_loaded_at_str)
+ max_loaded_at = parse_dbt_timestamp(max_loaded_at_str)

test_info = None
if manifest_node.get("resource_type") == "test":
@@ -284,10 +284,8 @@ def extract_dbt_entities(
return dbt_entities


- def _parse_dbt_timestamp(timestamp: str) -> datetime:
- return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
- tzinfo=timezone.utc
- )
+ def parse_dbt_timestamp(timestamp: str) -> datetime:
+ return dateutil.parser.parse(timestamp)


class DBTRunTiming(BaseModel):
@@ -338,11 +336,9 @@ def _parse_test_result(

execution_timestamp = run_result.timing_map.get("execute")
if execution_timestamp and execution_timestamp.started_at:
- execution_timestamp_parsed = _parse_dbt_timestamp(
- execution_timestamp.started_at
- )
+ execution_timestamp_parsed = parse_dbt_timestamp(execution_timestamp.started_at)
else:
- execution_timestamp_parsed = _parse_dbt_timestamp(dbt_metadata.generated_at)
+ execution_timestamp_parsed = parse_dbt_timestamp(dbt_metadata.generated_at)

return DBTTestResult(
invocation_id=dbt_metadata.invocation_id,
@@ -369,8 +365,8 @@ def _parse_model_run(
return DBTModelPerformance(
run_id=dbt_metadata.invocation_id,
status=status,
- start_time=_parse_dbt_timestamp(execution_timestamp.started_at),
- end_time=_parse_dbt_timestamp(execution_timestamp.completed_at),
+ start_time=parse_dbt_timestamp(execution_timestamp.started_at),
+ end_time=parse_dbt_timestamp(execution_timestamp.completed_at),
)
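
A quick sketch of why the dateutil-based parse_dbt_timestamp is more forgiving than the old strptime call: the fixed "%Y-%m-%dT%H:%M:%S.%fZ" format rejects timestamps without fractional seconds or with an explicit UTC offset, while dateutil accepts all of these (the sample timestamps are made up):

```python
import dateutil.parser

# All of these parse to timezone-aware datetimes with dateutil;
# only the first matches the old strptime format exactly.
for ts in (
    "2024-04-08T10:15:30.123456Z",
    "2024-04-08T10:15:30Z",
    "2024-04-08T10:15:30+05:30",
):
    print(dateutil.parser.parse(ts))
```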

