From 7e5610f358a5f7ffcef23c90fdd1740c1381d1bd Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Mon, 25 Mar 2024 13:28:35 +0100 Subject: [PATCH] feat(ingest/dagster): Dagster source (#10071) Co-authored-by: shubhamjagtap639 --- .github/workflows/build-and-test.yml | 2 + .github/workflows/dagster-plugin.yml | 85 +++ .github/workflows/test-results.yml | 2 +- docs-website/build.gradle | 10 +- docs-website/generateDocsDir.ts | 1 + docs-website/sidebars.js | 6 + docs/lineage/dagster.md | 89 ++++ .../dagster-plugin/.gitignore | 143 +++++ .../dagster-plugin/README.md | 4 + .../dagster-plugin/build.gradle | 131 +++++ .../examples/advanced_ops_jobs.py | 106 ++++ .../dagster-plugin/examples/assets_job.py | 63 +++ .../dagster-plugin/examples/basic_setup.py | 20 + .../dagster-plugin/examples/ops_job.py | 51 ++ .../dagster-plugin/pyproject.toml | 19 + .../dagster-plugin/scripts/release.sh | 26 + .../dagster-plugin/setup.cfg | 73 +++ .../dagster-plugin/setup.py | 136 +++++ .../src/datahub_dagster_plugin/__init__.py | 21 + .../datahub_dagster_plugin/client/__init__.py | 0 .../client/dagster_generator.py | 504 ++++++++++++++++++ .../datahub_dagster_plugin.py | 2 + .../sensors/__init__.py | 0 .../sensors/datahub_sensors.py | 439 +++++++++++++++ .../integration/integration_test_dummy.py | 2 + .../dagster-plugin/tests/unit/test_dagster.py | 303 +++++++++++ .../dagster-plugin/tests/unit/test_dummy.py | 2 + metadata-ingestion/developing.md | 9 + .../source/data_lake_common/path_spec.py | 1 + .../local/golden_mces_single_file.json | 162 ++++-- settings.gradle | 1 + 31 files changed, 2357 insertions(+), 56 deletions(-) create mode 100644 .github/workflows/dagster-plugin.yml create mode 100644 docs/lineage/dagster.md create mode 100644 metadata-ingestion-modules/dagster-plugin/.gitignore create mode 100644 metadata-ingestion-modules/dagster-plugin/README.md create mode 100644 metadata-ingestion-modules/dagster-plugin/build.gradle create mode 100644 metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py create mode 100644 metadata-ingestion-modules/dagster-plugin/examples/assets_job.py create mode 100644 metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py create mode 100644 metadata-ingestion-modules/dagster-plugin/examples/ops_job.py create mode 100644 metadata-ingestion-modules/dagster-plugin/pyproject.toml create mode 100755 metadata-ingestion-modules/dagster-plugin/scripts/release.sh create mode 100644 metadata-ingestion-modules/dagster-plugin/setup.cfg create mode 100644 metadata-ingestion-modules/dagster-plugin/setup.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/__init__.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/__init__.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/datahub_dagster_plugin.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/__init__.py create mode 100644 metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py create mode 100644 metadata-ingestion-modules/dagster-plugin/tests/integration/integration_test_dummy.py create mode 100644 metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py create mode 100644 metadata-ingestion-modules/dagster-plugin/tests/unit/test_dummy.py diff --git 
a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index df223e3603e1e..837838352c8fd 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -84,6 +84,8 @@ jobs: -x :metadata-io:test \ -x :metadata-ingestion-modules:airflow-plugin:build \ -x :metadata-ingestion-modules:airflow-plugin:check \ + -x :metadata-ingestion-modules:dagster-plugin:build \ + -x :metadata-ingestion-modules:dagster-plugin:check \ -x :datahub-frontend:build \ -x :datahub-web-react:build \ --parallel diff --git a/.github/workflows/dagster-plugin.yml b/.github/workflows/dagster-plugin.yml new file mode 100644 index 0000000000000..48f1b24196c9e --- /dev/null +++ b/.github/workflows/dagster-plugin.yml @@ -0,0 +1,85 @@ +name: Dagster Plugin +on: + push: + branches: + - master + paths: + - ".github/workflows/dagster-plugin.yml" + - "metadata-ingestion-modules/dagster-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + pull_request: + branches: + - master + paths: + - ".github/**" + - "metadata-ingestion-modules/dagster-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + release: + types: [published] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + dagster-plugin: + runs-on: ubuntu-latest + env: + SPARK_VERSION: 3.0.3 + DATAHUB_TELEMETRY_ENABLED: false + strategy: + matrix: + python-version: ["3.8", "3.10"] + include: + - python-version: "3.8" + extraPythonRequirement: "dagster>=1.3.3" + - python-version: "3.10" + extraPythonRequirement: "dagster>=1.3.3" + fail-fast: false + steps: + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + distribution: "zulu" + java-version: 17 + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + - name: Install dependencies + run: ./metadata-ingestion/scripts/install_deps.sh + - name: Install dagster package and test (extras ${{ matrix.extraPythonRequirement }}) + run: ./gradlew -Pextra_pip_requirements='${{ matrix.extraPythonRequirement }}' :metadata-ingestion-modules:dagster-plugin:lint :metadata-ingestion-modules:dagster-plugin:testQuick + - name: pip freeze show list installed + if: always() + run: source metadata-ingestion-modules/dagster-plugin/venv/bin/activate && pip freeze + - uses: actions/upload-artifact@v3 + if: ${{ always() && matrix.python-version == '3.10' && matrix.extraPythonRequirement == 'dagster>=1.3.3' }} + with: + name: Test Results (dagster Plugin ${{ matrix.python-version}}) + path: | + **/build/reports/tests/test/** + **/build/test-results/test/** + **/junit.*.xml + - name: Upload coverage to Codecov + if: always() + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + directory: . 
+ fail_ci_if_error: false + flags: dagster-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }} + name: pytest-dagster + verbose: true + + event-file: + runs-on: ubuntu-latest + steps: + - name: Upload + uses: actions/upload-artifact@v3 + with: + name: Event File + path: ${{ github.event_path }} diff --git a/.github/workflows/test-results.yml b/.github/workflows/test-results.yml index 0153060692271..c94a5fc340f47 100644 --- a/.github/workflows/test-results.yml +++ b/.github/workflows/test-results.yml @@ -2,7 +2,7 @@ name: Test Results on: workflow_run: - workflows: ["build & test", "metadata ingestion", "Airflow Plugin"] + workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin"] types: - completed diff --git a/docs-website/build.gradle b/docs-website/build.gradle index 702ec0429780f..d1fdc6dc83365 100644 --- a/docs-website/build.gradle +++ b/docs-website/build.gradle @@ -79,11 +79,19 @@ task yarnInstall(type: YarnTask) { ) outputs.dir('node_modules') } +task airflowPluginBuild(dependsOn: [':metadata-ingestion-modules:airflow-plugin:buildWheel']) { +} + +// The Dagster plugin build and airflow plugin build can't be built at the same time; otherwise, it will raise +// fatal: Unable to create '/home/runner/work/datahub/datahub/.git/index.lock': File exists.. on CI +task dagsterPluginBuild(dependsOn: [':metadata-ingestion-modules:dagster-plugin:buildWheel', airflowPluginBuild]) { +} task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall, generateGraphQLSchema, generateJsonSchema, ':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen', - ':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) { + ':metadata-ingestion:buildWheel', + airflowPluginBuild, dagsterPluginBuild] ) { inputs.files(projectMdFiles) outputs.cacheIf { true } args = ['run', 'generate'] diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index e19f09530665a..9116218290d32 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -572,6 +572,7 @@ function copy_python_wheels(): void { const wheel_dirs = [ "../metadata-ingestion/dist", "../metadata-ingestion-modules/airflow-plugin/dist", + "../metadata-ingestion-modules/dagster-plugin/dist", ]; const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels"); diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index f07f1aa031bc7..34398bc8c6661 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -310,6 +310,11 @@ module.exports = { id: "docs/lineage/airflow", label: "Airflow", }, + { + type: "doc", + id: "docs/lineage/dagster", + label: "Dagster", + }, //"docker/airflow/local_airflow", "metadata-integration/java/spark-lineage/README", "metadata-ingestion/integration_docs/great-expectations", @@ -766,6 +771,7 @@ module.exports = { // "metadata-integration/java/spark-lineage-beta/README.md // "metadata-integration/java/openlineage-converter/README" //"metadata-ingestion-modules/airflow-plugin/README" + //"metadata-ingestion-modules/dagster-plugin/README" // "metadata-ingestion/schedule_docs/datahub", // we can delete this // TODO: change the titles of these, removing the "What is..." 
portion from the sidebar" // "docs/what/entity", diff --git a/docs/lineage/dagster.md b/docs/lineage/dagster.md new file mode 100644 index 0000000000000..785aeaa4c03b2 --- /dev/null +++ b/docs/lineage/dagster.md @@ -0,0 +1,89 @@ +# Dagster Integration +DataHub supports integrating the following from Dagster: + +- Dagster pipeline metadata +- Job and Op run information +- Lineage information, when present + +## Using DataHub's Dagster Sensor + +Dagster sensors let you run actions in response to state changes. The sensor provided by DataHub emits metadata after every Dagster pipeline run, covering both successful and failed runs. For more details about Dagster sensors, see [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors). + +### Prerequisites + +1. You need to create a new Dagster project. See . +2. A Dagster definition can be declared in two ways before starting the Dagster UI: with the [Definitions](https://docs.dagster.io/_apidocs/definitions#dagster.Definitions) class (recommended) or with [Repositories](https://docs.dagster.io/concepts/repositories-workspaces/repositories#repositories). +3. Newly created Dagster projects use the Definitions class by default. + +### Setup + +1. Install the required dependency: + +```shell +pip install acryl_datahub_dagster_plugin +``` + +2. Import the sensor definition provided by the DataHub Dagster plugin and add it to your Dagster definitions (or Dagster repository) before starting the Dagster UI, as shown below: +**Using Definitions class:** + +```python +{{ inline /metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py }} +``` + +3. The sensor provided by the DataHub Dagster plugin uses the configuration options below. You can set them through environment variables or pass them explicitly when constructing the config (see the sketch after this list); if a value is not set, the sensor falls back to the default. + + **Configuration options:** + + | Configuration Option | Default value | Description | + |-------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + | datahub_client_config | | The DataHub client config | + | dagster_url | | The URL of your Dagster webserver. | + | capture_asset_materialization | True | Whether to capture asset keys as Datasets on AssetMaterialization events | + | capture_input_output | True | Whether to capture and try to parse inputs and outputs from HANDLED_OUTPUT and LOADED_INPUT events (currently only [PathMetadataValue](https://github.com/dagster-io/dagster/blob/7e08c05dcecef9fd07f887c7846bd1c9a90e7d84/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py#L655) metadata is supported) (EXPERIMENTAL) | + | platform_instance | | The instance of the platform that all assets produced by this recipe belong to. Optional. | + | asset_lineage_extractor | | You can implement your own logic to capture asset lineage information. See the section below for an example. | + +4. Once the Dagster UI is up, turn on the provided sensor: open the Overview tab, then the Sensors tab, and use the toggle shown next to each defined sensor. + +5. The DataHub sensor is now ready to emit metadata after every Dagster pipeline run.
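For illustration, the sketch below is a variant of the basic setup that passes these options explicitly in code instead of relying on the defaults. It is a minimal, assumed example: the server, token, URLs, and platform instance are placeholder values to replace with your own deployment details.

```python
from dagster import Definitions
from datahub.ingestion.graph.client import DatahubClientConfig

from datahub_dagster_plugin.sensors.datahub_sensors import (
    DatahubDagsterSourceConfig,
    make_datahub_sensor,
)

config = DatahubDagsterSourceConfig(
    # Connection to your DataHub instance (placeholder values)
    datahub_client_config=DatahubClientConfig(
        server="http://localhost:8080",
        token="your_datahub_token",
    ),
    # Used to build links from DataHub back to the Dagster UI
    dagster_url="http://localhost:3000",
    # Emit asset keys as Datasets on AssetMaterialization events
    capture_asset_materialization=True,
    # Try to parse lineage from HANDLED_OUTPUT / LOADED_INPUT events
    capture_input_output=True,
    # Optional platform instance applied to assets emitted by this sensor
    platform_instance="my_dagster_instance",
)

datahub_sensor = make_datahub_sensor(config=config)

defs = Definitions(sensors=[datahub_sensor])
```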
+ +### How to validate installation + +1. In the Dagster UI, go to Overview -> Sensors and check that the 'datahub_sensor' is listed. +2. Run a Dagster job. In the Dagster daemon logs, you should see DataHub-related log messages like: + +``` +datahub_sensor - Emitting metadata... +``` + +## Dagster Ins and Out + +Inputs and outputs can be provided explicitly on both assets and ops using dictionaries of `Ins` and `Out` that correspond to the decorated function's arguments, and metadata can be attached while doing so. +To create upstream and downstream dataset dependencies for assets and ops, supply `ins` and `out` dictionaries with the appropriate metadata. For reference, see the sample jobs built with assets in [`assets_job.py`](../../metadata-ingestion-modules/dagster-plugin/examples/assets_job.py) or with ops in [`ops_job.py`](../../metadata-ingestion-modules/dagster-plugin/examples/ops_job.py). + +## Define your custom logic to capture asset lineage information +You can define your own logic to capture asset lineage information. + +The extractor returns a dictionary keyed by step (op) key; each value is a `DatasetLineage` tuple containing the sets of asset URNs that are upstream (inputs) and downstream (outputs) of that op. + +```python +from typing import Dict + +from dagster import RunStatusSensorContext +from datahub.ingestion.graph.client import DataHubGraph + +from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator, DatasetLineage + +def asset_lineage_extractor( + context: RunStatusSensorContext, + dagster_generator: DagsterGenerator, + graph: DataHubGraph, +) -> Dict[str, DatasetLineage]: + dataset_lineage: Dict[str, DatasetLineage] = {} + + # Extract input and output assets from the run context here + return dataset_lineage +``` + +[See example job here](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py). + +## Debugging + +### Connection error for DataHub REST URL + +If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your DataHub GMS service is not running or is not reachable at the configured URL. diff --git a/metadata-ingestion-modules/dagster-plugin/.gitignore b/metadata-ingestion-modules/dagster-plugin/.gitignore new file mode 100644 index 0000000000000..4ff42af3e16cf --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/.gitignore @@ -0,0 +1,143 @@ +.envrc +src/datahub_dagster_plugin/__init__.py.bak +.vscode/ +output +pvenv36/ +bq_credentials.json +/tmp +*.bak + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Generated classes +src/datahub/metadata/ +wheels/ +junit.quick.xml diff --git a/metadata-ingestion-modules/dagster-plugin/README.md b/metadata-ingestion-modules/dagster-plugin/README.md new file mode 100644 index 0000000000000..8e1460957ed9f --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/README.md @@ -0,0 +1,4 @@ +# Datahub Dagster Plugin + +See the DataHub Dagster docs for details. + diff --git a/metadata-ingestion-modules/dagster-plugin/build.gradle b/metadata-ingestion-modules/dagster-plugin/build.gradle new file mode 100644 index 0000000000000..163e0e6738b4d --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/build.gradle @@ -0,0 +1,131 @@ +plugins { + id 'base' +} + +ext { + python_executable = 'python3' + venv_name = 'venv' +} + +if (!project.hasProperty("extra_pip_requirements")) { + ext.extra_pip_requirements = "" +} + +def pip_install_command = "VIRTUAL_ENV=${venv_name} ${venv_name}/bin/uv pip install -e ../../metadata-ingestion" + +task checkPythonVersion(type: Exec) { + commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 8)' +} + +task environmentSetup(type: Exec, dependsOn: checkPythonVersion) { + def sentinel_file = "${venv_name}/.venv_environment_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "${python_executable} -m venv ${venv_name} && " + + "${venv_name}/bin/python -m pip install --upgrade pip uv wheel 'setuptools>=63.0.0' && " + + "touch ${sentinel_file}" +} + +task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) { + def sentinel_file = "${venv_name}/.build_install_package_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "uv pip install -e . 
${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +task install(dependsOn: [installPackage]) + +task installDev(type: Exec, dependsOn: [install]) { + def sentinel_file = "${venv_name}/.build_install_dev_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "uv pip install -e .[dev] ${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +task lint(type: Exec, dependsOn: installDev) { + /* + The find/sed combo below is a temporary work-around for the following mypy issue with airflow 2.2.0: + "venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax". + */ + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "black --check --diff src/ tests/ examples/ && " + + "isort --check --diff src/ tests/ examples/ && " + + "flake8 --count --statistics src/ tests/ examples/ && " + + "mypy --show-traceback --show-error-codes src/ tests/ examples/" +} +task lintFix(type: Exec, dependsOn: installDev) { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && " + + "black src/ tests/ examples/ && " + + "isort src/ tests/ examples/ && " + + "flake8 src/ tests/ examples/ && " + + "mypy src/ tests/ examples/" +} + +task installDevTest(type: Exec, dependsOn: [installDev]) { + def sentinel_file = "${venv_name}/.build_install_dev_test_sentinel" + inputs.file file('setup.py') + outputs.dir("${venv_name}") + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "uv pip install -e .[dev,integration-tests] ${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +def testFile = hasProperty('testFile') ? testFile : 'unknown' +task testSingle(dependsOn: [installDevTest]) { + doLast { + if (testFile != 'unknown') { + exec { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest ${testFile}" + } + } else { + throw new GradleException("No file provided. Use -PtestFile=") + } + } +} + +task testQuick(type: Exec, dependsOn: installDevTest) { + // We can't enforce the coverage requirements if we run a subset of the tests. 
+ inputs.files(project.fileTree(dir: "src/", include: "**/*.py")) + inputs.files(project.fileTree(dir: "tests/")) + outputs.dir("${venv_name}") + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest -vv --continue-on-collection-errors --junit-xml=junit.quick.xml" +} + + +task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml" +} +task buildWheel(type: Exec, dependsOn: [install]) { + commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " + + 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh' +} + +task cleanPythonCache(type: Exec) { + commandLine 'bash', '-c', + "find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete" +} + +build.dependsOn install +check.dependsOn lint +check.dependsOn testQuick + +clean { + delete venv_name + delete 'build' + delete 'dist' +} +clean.dependsOn cleanPythonCache diff --git a/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py b/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py new file mode 100644 index 0000000000000..d4cc65297e42c --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py @@ -0,0 +1,106 @@ +from typing import Dict + +from dagster import ( + Definitions, + In, + Out, + PythonObjectDagsterType, + RunStatusSensorContext, + job, + op, +) +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.utilities.urns.dataset_urn import DatasetUrn + +from datahub_dagster_plugin.client.dagster_generator import ( + DagsterGenerator, + DatasetLineage, +) +from datahub_dagster_plugin.sensors.datahub_sensors import ( + DatahubDagsterSourceConfig, + make_datahub_sensor, +) + + +@op +def extract(): + results = [1, 2, 3, 4] + return results + + +@op( + ins={ + "data": In( + dagster_type=PythonObjectDagsterType(list), + metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]}, + ) + }, + out={ + "result": Out( + metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]} + ) + }, +) +def transform(data): + results = [] + for each in data: + results.append(str(each)) + return results + + +@job +def do_stuff(): + transform(extract()) + + +def asset_lineage_extractor( + context: RunStatusSensorContext, + dagster_generator: DagsterGenerator, + graph: DataHubGraph, +) -> Dict[str, DatasetLineage]: + from dagster._core.events import DagsterEventType + + logs = context.instance.all_logs( + context.dagster_run.run_id, + { + DagsterEventType.ASSET_MATERIALIZATION, + DagsterEventType.ASSET_OBSERVATION, + DagsterEventType.HANDLED_OUTPUT, + DagsterEventType.LOADED_INPUT, + }, + ) + + dataset_lineage: Dict[str, DatasetLineage] = {} + + for log in logs: + if not log.dagster_event or not log.step_key: + continue + + if log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION: + if log.step_key not in dataset_lineage: + dataset_lineage[log.step_key] = DatasetLineage(set(), set()) + + materialization = log.asset_materialization + if not materialization: + continue + + properties = { + key: str(value) for (key, value) in materialization.metadata.items() + } + asset_key = materialization.asset_key.path + dataset_urn = dagster_generator.emit_asset( + graph, asset_key, 
materialization.description, properties + ) + dataset_lineage[log.step_key].outputs.add(dataset_urn) + + return dataset_lineage + + +config = DatahubDagsterSourceConfig( + datahub_client_config=DatahubClientConfig(server="http://localhost:8080"), + dagster_url="http://localhost:3000", + asset_lineage_extractor=asset_lineage_extractor, +) + +datahub_sensor = make_datahub_sensor(config=config) +defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor]) diff --git a/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py b/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py new file mode 100644 index 0000000000000..57634ab345a5e --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py @@ -0,0 +1,63 @@ +from dagster import ( + AssetIn, + AssetOut, + Definitions, + Output, + asset, + define_asset_job, + multi_asset, +) +from datahub.utilities.urns.dataset_urn import DatasetUrn + +from datahub_dagster_plugin.sensors.datahub_sensors import ( + DatahubDagsterSourceConfig, + make_datahub_sensor, +) + + +@multi_asset( + outs={ + "extract": AssetOut( + metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn]} + ), + } +) +def extract(): + results = [1, 2, 3, 4] + metadata = { + "num_record": len(results), + } + return Output(value=results, metadata=metadata) + + +@asset( + ins={ + "extract": AssetIn( + "extract", + metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableC").urn()]}, + ) + } +) +def transform(extract): + results = [] + for each in extract: + results.append(str(each)) + return results + + +assets_job = define_asset_job(name="assets_job") + +config = DatahubDagsterSourceConfig.parse_obj( + { + "rest_sink_config": { + "server": "http://localhost:8080", + }, + "dagster_url": "http://localhost:3000", + } +) + +datahub_sensor = make_datahub_sensor(config=config) + +defs = Definitions( + assets=[extract, transform], jobs=[assets_job], sensors=[datahub_sensor] +) diff --git a/metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py b/metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py new file mode 100644 index 0000000000000..300cf9df022c6 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py @@ -0,0 +1,20 @@ +from dagster import Definitions +from datahub.ingestion.graph.client import DatahubClientConfig + +from datahub_dagster_plugin.sensors.datahub_sensors import ( + DatahubDagsterSourceConfig, + make_datahub_sensor, +) + +config = DatahubDagsterSourceConfig( + datahub_client_config=DatahubClientConfig( + server="https://your_datahub_url/gms", token="your_datahub_token" + ), + dagster_url="https://my-dagster-cloud.dagster.cloud", +) + +datahub_sensor = make_datahub_sensor(config=config) + +defs = Definitions( + sensors=[datahub_sensor], +) diff --git a/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py b/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py new file mode 100644 index 0000000000000..d743e19a235d5 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py @@ -0,0 +1,51 @@ +from dagster import Definitions, In, Out, PythonObjectDagsterType, job, op +from datahub.utilities.urns.dataset_urn import DatasetUrn + +from datahub_dagster_plugin.sensors.datahub_sensors import ( + DatahubDagsterSourceConfig, + make_datahub_sensor, +) + + +@op +def extract(): + results = [1, 2, 3, 4] + return results + + +@op( + ins={ + "data": In( + dagster_type=PythonObjectDagsterType(list), + metadata={"datahub.inputs": 
[DatasetUrn("snowflake", "tableA").urn]}, + ) + }, + out={ + "result": Out( + metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]} + ) + }, +) +def transform(data): + results = [] + for each in data: + results.append(str(each)) + return results + + +@job +def do_stuff(): + transform(extract()) + + +config = DatahubDagsterSourceConfig.parse_obj( + { + "rest_sink_config": { + "server": "http://localhost:8080", + }, + "dagster_url": "http://localhost:3000", + } +) + +datahub_sensor = make_datahub_sensor(config=config) +defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor]) diff --git a/metadata-ingestion-modules/dagster-plugin/pyproject.toml b/metadata-ingestion-modules/dagster-plugin/pyproject.toml new file mode 100644 index 0000000000000..fba81486b9f67 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=54.0.0", "wheel", "pip>=21.0.0"] + +[tool.black] +extend-exclude = ''' +# A regex preceded with ^/ will apply only to files and directories +# in the root of the project. +^/tmp +''' +include = '\.pyi?$' + +[tool.isort] +indent = ' ' +profile = 'black' +sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER' + +[tool.pyright] +extraPaths = ['tests'] \ No newline at end of file diff --git a/metadata-ingestion-modules/dagster-plugin/scripts/release.sh b/metadata-ingestion-modules/dagster-plugin/scripts/release.sh new file mode 100755 index 0000000000000..30219956534d9 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/scripts/release.sh @@ -0,0 +1,26 @@ +#!/bin/bash +set -euxo pipefail + +if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then + ../../gradlew build # also runs tests +elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then + ../../gradlew install +fi + +MODULE=datahub_dagster_plugin + +# Check packaging constraint. +python -c 'import setuptools; where="./src"; assert setuptools.find_packages(where) == setuptools.find_namespace_packages(where), "you seem to be missing or have extra __init__.py files"' +if [[ ${RELEASE_VERSION:-} ]]; then + # Replace version with RELEASE_VERSION env variable + sed -i.bak "s/__version__ = \"0.0.0.dev0\"/__version__ = \"$RELEASE_VERSION\"/" src/${MODULE}/__init__.py +else + vim src/${MODULE}/__init__.py +fi + +rm -rf build dist || true +python -m build +if [[ ! ${RELEASE_SKIP_UPLOAD:-} ]]; then + python -m twine upload 'dist/*' +fi +git restore src/${MODULE}/__init__.py diff --git a/metadata-ingestion-modules/dagster-plugin/setup.cfg b/metadata-ingestion-modules/dagster-plugin/setup.cfg new file mode 100644 index 0000000000000..20a903914332a --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/setup.cfg @@ -0,0 +1,73 @@ +[flake8] +max-complexity = 15 +ignore = + # Ignore: line length issues, since black's formatter will take care of them. + E501, + # Ignore: 1 blank line required before class docstring. + D203, + # See https://stackoverflow.com/a/57074416. + W503, + # See https://github.com/psf/black/issues/315. 
+ E203 +exclude = + .git, + venv, + .tox, + __pycache__ +per-file-ignores = + # imported but unused + __init__.py: F401 +ban-relative-imports = true + +[mypy] +plugins = + pydantic.mypy +exclude = ^(venv|build|dist)/ +ignore_missing_imports = yes +strict_optional = yes +check_untyped_defs = yes +disallow_incomplete_defs = yes +disallow_untyped_decorators = yes +warn_unused_configs = yes +# eventually we'd like to enable these +disallow_untyped_defs = no + +# try to be a bit more strict in certain areas of the codebase +[mypy-datahub.*] +ignore_missing_imports = no +[mypy-tests.*] +ignore_missing_imports = no + +[tool:pytest] +asyncio_mode = auto +addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers + +testpaths = + tests/unit + tests/integration + +[coverage:run] +# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov, +# and tox interact, we should not uncomment the following line. +# See https://pytest-cov.readthedocs.io/en/latest/config.html and +# https://coverage.readthedocs.io/en/coverage-5.0/config.html. +# We also have some additional pytest/cov config options in tox.ini. +# source = src + +[coverage:paths] +# This is necessary for tox-based coverage to be counted properly. +source = + src + */site-packages + +[coverage:report] +# The fail_under value ensures that at least some coverage data is collected. +# We override its value in the tox config. +show_missing = true +exclude_lines = + pragma: no cover + @abstract + if TYPE_CHECKING: +omit = + # omit example jobs + src/datahub_dagster_plugin/example_jobs/* diff --git a/metadata-ingestion-modules/dagster-plugin/setup.py b/metadata-ingestion-modules/dagster-plugin/setup.py new file mode 100644 index 0000000000000..60b960e653eb2 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/setup.py @@ -0,0 +1,136 @@ +import os +import pathlib + +import setuptools + +package_metadata: dict = {} +with open("./src/datahub_dagster_plugin/__init__.py") as fp: + exec(fp.read(), package_metadata) + + +def get_long_description(): + root = os.path.dirname(__file__) + return pathlib.Path(os.path.join(root, "README.md")).read_text() + + +rest_common = {"requests", "requests_file"} + +_version: str = package_metadata["__version__"] +_self_pin = ( + f"=={_version}" if not (_version.endswith("dev0") or "docker" in _version) else "" +) + +base_requirements = { + # Actual dependencies. + "dagster >= 1.3.3", + "dagit >= 1.3.3", + *rest_common, + # Ignoring the dependency below because it causes issues with the vercel built wheel install + #f"acryl-datahub[datahub-rest]{_self_pin}", + "acryl-datahub[datahub-rest]", +} + +mypy_stubs = { + "types-dataclasses", + "sqlalchemy-stubs", + "types-pkg_resources", + "types-six", + "types-python-dateutil", + "types-requests", + "types-toml", + "types-PyYAML", + "types-freezegun", + "types-cachetools", + # versions 0.1.13 and 0.1.14 seem to have issues + "types-click==0.1.12", + "types-tabulate", + # avrogen package requires this + "types-pytz", +} + +base_dev_requirements = { + *base_requirements, + *mypy_stubs, + "black==22.12.0", + "coverage>=5.1", + "flake8>=6.0.0", + "flake8-tidy-imports>=4.3.0", + "flake8-bugbear==23.3.12", + "isort>=5.7.0", + "mypy>=1.4.0", + # pydantic 1.8.2 is incompatible with mypy 0.910. + # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. 
+ "pydantic>=1.10.0,!=1.10.3", + "pytest>=6.2.2", + "pytest-asyncio>=0.16.0", + "pytest-cov>=2.8.1", + "tox", + "deepdiff", + "requests-mock", + "freezegun", + "jsonpickle", + "build", + "twine", + "packaging", +} + +dev_requirements = { + *base_dev_requirements, +} + +integration_test_requirements = { + *dev_requirements, +} + +entry_points = { + "dagster.plugins": "acryl-datahub-dagster-plugin = datahub_dagster_plugin.datahub_dagster_plugin:DatahubDagsterPlugin" +} + + +setuptools.setup( + # Package metadata. + name=package_metadata["__package_name__"], + version=package_metadata["__version__"], + url="https://datahubproject.io/", + project_urls={ + "Documentation": "https://datahubproject.io/docs/", + "Source": "https://github.com/datahub-project/datahub", + "Changelog": "https://github.com/datahub-project/datahub/releases", + }, + license="Apache License 2.0", + description="Datahub Dagster plugin to capture executions and send to Datahub", + long_description=get_long_description(), + long_description_content_type="text/markdown", + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Intended Audience :: Developers", + "Intended Audience :: Information Technology", + "Intended Audience :: System Administrators", + "License :: OSI Approved", + "License :: OSI Approved :: Apache Software License", + "Operating System :: Unix", + "Operating System :: POSIX :: Linux", + "Environment :: Console", + "Environment :: MacOS X", + "Topic :: Software Development", + ], + # Package info. + zip_safe=False, + python_requires=">=3.8", + package_dir={"": "src"}, + packages=setuptools.find_namespace_packages(where="./src"), + entry_points=entry_points, + # Dependencies. + install_requires=list(base_requirements), + extras_require={ + "ignore": [], # This is a dummy extra to allow for trailing commas in the list. + "dev": list(dev_requirements), + "integration-tests": list(integration_test_requirements), + }, +) diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/__init__.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/__init__.py new file mode 100644 index 0000000000000..1ecfc362ceb4e --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/__init__.py @@ -0,0 +1,21 @@ +# Published at https://pypi.org/project/acryl-datahub/. 
+__package_name__ = "acryl-datahub-dagster-plugin" +__version__ = "0.0.0.dev0" + + +def is_dev_mode() -> bool: + return __version__.endswith("dev0") + + +def nice_version_name() -> str: + if is_dev_mode(): + return "unavailable (installed in develop mode)" + return __version__ + + +def get_provider_info(): + return { + "package-name": f"{__package_name__}", + "name": f"{__package_name__}", + "description": "Datahub metadata collector plugin", + } diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/__init__.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py new file mode 100644 index 0000000000000..c00160dfb0319 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py @@ -0,0 +1,504 @@ +from dataclasses import dataclass +from logging import Logger +from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence, Set +from urllib.parse import urlsplit + +import pydantic +from dagster import DagsterRunStatus, PathMetadataValue, RunStatusSensorContext +from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus +from dagster._core.snap import JobSnapshot +from dagster._core.snap.node import OpDefSnap +from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatsSnapshot +from datahub.api.entities.datajob import DataFlow, DataJob +from datahub.api.entities.dataprocess.dataprocess_instance import ( + DataProcessInstance, + InstanceRunResult, +) +from datahub.api.entities.dataset.dataset import Dataset +from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataplatform_instance_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.metadata.schema_classes import DataPlatformInstanceClass, SubTypesClass +from datahub.utilities.urns.data_flow_urn import DataFlowUrn +from datahub.utilities.urns.data_job_urn import DataJobUrn +from datahub.utilities.urns.dataset_urn import DatasetUrn + + +class Constant: + """ + keys used in dagster plugin + """ + + ORCHESTRATOR = "dagster" + + # Default config constants + DEFAULT_DATAHUB_REST_URL = "http://localhost:8080" + + # Environment variable contants + DATAHUB_REST_URL = "DATAHUB_REST_URL" + DATAHUB_ENV = "DATAHUB_ENV" + DATAHUB_PLATFORM_INSTANCE = "DATAHUB_PLATFORM_INSTANCE" + DAGSTER_UI_URL = "DAGSTER_UI_URL" + + # Datahub inputs/outputs constant + DATAHUB_INPUTS = "datahub.inputs" + DATAHUB_OUTPUTS = "datahub.outputs" + + # Job run constant + JOB_SNAPSHOT_ID = "job_snapshot_id" + EXECUTION_PLAN_SNAPSHOT_ID = "execution_plan_snapshot_id" + ROOT_RUN_ID = "root_run_id" + PARENT_RUN_ID = "parent_run_id" + HAS_REPOSITORY_LOAD_DATA = "has_repository_load_data" + TAGS = "tags" + STEPS_SUCCEEDED = "steps_succeeded" + STEPS_FAILED = "steps_failed" + MATERIALIZATIONS = "materializations" + EXPECTATIONS = "expectations" + ENQUEUED_TIME = "enqueued_time" + LAUNCH_TIME = "launch_time" + START_TIME = "start_time" + END_TIME = "end_time" + + # Op run contants + STEP_KEY = "step_key" + ATTEMPTS = "attempts" + + +class 
DatasetLineage(NamedTuple): + inputs: Set[str] + outputs: Set[str] + + +class DatahubDagsterSourceConfig(DatasetSourceConfigMixin): + datahub_client_config: DatahubClientConfig = pydantic.Field( + default=DatahubClientConfig(), + description="Datahub client config", + ) + + dagster_url: Optional[str] = pydantic.Field( + default=None, + description="Dagster UI URL. Like: https://myDagsterCloudEnvironment.dagster.cloud/prod", + ) + + capture_asset_materialization: bool = pydantic.Field( + default=True, + description="Whether to capture asset keys as Dataset on AssetMaterialization event", + ) + + capture_input_output: bool = pydantic.Field( + default=False, + description="Whether to capture and try to parse input and output from HANDLED_OUTPUT, LOADED_INPUT event. (currently only filepathvalue metadata supported", + ) + + asset_lineage_extractor: Optional[ + Callable[ + [RunStatusSensorContext, "DagsterGenerator", DataHubGraph], + Dict[str, DatasetLineage], + ] + ] = pydantic.Field( + default=None, + description="Custom asset lineage extractor function. See details at [https://datahubproject.io/docs/lineage/dagster/#define-your-custom-logic-to-capture-asset-lineage-information]", + ) + + +def _str_urn_to_dataset_urn(urns: List[str]) -> List[DatasetUrn]: + return [DatasetUrn.create_from_string(urn) for urn in urns] + + +@dataclass +class DagsterEnvironment: + repository: Optional[str] + is_cloud: bool = True + is_branch_deployment: bool = False + branch: Optional[str] = "prod" + module: Optional[str] = None + + +def job_url_generator(dagster_url: str, dagster_environment: DagsterEnvironment) -> str: + if dagster_environment.is_cloud: + base_url = f"{dagster_url}/{dagster_environment.branch}" + else: + base_url = dagster_url + + if dagster_environment.module: + base_url = f"{base_url}/locations/{dagster_environment.module}" + + return base_url + + +class DagsterGenerator: + def __init__( + self, + logger: Logger, + config: DatahubDagsterSourceConfig, + dagster_environment: DagsterEnvironment, + ): + self.logger = logger + self.config = config + self.dagster_environment = dagster_environment + + def path_metadata_resolver(self, value: PathMetadataValue) -> Optional[DatasetUrn]: + """ + Resolve path metadata to dataset urn + """ + path = value.value + if not path: + return None + + if "://" in path: + url = urlsplit(path) + scheme = url.scheme + + # Need to adjust some these schemes + if scheme in ["s3a", "s3n"]: + scheme = "s3" + elif scheme in ["gs"]: + scheme = "gcs" + + return DatasetUrn(platform=scheme, name=url.path) + else: + return DatasetUrn(platform="file", name=path) + + def metadata_resolver(self, metadata: Any) -> Optional[DatasetUrn]: + """ + Resolve metadata to dataset urn + """ + if isinstance(metadata, PathMetadataValue): + return self.path_metadata_resolver(metadata) + else: + self.logger.info(f"Unknown Metadata: {metadata} of type {type(metadata)}") + return None + + def generate_dataflow( + self, + job_snapshot: JobSnapshot, + env: str, + platform_instance: Optional[str] = None, + ) -> DataFlow: + """ + Generates a Dataflow object from an Dagster Job Snapshot + :param job_snapshot: JobSnapshot - Job snapshot object + :param env: str + :param platform_instance: Optional[str] + :return: DataFlow - Data generated dataflow + """ + if self.dagster_environment.is_cloud: + id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}" + else: + id = f"{self.dagster_environment.module}/{job_snapshot.name}" + + dataflow = DataFlow( + 
orchestrator=Constant.ORCHESTRATOR, + id=id, + env=env, + name=job_snapshot.name, + platform_instance=platform_instance, + ) + dataflow.description = job_snapshot.description + dataflow.tags = set(job_snapshot.tags.keys()) + if self.config.dagster_url: + dataflow.url = f"{job_url_generator(dagster_url=self.config.dagster_url, dagster_environment=self.dagster_environment)}/jobs/{job_snapshot.name}" + flow_property_bag: Dict[str, str] = {} + for key in job_snapshot.metadata.keys(): + flow_property_bag[key] = str(job_snapshot.metadata[key]) + dataflow.properties = flow_property_bag + return dataflow + + def generate_datajob( + self, + job_snapshot: JobSnapshot, + step_deps: Dict[str, List], + op_def_snap: OpDefSnap, + env: str, + input_datasets: Dict[str, Set[DatasetUrn]], + output_datasets: Dict[str, Set[DatasetUrn]], + platform_instance: Optional[str] = None, + ) -> DataJob: + """ + Generates a Datajob object from an Dagster op snapshot + :param job_snapshot: JobSnapshot - Job snapshot object + :param op_def_snap: OpDefSnap - Op def snapshot object + :param env: str + :param platform_instance: Optional[str] + :param output_datasets: dict[str, Set[DatasetUrn]] - output datasets for each op + :return: DataJob - Data generated datajob + """ + + if self.dagster_environment.is_cloud: + flow_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}" + job_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{op_def_snap.name}" + else: + flow_id = f"{self.dagster_environment.module}/{job_snapshot.name}" + job_id = f"{self.dagster_environment.module}/{op_def_snap.name}" + + dataflow_urn = DataFlowUrn.create_from_ids( + orchestrator=Constant.ORCHESTRATOR, + flow_id=flow_id, + env=env, + platform_instance=platform_instance, + ) + datajob = DataJob( + id=job_id, + flow_urn=dataflow_urn, + name=op_def_snap.name, + ) + + if self.config.dagster_url: + datajob.url = f"{job_url_generator(dagster_url=self.config.dagster_url, dagster_environment=self.dagster_environment)}/jobs/{job_snapshot.name}/{op_def_snap.name}" + + datajob.description = op_def_snap.description + datajob.tags = set(op_def_snap.tags.keys()) + + # Add upstream dependencies for this op + for upstream_op_name in step_deps[op_def_snap.name]: + if self.dagster_environment.is_cloud: + upstream_job_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{upstream_op_name}" + else: + upstream_job_id = ( + f"{self.dagster_environment.module}/{upstream_op_name}" + ) + upstream_op_urn = DataJobUrn.create_from_ids( + data_flow_urn=str(dataflow_urn), + job_id=upstream_job_id, + ) + datajob.upstream_urns.extend([upstream_op_urn]) + job_property_bag: Dict[str, str] = {} + if input_datasets: + self.logger.info( + f"Input datasets for {op_def_snap.name} are { list(input_datasets.get(op_def_snap.name, []))}" + ) + datajob.inlets = list(input_datasets.get(op_def_snap.name, [])) + + if output_datasets: + self.logger.info( + f"Output datasets for {op_def_snap.name} are { list(output_datasets.get(op_def_snap.name, []))}" + ) + datajob.outlets = list(output_datasets.get(op_def_snap.name, [])) + + # For all op inputs/outputs: + # Add input/output details like its type, description, metadata etc in datajob properties. + # Also, add datahub inputs/outputs if present in input/output metatdata. 
+ for input_def_snap in op_def_snap.input_def_snaps: + job_property_bag[f"input.{input_def_snap.name}"] = str( + input_def_snap._asdict() + ) + if Constant.DATAHUB_INPUTS in input_def_snap.metadata: + datajob.inlets.extend( + _str_urn_to_dataset_urn( + input_def_snap.metadata[Constant.DATAHUB_INPUTS].value # type: ignore + ) + ) + + for output_def_snap in op_def_snap.output_def_snaps: + job_property_bag[f"output_{output_def_snap.name}"] = str( + output_def_snap._asdict() + ) + if Constant.DATAHUB_OUTPUTS in output_def_snap.metadata: + datajob.outlets.extend( + _str_urn_to_dataset_urn( + output_def_snap.metadata[Constant.DATAHUB_OUTPUTS].value # type: ignore + ) + ) + + datajob.properties = job_property_bag + + return datajob + + def emit_job_run( + self, + graph: DataHubGraph, + dataflow: DataFlow, + run: DagsterRun, + run_stats: DagsterRunStatsSnapshot, + ) -> None: + """ + Emit a latest job run + :param graph: DatahubRestEmitter + :param dataflow: DataFlow - DataFlow object + :param run: DagsterRun - Dagster Run object + :param run_stats: DagsterRunStatsSnapshot - latest job run stats + """ + dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=run_stats.run_id) + if self.config.dagster_url: + if self.dagster_environment.is_cloud: + dpi.url = f"{self.config.dagster_url}/{self.dagster_environment.branch}/runs/{run.run_id}" + else: + dpi.url = f"{self.config.dagster_url}/runs/{run.run_id}" + + # Add below details in dpi properties + dpi_property_bag: Dict[str, str] = {} + allowed_job_run_keys = [ + Constant.JOB_SNAPSHOT_ID, + Constant.EXECUTION_PLAN_SNAPSHOT_ID, + Constant.ROOT_RUN_ID, + Constant.PARENT_RUN_ID, + Constant.HAS_REPOSITORY_LOAD_DATA, + Constant.TAGS, + Constant.STEPS_SUCCEEDED, + Constant.STEPS_FAILED, + Constant.MATERIALIZATIONS, + Constant.EXPECTATIONS, + Constant.ENQUEUED_TIME, + Constant.LAUNCH_TIME, + Constant.START_TIME, + Constant.END_TIME, + ] + for key in allowed_job_run_keys: + if hasattr(run, key) and getattr(run, key) is not None: + dpi_property_bag[key] = str(getattr(run, key)) + if hasattr(run_stats, key) and getattr(run_stats, key) is not None: + dpi_property_bag[key] = str(getattr(run_stats, key)) + dpi.properties.update(dpi_property_bag) + + status_result_map = { + DagsterRunStatus.SUCCESS: InstanceRunResult.SUCCESS, + DagsterRunStatus.FAILURE: InstanceRunResult.FAILURE, + DagsterRunStatus.CANCELED: InstanceRunResult.SKIPPED, + } + + if run.status not in status_result_map: + raise Exception( + f"Job run status should be either complete, failed or cancelled and it was " + f"{run.status }" + ) + + if run_stats.start_time is not None: + dpi.emit_process_start( + emitter=graph, + start_timestamp_millis=int(run_stats.start_time * 1000), + ) + + if run_stats.end_time is not None: + dpi.emit_process_end( + emitter=graph, + end_timestamp_millis=int(run_stats.end_time * 1000), + result=status_result_map[run.status], + result_type=Constant.ORCHESTRATOR, + ) + + def emit_op_run( + self, + graph: DataHubGraph, + datajob: DataJob, + run_step_stats: RunStepKeyStatsSnapshot, + ) -> None: + """ + Emit an op run + :param graph: DataHubGraph + :param datajob: DataJob - DataJob object + :param run_step_stats: RunStepKeyStatsSnapshot - step(op) run stats + """ + dpi = DataProcessInstance.from_datajob( + datajob=datajob, + id=f"{run_step_stats.run_id}.{datajob.id}", + clone_inlets=True, + clone_outlets=True, + ) + if self.config.dagster_url: + dpi.url = f"{self.config.dagster_url}/runs/{run_step_stats.run_id}" + if self.dagster_environment.is_cloud: + dpi.url = 
f"{self.config.dagster_url}/{self.dagster_environment.branch}/runs/{run_step_stats.run_id}" + else: + dpi.url = f"{self.config.dagster_url}/runs/{run_step_stats.run_id}" + + # Add below details in dpi properties + dpi_property_bag: Dict[str, str] = {} + allowed_op_run_keys = [ + Constant.STEP_KEY, + Constant.ATTEMPTS, + Constant.START_TIME, + Constant.END_TIME, + ] + for key in allowed_op_run_keys: + if ( + hasattr(run_step_stats, key) + and getattr(run_step_stats, key) is not None + ): + dpi_property_bag[key] = str(getattr(run_step_stats, key)) + dpi.properties.update(dpi_property_bag) + + status_result_map = { + StepEventStatus.SUCCESS: InstanceRunResult.SUCCESS, + StepEventStatus.FAILURE: InstanceRunResult.FAILURE, + StepEventStatus.SKIPPED: InstanceRunResult.SKIPPED, + } + + if run_step_stats.status not in status_result_map: + raise Exception( + f"Step run status should be either complete, failed or cancelled and it was " + f"{run_step_stats.status }" + ) + + if run_step_stats.start_time is not None: + dpi.emit_process_start( + emitter=graph, + start_timestamp_millis=int(run_step_stats.start_time * 1000), + ) + + if run_step_stats.end_time is not None: + dpi.emit_process_end( + emitter=graph, + end_timestamp_millis=int(run_step_stats.end_time * 1000), + result=status_result_map[run_step_stats.status], + result_type=Constant.ORCHESTRATOR, + ) + + def dataset_urn_from_asset(self, asset_key: Sequence[str]) -> DatasetUrn: + """ + Generate dataset urn from asset key + """ + return DatasetUrn( + platform="dagster", env=self.config.env, name="/".join(asset_key) + ) + + def emit_asset( + self, + graph: DataHubGraph, + asset_key: Sequence[str], + description: Optional[str], + properties: Optional[Dict[str, str]], + ) -> str: + """ + Emit asset to datahub + """ + dataset_urn = self.dataset_urn_from_asset(asset_key) + dataset = Dataset( + id=None, + urn=dataset_urn.urn(), + platform="dagster", + name=asset_key[-1], + schema=None, + downstreams=None, + subtype="Asset", + subtypes=None, + description=description, + env=self.config.env, + properties=properties, + ) + for mcp in dataset.generate_mcp(): + graph.emit_mcp(mcp) + + mcp = MetadataChangeProposalWrapper( + entityUrn=dataset_urn.urn(), + aspect=SubTypesClass(typeNames=["Asset"]), + ) + graph.emit_mcp(mcp) + + if self.config.platform_instance: + mcp = MetadataChangeProposalWrapper( + entityUrn=dataset_urn.urn(), + aspect=DataPlatformInstanceClass( + instance=make_dataplatform_instance_urn( + instance=self.config.platform_instance, + platform="dagster", + ), + platform=make_data_platform_urn("dagster"), + ), + ) + graph.emit_mcp(mcp) + return dataset_urn.urn() diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/datahub_dagster_plugin.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/datahub_dagster_plugin.py new file mode 100644 index 0000000000000..3a66f97fe90bd --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/datahub_dagster_plugin.py @@ -0,0 +1,2 @@ +class DatahubDagsterPlugin: + name = "datahub_dagster_plugin" diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/__init__.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py 
b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py new file mode 100644 index 0000000000000..181ecc7b5c5cd --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py @@ -0,0 +1,439 @@ +import os +import traceback +from typing import Dict, List, Optional, Sequence, Set, Tuple + +from dagster import ( + DagsterRunStatus, + EventLogEntry, + RunStatusSensorContext, + SensorDefinition, + SkipReason, + run_status_sensor, + sensor, +) +from dagster._core.definitions.asset_selection import CoercibleToAssetSelection +from dagster._core.definitions.sensor_definition import ( + DefaultSensorStatus, + RawSensorEvaluationFunctionReturn, +) +from dagster._core.definitions.target import ExecutableDefinition +from dagster._core.events import DagsterEventType, HandledOutputData, LoadedInputData +from dagster._core.execution.stats import RunStepKeyStatsSnapshot +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import SubTypesClass + +from datahub_dagster_plugin.client.dagster_generator import ( + DagsterEnvironment, + DagsterGenerator, + DatahubDagsterSourceConfig, +) + + +def make_datahub_sensor( + config: DatahubDagsterSourceConfig, + name: Optional[str] = None, + minimum_interval_seconds: Optional[int] = None, + description: Optional[str] = None, + job: Optional[ExecutableDefinition] = None, + jobs: Optional[Sequence[ExecutableDefinition]] = None, + default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, + asset_selection: Optional[CoercibleToAssetSelection] = None, + required_resource_keys: Optional[Set[str]] = None, +) -> SensorDefinition: + """Create a sensor on job status change emit lineage to DataHub. + + Args: + config (DatahubDagsterSourceConfig): DataHub Sensor config + name: (Optional[str]): The name of the sensor. Defaults to "datahub_sensor". + minimum_interval_seconds: (Optional[int]): The minimum number of seconds that will elapse + between sensor evaluations. + default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default + status can be overridden from Dagit or via the GraphQL API. + + Examples: + .. 
code-block:: python + + datahub_sensor = make_datahub_sensor( + config + ) + + @repository + def my_repo(): + return [my_job + datahub_sensor] + + + """ + + @sensor( + name=name, + minimum_interval_seconds=minimum_interval_seconds, + description=description, + job=job, + jobs=jobs, + default_status=default_status, + asset_selection=asset_selection, + required_resource_keys=required_resource_keys, + ) + def datahub_sensor(context): + """ + Sensor which instigate all run status sensors and trigger them based upon run status + """ + for each in DatahubSensors(config).sensors: + each.evaluate_tick(context) + return SkipReason("Trigger run status sensors if any new runs present...") + + return datahub_sensor + + +class DatahubSensors: + def __init__(self, config: Optional[DatahubDagsterSourceConfig] = None): + """ + Set dagster source configurations and initialize datahub emitter and dagster run status sensors + """ + if config: + self.config = config + else: + self.config = DatahubDagsterSourceConfig() + self.graph = DataHubGraph( + self.config.datahub_client_config, + ) + + self.graph.test_connection() + self.sensors: List[SensorDefinition] = [] + self.sensors.append( + run_status_sensor( + name="datahub_success_sensor", run_status=DagsterRunStatus.SUCCESS + )(self._emit_metadata) + ) + + self.sensors.append( + run_status_sensor( + name="datahub_failure_sensor", run_status=DagsterRunStatus.FAILURE + )(self._emit_metadata) + ) + + self.sensors.append( + run_status_sensor( + name="datahub_canceled_sensor", run_status=DagsterRunStatus.CANCELED + )(self._emit_metadata) + ) + + def get_dagster_environment( + self, context: RunStatusSensorContext + ) -> Optional[DagsterEnvironment]: + if ( + context.dagster_run.job_code_origin + and context.dagster_run.job_code_origin.repository_origin + and context.dagster_run.job_code_origin.repository_origin.code_pointer + ): + + code_pointer = ( + context.dagster_run.job_code_origin.repository_origin.code_pointer + ) + context.log.debug(f"code_pointer: {code_pointer}") + + if hasattr(code_pointer, "attribute"): + repository = code_pointer.attribute + else: + repository = None + + if hasattr(code_pointer, "module"): + module = code_pointer.module + else: + context.log.error("Unable to get Module") + return None + + dagster_environment = DagsterEnvironment( + is_cloud=os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", None) + is not None, + is_branch_deployment=( + True + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", False) == 1 + else False + ), + branch=os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "prod"), + module=module, + repository=repository, + ) + return dagster_environment + else: + context.log.error("Unable to get Dagster Environment...") + return None + + def process_asset_logs( + self, + dagster_generator: DagsterGenerator, + log: EventLogEntry, + dataset_inputs: Dict[str, set], + dataset_outputs: Dict[str, set], + ) -> None: + + if not log.dagster_event or not log.step_key: + return + + if log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION: + if log.step_key not in dataset_outputs: + dataset_outputs[log.step_key] = set() + + materialization = log.asset_materialization + if not materialization: + return + + properties = { + key: str(value) for (key, value) in materialization.metadata.items() + } + asset_key = materialization.asset_key.path + dataset_urn = dagster_generator.emit_asset( + self.graph, asset_key, materialization.description, properties + ) + dataset_outputs[log.step_key].add(dataset_urn) + + elif 
log.dagster_event.event_type == DagsterEventType.ASSET_OBSERVATION: + if log.step_key not in dataset_inputs: + dataset_inputs[log.step_key] = set() + asset_observation = log.asset_observation + if not asset_observation: + return + + properties = { + key: str(value) + for (key, value) in asset_observation.metadata.items() # type: ignore + } + asset_key = asset_observation.asset_key.path # type: ignore + dataset_urn = dagster_generator.emit_asset( + self.graph, + asset_key, + asset_observation.description, + properties, # type: ignore + ) + dataset_inputs[log.step_key].add(dataset_urn) + + def process_handle_input_output( + self, + context: RunStatusSensorContext, + log: EventLogEntry, + dagster_generator: DagsterGenerator, + dataset_inputs: Dict[str, set], + dataset_outputs: Dict[str, set], + ) -> None: + if not log.dagster_event or not log.step_key: + return + + if ( + self.config.capture_input_output + and log.dagster_event.event_type == DagsterEventType.HANDLED_OUTPUT + ): + if log.step_key not in dataset_outputs: + dataset_outputs[log.step_key] = set() + + event_specific_data = log.dagster_event.event_specific_data + if isinstance(event_specific_data, HandledOutputData): + context.log.debug( + f"Output Path: {event_specific_data.metadata.get('path')}" + ) + metadata = event_specific_data.metadata.get("path") + context.log.debug(f"Metadata: {metadata}") + if not metadata: + return + urn = dagster_generator.metadata_resolver(metadata) + if urn: + context.log.debug(f"Output Urn: {urn}") + dataset_outputs[log.step_key].add(urn) + elif ( + self.config.capture_input_output + and log.dagster_event.event_type == DagsterEventType.LOADED_INPUT + ): + if log.step_key not in dataset_inputs: + dataset_inputs[log.step_key] = set() + event_specific_data = log.dagster_event.event_specific_data + if isinstance(event_specific_data, LoadedInputData): + context.log.debug( + f"Input Path: {event_specific_data.metadata.get('path')}" + ) + metadata = event_specific_data.metadata.get("path") + context.log.debug(f"Metadata: {metadata}") + if not metadata: + return + urn = dagster_generator.metadata_resolver(metadata) + if urn: + context.log.debug(f"Input Urn: {urn}") + dataset_inputs[log.step_key].add(urn) + + def process_dagster_logs( + self, context: RunStatusSensorContext, dagster_generator: DagsterGenerator + ) -> Tuple[Dict[str, set], Dict[str, set]]: + dataset_outputs: Dict[str, set] = {} + dataset_inputs: Dict[str, set] = {} + + logs = context.instance.all_logs( + context.dagster_run.run_id, + { + DagsterEventType.ASSET_MATERIALIZATION, + DagsterEventType.ASSET_OBSERVATION, + DagsterEventType.HANDLED_OUTPUT, + DagsterEventType.LOADED_INPUT, + }, + ) + + for log in logs: + if not log.dagster_event or not log.step_key: + continue + context.log.debug(f"Log: {log.step_key} - {log.dagster_event}") + context.log.debug(f"Event Type: {log.dagster_event.event_type}") + if self.config.capture_input_output: + self.process_handle_input_output( + context=context, + log=log, + dagster_generator=dagster_generator, + dataset_inputs=dataset_inputs, + dataset_outputs=dataset_outputs, + ) + + if self.config.capture_asset_materialization: + self.process_asset_logs( + dagster_generator=dagster_generator, + log=log, + dataset_inputs=dataset_inputs, + dataset_outputs=dataset_outputs, + ) + + return dataset_inputs, dataset_outputs + + @staticmethod + def merge_dicts(dict1: Dict[str, Set], dict2: Dict[str, Set]) -> Dict[str, Set]: + """ + Merge two dictionaries + """ + for key, value in dict2.items(): + if key in dict1: + 
dict1[key] = dict1[key].union(value) + else: + dict1[key] = value + return dict1 + + def _emit_metadata( + self, context: RunStatusSensorContext + ) -> RawSensorEvaluationFunctionReturn: + """ + Function to emit metadata for datahub rest. + """ + try: + context.log.info("Emitting metadata...") + + assert context.dagster_run.job_snapshot_id + assert context.dagster_run.execution_plan_snapshot_id + + dagster_environment = self.get_dagster_environment(context) + context.log.debug(f"dagster enivronment: {dagster_environment}") + if not dagster_environment: + return SkipReason( + "Unable to get Dagster Environment from DataHub Sensor" + ) + + context.log.debug(f"Dagster Environment: {dagster_environment}") + + dagster_generator = DagsterGenerator( + logger=context.log, + config=self.config, + dagster_environment=dagster_environment, + ) + + job_snapshot = context.instance.get_job_snapshot( + snapshot_id=context.dagster_run.job_snapshot_id + ) + + dataset_inputs: Dict[str, Set] = {} + dataset_outputs: Dict[str, Set] = {} + + if self.config.asset_lineage_extractor: + asset_lineages = self.config.asset_lineage_extractor( + context, dagster_generator, self.graph + ) + for key, value in asset_lineages.items(): + dataset_inputs[key] = dataset_inputs.get(key, set()).union( + value.inputs + ) + dataset_outputs[key] = dataset_outputs.get(key, set()).union( + value.outputs + ) + + ( + dataset_inputs_from_logs, + dataset_outputs_from_logs, + ) = self.process_dagster_logs(context, dagster_generator) + + dataset_inputs = DatahubSensors.merge_dicts( + dataset_inputs, dataset_inputs_from_logs + ) + dataset_outputs = DatahubSensors.merge_dicts( + dataset_outputs, dataset_outputs_from_logs + ) + + context.log.debug(f"Outputs: {dataset_outputs}") + # Emit dagster job entity which get mapped with datahub dataflow entity + dataflow = dagster_generator.generate_dataflow( + job_snapshot=job_snapshot, + env=self.config.env, + platform_instance=self.config.platform_instance, + ) + dataflow.emit(self.graph) + + # Emit dagster job run which get mapped with datahub data process instance entity + dagster_generator.emit_job_run( + graph=self.graph, + dataflow=dataflow, + run=context.dagster_run, + run_stats=context.instance.get_run_stats(context.dagster_run.run_id), + ) + + # Execution plan snapshot contains all steps(ops) dependency. + execution_plan_snapshot = context.instance.get_execution_plan_snapshot( + snapshot_id=context.dagster_run.execution_plan_snapshot_id + ) + + # Map step key with its run step stats + run_step_stats: Dict[str, RunStepKeyStatsSnapshot] = { + run_step_stat.step_key: run_step_stat + for run_step_stat in context.instance.get_run_step_stats( + context.dagster_run.run_id + ) + } + + # For all dagster ops present in job: + # Emit op entity which get mapped with datahub datajob entity. + # Emit op run which get mapped with datahub data process instance entity. 
+ for op_def_snap in job_snapshot.node_defs_snapshot.op_def_snaps: + datajob = dagster_generator.generate_datajob( + job_snapshot=job_snapshot, + step_deps=execution_plan_snapshot.step_deps, + op_def_snap=op_def_snap, + env=self.config.env, + platform_instance=self.config.platform_instance, + output_datasets=dataset_outputs, + input_datasets=dataset_inputs, + ) + context.log.info(f"Generated Datajob: {datajob}") + datajob.emit(self.graph) + + self.graph.emit_mcp( + mcp=MetadataChangeProposalWrapper( + entityUrn=str(datajob.urn), + aspect=SubTypesClass( + typeNames=["Op"], + ), + ) + ) + + dagster_generator.emit_op_run( + graph=self.graph, + datajob=datajob, + run_step_stats=run_step_stats[op_def_snap.name], + ) + + return SkipReason("Pipeline metadata is emitted to DataHub") + except Exception as e: + context.log.error( + f"Error in emitting metadata to DataHub: {e}. Traceback: {traceback.format_exc()}" + ) + return SkipReason("Error in emitting metadata to DataHub") diff --git a/metadata-ingestion-modules/dagster-plugin/tests/integration/integration_test_dummy.py b/metadata-ingestion-modules/dagster-plugin/tests/integration/integration_test_dummy.py new file mode 100644 index 0000000000000..10cf3ad0a608a --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/integration/integration_test_dummy.py @@ -0,0 +1,2 @@ +def test_dummy(): + pass diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py new file mode 100644 index 0000000000000..ac46cfd86fbb9 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py @@ -0,0 +1,303 @@ +from unittest.mock import Mock, patch + +import pytest +from dagster import ( + DagsterInstance, + In, + Out, + SkipReason, + build_run_status_sensor_context, + build_sensor_context, + job, + op, +) +from datahub.api.entities.dataprocess.dataprocess_instance import ( + DataProcessInstanceKey, + InstanceRunResult, +) +from datahub.configuration.source_common import DEFAULT_ENV +from datahub.ingestion.graph.client import DatahubClientConfig + +from datahub_dagster_plugin.client.dagster_generator import DatahubDagsterSourceConfig +from datahub_dagster_plugin.sensors.datahub_sensors import ( + DatahubSensors, + make_datahub_sensor, +) + + +@patch("datahub.ingestion.graph.client.DataHubGraph", autospec=True) +@pytest.mark.skip(reason="disabling this test unti it will use proper golden files") +def test_datahub_sensor(mock_emit): + instance = DagsterInstance.ephemeral() + context = build_sensor_context(instance=instance) + mock_emit.return_value = Mock() + + config = DatahubDagsterSourceConfig( + datahub_client_config=DatahubClientConfig( + server="http://localhost:8081", + ), + dagster_url="http://localhost:3000", + ) + + datahub_sensor = make_datahub_sensor(config) + skip_reason = datahub_sensor(context) + assert isinstance(skip_reason, SkipReason) + + +@patch("datahub_dagster_plugin.sensors.datahub_sensors.DatahubClient", autospec=True) +@pytest.mark.skip(reason="disabling this test unti it will use proper golden files") +def test_emit_metadata(mock_emit): + mock_emitter = Mock() + mock_emit.return_value = mock_emitter + + @op( + out={ + "result": Out( + metadata={ + "datahub.outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)" + ] + } + ) + } + ) + def extract(): + results = [1, 2, 3, 4] + return results + + @op( + ins={ + "data": In( + metadata={ + "datahub.inputs": [ + 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ] + } + ) + } + ) + def transform(data): + results = [] + for each in data: + results.append(str(each)) + return results + + @job + def etl(): + transform(extract()) + + instance = DagsterInstance.ephemeral() + result = etl.execute_in_process(instance=instance) + + # retrieve the DagsterRun + dagster_run = result.dagster_run + + # retrieve a success event from the completed execution + dagster_event = result.get_job_success_event() + + # create the context + run_status_sensor_context = build_run_status_sensor_context( + sensor_name="my_email_sensor", + dagster_instance=instance, + dagster_run=dagster_run, + dagster_event=dagster_event, + ) + + DatahubSensors()._emit_metadata(run_status_sensor_context) + + expected_dataflow_urn = ( + f"urn:li:dataFlow:(dagster,{dagster_run.job_name},{DEFAULT_ENV})" + ) + assert mock_emitter.method_calls[1][1][0].aspectName == "dataFlowInfo" + assert mock_emitter.method_calls[1][1][0].entityUrn == expected_dataflow_urn + assert mock_emitter.method_calls[2][1][0].aspectName == "ownership" + assert mock_emitter.method_calls[2][1][0].entityUrn == expected_dataflow_urn + assert mock_emitter.method_calls[3][1][0].aspectName == "globalTags" + assert mock_emitter.method_calls[3][1][0].entityUrn == expected_dataflow_urn + + dpi_id = DataProcessInstanceKey( + cluster=DEFAULT_ENV, + orchestrator="dagster", + id=dagster_run.run_id, + ).guid() + assert ( + mock_emitter.method_calls[7][1][0].aspectName == "dataProcessInstanceProperties" + ) + assert ( + mock_emitter.method_calls[7][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[8][1][0].aspectName + == "dataProcessInstanceRelationships" + ) + assert ( + mock_emitter.method_calls[8][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[9][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[9][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[10][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[10][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[10][1][0].aspect.result.type + == InstanceRunResult.SUCCESS + ) + assert mock_emitter.method_calls[11][1][0].aspectName == "dataJobInfo" + assert ( + mock_emitter.method_calls[11][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},extract)" + ) + assert mock_emitter.method_calls[12][1][0].aspectName == "dataJobInputOutput" + assert ( + mock_emitter.method_calls[12][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},extract)" + ) + assert mock_emitter.method_calls[13][1][0].aspectName == "status" + assert ( + mock_emitter.method_calls[13][1][0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)" + ) + assert mock_emitter.method_calls[14][1][0].aspectName == "ownership" + assert ( + mock_emitter.method_calls[14][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},extract)" + ) + assert mock_emitter.method_calls[15][1][0].aspectName == "globalTags" + assert ( + mock_emitter.method_calls[15][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},extract)" + ) + dpi_id = DataProcessInstanceKey( + cluster=DEFAULT_ENV, + orchestrator="dagster", + id=f"{dagster_run.run_id}.extract", + ).guid() + assert ( + mock_emitter.method_calls[21][1][0].aspectName + == 
"dataProcessInstanceProperties" + ) + assert ( + mock_emitter.method_calls[21][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[22][1][0].aspectName + == "dataProcessInstanceRelationships" + ) + assert ( + mock_emitter.method_calls[22][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert mock_emitter.method_calls[23][1][0].aspectName == "dataProcessInstanceOutput" + assert ( + mock_emitter.method_calls[23][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert mock_emitter.method_calls[24][1][0].aspectName == "status" + assert ( + mock_emitter.method_calls[24][1][0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)" + ) + assert ( + mock_emitter.method_calls[25][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[25][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[26][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[26][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[26][1][0].aspect.result.type + == InstanceRunResult.SUCCESS + ) + assert mock_emitter.method_calls[27][1][0].aspectName == "dataJobInfo" + assert ( + mock_emitter.method_calls[27][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},transform)" + ) + assert mock_emitter.method_calls[28][1][0].aspectName == "dataJobInputOutput" + assert ( + mock_emitter.method_calls[28][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},transform)" + ) + assert mock_emitter.method_calls[29][1][0].aspectName == "status" + assert ( + mock_emitter.method_calls[29][1][0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ) + assert mock_emitter.method_calls[30][1][0].aspectName == "ownership" + assert ( + mock_emitter.method_calls[30][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},transform)" + ) + assert mock_emitter.method_calls[31][1][0].aspectName == "globalTags" + assert ( + mock_emitter.method_calls[31][1][0].entityUrn + == f"urn:li:dataJob:({expected_dataflow_urn},transform)" + ) + dpi_id = DataProcessInstanceKey( + cluster=DEFAULT_ENV, + orchestrator="dagster", + id=f"{dagster_run.run_id}.transform", + ).guid() + assert ( + mock_emitter.method_calls[37][1][0].aspectName + == "dataProcessInstanceProperties" + ) + assert ( + mock_emitter.method_calls[37][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[38][1][0].aspectName + == "dataProcessInstanceRelationships" + ) + assert ( + mock_emitter.method_calls[38][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert mock_emitter.method_calls[39][1][0].aspectName == "dataProcessInstanceInput" + assert ( + mock_emitter.method_calls[39][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert mock_emitter.method_calls[40][1][0].aspectName == "status" + assert ( + mock_emitter.method_calls[40][1][0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ) + assert ( + mock_emitter.method_calls[41][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[41][1][0].entityUrn + == f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[42][1][0].aspectName == "dataProcessInstanceRunEvent" + ) + assert ( + mock_emitter.method_calls[42][1][0].entityUrn + == 
f"urn:li:dataProcessInstance:{dpi_id}" + ) + assert ( + mock_emitter.method_calls[42][1][0].aspect.result.type + == InstanceRunResult.SUCCESS + ) diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dummy.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dummy.py new file mode 100644 index 0000000000000..10cf3ad0a608a --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dummy.py @@ -0,0 +1,2 @@ +def test_dummy(): + pass diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md index 47e325171ddcc..9b4b4c56339b9 100644 --- a/metadata-ingestion/developing.md +++ b/metadata-ingestion/developing.md @@ -35,7 +35,16 @@ cd metadata-ingestion-modules/airflow-plugin source venv/bin/activate datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)" ``` +### (Optional) Set up your Python environment for developing on Dagster Plugin +From the repository root: + +```shell +cd metadata-ingestion-modules/dagster-plugin +../../gradlew :metadata-ingestion-modules:dagster-plugin:installDev +source venv/bin/activate +datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)" +``` ### Common setup issues Common issues (click to expand): diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index a4b3779b73803..7a807bde2ed0a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -45,6 +45,7 @@ class Config: ) default_extension: Optional[str] = Field( + default=None, description="For files without extension it will assume the specified file type. 
If it is not set the files without extensions will be skipped.", ) diff --git a/metadata-ingestion/tests/integration/s3/golden-files/local/golden_mces_single_file.json b/metadata-ingestion/tests/integration/s3/golden-files/local/golden_mces_single_file.json index f54c62865bcde..d9a5b8b4a7eb8 100644 --- a/metadata-ingestion/tests/integration/s3/golden-files/local/golden_mces_single_file.json +++ b/metadata-ingestion/tests/integration/s3/golden-files/local/golden_mces_single_file.json @@ -18,7 +18,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -111,7 +112,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -132,7 +134,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -152,7 +155,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -167,7 +171,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -182,7 +187,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -199,7 +205,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -214,7 +221,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -234,7 +242,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -249,7 +258,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -264,7 +274,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -281,7 +292,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -296,7 +308,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -316,7 +329,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -336,7 +350,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -351,7 +366,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -366,7 +382,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -383,7 +400,8 @@ }, "systemMetadata": { 
"lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -398,7 +416,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -422,7 +441,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -442,7 +462,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -457,7 +478,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -472,7 +494,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -489,7 +512,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -504,7 +528,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -532,7 +557,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -552,7 +578,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -567,7 +594,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -582,7 +610,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -599,7 +628,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -614,7 +644,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -646,7 +677,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -666,7 +698,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -681,7 +714,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -696,7 +730,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -713,7 +748,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -728,7 +764,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -764,7 +801,8 @@ }, 
"systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -784,7 +822,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -799,7 +838,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -814,7 +854,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -831,7 +872,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -846,7 +888,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -886,7 +929,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -906,7 +950,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -921,7 +966,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -936,7 +982,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -953,7 +1000,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -968,7 +1016,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -1012,7 +1061,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -1027,7 +1077,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -1377,7 +1428,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -1392,7 +1444,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } }, { @@ -1440,7 +1493,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "single_file.json" + "runId": "single_file.json", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index ade0c818b130d..f553bf97ec14b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -59,6 +59,7 @@ include 'metadata-integration:java:openlineage-converter' include 'metadata-integration:java:spark-lineage-beta' include 'ingestion-scheduler' include 'metadata-ingestion-modules:airflow-plugin' +include 'metadata-ingestion-modules:dagster-plugin' include 'smoke-test' include 'metadata-auth:auth-api' include 'metadata-service:schema-registry-api'