dbt 0.19.0 breaks dbt example and likely dagster-dbt #3616
Comments
Summary: This likely points to a breakage in dagster-dbt, but currently only the dbt example tests are failing. More info here: #3616.
Test Plan: bk
Reviewers: bob, max, johann
Reviewed By: johann
Subscribers: johann
Differential Revision: https://dagster.phacility.com/D6189
👍 Hi - we are struggling with the same thing. I believe there was an update to the
Experiencing the same issue. I extracted the
import json
import subprocess
from typing import Any, Dict, Tuple

from dagster import (
    Array,
    Bool,
    InputDefinition,
    Noneable,
    Nothing,
    Output,
    OutputDefinition,
    Permissive,
    StringSource,
    solid,
    check,
)
from dagster.config.field import Field

DEFAULT_DBT_EXECUTABLE = "dbt"
# The following config fields correspond to flags that apply to all dbt CLI commands. For details
# on dbt CLI flags, see
# https://github.com/fishtown-analytics/dbt/blob/1f8e29276e910c697588c43f08bc881379fff178/core/dbt/main.py#L260-L329
CLI_COMMON_FLAGS_CONFIG_SCHEMA = {
    "project-dir": Field(
        config=StringSource,
        is_required=False,
        description=(
            "Which directory to look in for the dbt_project.yml file. Default is the current "
            "working directory and its parents."
        ),
    ),
    "profiles-dir": Field(
        config=StringSource,
        is_required=False,
        description=(
            "Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or "
            "$HOME/.dbt"
        ),
    ),
    "profile": Field(
        config=StringSource,
        is_required=False,
        description="Which profile to load. Overrides setting in dbt_project.yml.",
    ),
    "target": Field(
        config=StringSource,
        is_required=False,
        description="Which target to load for the given profile.",
    ),
    "vars": Field(
        config=Permissive({}),
        is_required=False,
        description=(
            "Supply variables to the project. This argument overrides variables defined in your "
            "dbt_project.yml file. This argument should be a dictionary, eg. "
            "{'my_variable': 'my_value'}"
        ),
    ),
    "bypass-cache": Field(
        config=bool,
        is_required=False,
        description="If set, bypass the adapter-level cache of database state",
        default_value=False,
    ),
}
# The following config fields correspond to options that apply to all CLI solids, but should not be
# formatted as CLI flags.
CLI_COMMON_OPTIONS_CONFIG_SCHEMA = {
    "warn-error": Field(
        config=bool,
        is_required=False,
        description=(
            "If dbt would normally warn, instead raise an exception. Examples include --models "
            "that selects nothing, deprecations, configurations with no associated models, "
            "invalid test configurations, and missing sources/refs in tests."
        ),
        default_value=False,
    ),
    "dbt_executable": Field(
        config=StringSource,
        is_required=False,
        description="Path to the dbt executable. Default is {}".format(
            DEFAULT_DBT_EXECUTABLE
        ),
        default_value=DEFAULT_DBT_EXECUTABLE,
    ),
    "ignore_handled_error": Field(
        config=bool,
        is_required=False,
        description=(
            "When True, will not raise an exception when the dbt CLI returns error code 1. "
            "Default is False."
        ),
        default_value=False,
    ),
}
CLI_CONFIG_SCHEMA = {
    **CLI_COMMON_FLAGS_CONFIG_SCHEMA,
    **CLI_COMMON_OPTIONS_CONFIG_SCHEMA,
}

CLI_COMMON_FLAGS = set(CLI_COMMON_FLAGS_CONFIG_SCHEMA.keys())


def passthrough_flags_only(solid_config, additional_flags):
    # Keep only the config entries that map to dbt CLI flags and have a value set.
    return {
        flag: solid_config[flag]
        for flag in (CLI_COMMON_FLAGS | set(additional_flags))
        if solid_config.get(flag) is not None
    }
# dbt execution
def execute_cli(
    executable: str,
    command: Tuple[str, ...],
    flags_dict: Dict[str, Any],
    log: Any,
    warn_error: bool,
    ignore_handled_error: bool,
) -> bool:
    """Executes a command on the dbt CLI in a subprocess.

    Returns True if dbt exits with return code 0. Raises an Exception carrying the raw
    dbt output on failure.
    """
    check.str_param(executable, "executable")
    check.tuple_param(command, "command", of_type=str)
    check.dict_param(flags_dict, "flags_dict", key_type=str)
    check.bool_param(warn_error, "warn_error")
    check.bool_param(ignore_handled_error, "ignore_handled_error")

    # Format the dbt CLI flags in the command.
    warn_error_flags = ["--warn-error"] if warn_error else []
    command_list = [executable, "--log-format", "json", *warn_error_flags, *command]
    for flag, value in flags_dict.items():
        if not value:
            continue
        command_list.append(f"--{flag}")
        if isinstance(value, bool):
            # If a bool flag (and is True), the presence of the flag itself is enough.
            continue
        if isinstance(value, list):
            check.list_param(value, f"config.{flag}", of_type=str)
            command_list += value
            continue
        if isinstance(value, dict):
            command_list.append(json.dumps(value))
            continue
        command_list.append(str(value))

    # Execute the dbt CLI command in a subprocess.
    log.info(f"Executing command: {' '.join(command_list)}")
    process = subprocess.Popen(command_list, stdout=subprocess.PIPE)
    output = []
    for raw_line in process.stdout:
        line = raw_line.decode("utf-8")
        output.append(line)
        try:
            json_line = json.loads(line)
        except json.JSONDecodeError:
            # Not a JSON log line; forward it unchanged.
            log.info(line.rstrip())
        else:
            # Forward the message at the log level that dbt reported, if the logger has it.
            level = json_line.get("levelname", "").lower()
            if hasattr(log, level):
                getattr(log, level)(json_line.get("message", ""))
            else:
                log.info(line.rstrip())
    process.wait()
    return_code = process.returncode
    log.info(
        "dbt exited with return code {return_code}".format(return_code=return_code)
    )

    # Each captured line already ends with a newline, so join without a separator.
    raw_output = "".join(output)
    # Return code 2 always raises. Return code 1 raises unless ignore_handled_error is
    # set, as the config description for that option states.
    if return_code == 2 or (return_code == 1 and not ignore_handled_error):
        raise Exception(raw_output)
    return return_code == 0
@solid(
    description="A solid to invoke dbt run via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="success", dagster_type=bool)],
    config_schema={
        **CLI_CONFIG_SCHEMA,
        "threads": Field(
            config=Noneable(int),
            default_value=None,
            is_required=False,
            description=(
                "Specify number of threads to use while executing models. Overrides settings "
                "in profiles.yml."
            ),
        ),
        "models": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to run.",
        ),
        "exclude": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to exclude.",
        ),
        "full-refresh": Field(
            config=bool,
            description=(
                "If specified, DBT will drop incremental models and fully-recalculate the "
                "incremental table from the model definition. (--full-refresh)"
            ),
            is_required=False,
            default_value=False,
        ),
        "fail-fast": Field(
            config=bool,
            description="Stop execution upon a first failure. (--fail-fast)",
            is_required=False,
            default_value=False,
        ),
    },
    tags={"kind": "dbt"},
)
def dbt_cli_run(context):
    """This solid executes ``dbt run`` via the dbt CLI."""
    cli_output = execute_cli(
        context.solid_config["dbt_executable"],
        command=("run",),
        flags_dict=passthrough_flags_only(
            context.solid_config,
            ("threads", "models", "exclude", "full-refresh", "fail-fast"),
        ),
        log=context.log,
        warn_error=context.solid_config["warn-error"],
        ignore_handled_error=context.solid_config["ignore_handled_error"],
    )
    yield Output(cli_output or False, output_name="success")
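The flag-formatting step inside execute_cli above can be tried without dbt or dagster installed. The following is a minimal sketch that isolates it as a pure function; the name build_command is hypothetical and is not part of dagster-dbt, and the flag values in the usage note are made up for illustration.

```python
import json
from typing import Any, Dict, List, Tuple


def build_command(
    executable: str,
    command: Tuple[str, ...],
    flags_dict: Dict[str, Any],
    warn_error: bool = False,
) -> List[str]:
    """Build the argument list that execute_cli would pass to subprocess.Popen.

    Hypothetical helper: it mirrors the formatting rules of the workaround above.
    """
    command_list = [executable, "--log-format", "json"]
    if warn_error:
        command_list.append("--warn-error")
    command_list.extend(command)
    for flag, value in flags_dict.items():
        if not value:
            # None, False, empty lists and empty dicts are all skipped.
            continue
        command_list.append(f"--{flag}")
        if isinstance(value, bool):
            # A True bool flag is expressed by its presence alone.
            continue
        if isinstance(value, list):
            command_list.extend(str(item) for item in value)
        elif isinstance(value, dict):
            # Dict values (such as vars) are passed as a single JSON string.
            command_list.append(json.dumps(value))
        else:
            command_list.append(str(value))
    return command_list
```

For example, build_command("dbt", ("run",), {"models": ["a", "b"], "full-refresh": True, "threads": 4}) yields the list dbt, --log-format, json, run, --models, a, b, --full-refresh, --threads, 4. Keeping this step separate from the subprocess call makes the formatting rules testable on their own.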
I think this is because we pin
The tricky part with dagster-dbt is that you can't pin the version of dbt that you use with dagster-dbt. This is because dbt is installed as a CLI tool, not as a Python package dep in dagster-dbt.
@erik-attest Thanks for linking that issue from fishtown-analytics/dbt. Very useful 🙏
The release notes for dbt 0.19.0 indeed list out some GitHub issues and PRs that revamp their JSON schemas for dbt Artifacts. Starting with 0.19.0, dbt is documenting their JSON schemas for dbt Artifacts. Unlike before, these JSON schemas will be versioned, which is very nice 👌. However, the 0.19.0 JSON schema is a breaking change from <0.19.0, which is less nice. For curious readers, the JSON schemas can be found here.
Currently, the Dagster solids in
Next Steps
The dbt Artifacts are a good use case for AssetMaterializations, so I don't want to just rip that out. I'm going to work on a diff that can support the JSON schemas before and after dbt 0.19.0, but I would imagine that we eventually want to stop supporting JSON schemas from dbt <0.19.0. I'd be open to hearing people's opinions about how long we should keep supporting JSON schemas from dbt <0.19.0.
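One way to support both schema generations is to treat every field as optional and normalize each result into a common shape. The sketch below is not the dagster-dbt implementation. It assumes, without confirmation from this thread, that older run results nest the model under a "node" key while newer ones carry "unique_id" at the top level; the function name and all field names are illustrative and should be checked against the published dbt JSON schemas.

```python
from typing import Any, Dict


def normalize_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return a schema-independent view of one entry from a dbt run result.

    Assumed field names ("node", "unique_id", "status", "execution_time",
    "error") are illustrative. Missing fields come back as None instead of
    raising KeyError.
    """
    # Assumption: pre-0.19.0 results embed the node; later results do not.
    node = result.get("node") or {}
    return {
        "unique_id": result.get("unique_id") or node.get("unique_id"),
        "status": result.get("status"),
        "execution_time": result.get("execution_time"),
        "error": result.get("error"),
    }
```

Reading with dict.get keeps the parser tolerant of fields that exist in only one schema version, at the cost of pushing None handling onto the caller.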
@chenbobby that sounds like a good approach to me.
Summary: Intended to resolve [issue #3616](#3616)
The JSON schema for dbt run results (among many other [dbt Artifacts](https://docs.getdbt.com/reference/artifacts/dbt-artifacts)) has [changed in dbt 0.19.0](https://github.com/fishtown-analytics/dbt/releases/tag/v0.19.0). dagster-dbt currently fails when parsing the output from `dbt run` and `dbt compile`.
This diff sets missing fields as optional and should be compatible with dbt //before// and //after// 0.19.0.
**To Do**
- [ ] Decide on how long dagster-dbt will support dbt <0.19.0. Please comment below with your thoughts.
- [ ] Include new metadata fields from dbt 0.19.0 in the dagster-dbt Outputs and AssetMaterializations
Test Plan: buildkite
Reviewers: sandyryza, max
Reviewed By: max
Differential Revision: https://dagster.phacility.com/D6407
I'm closing this issue as commit 7bb8db4 has landed on master to fix this breakage. There is still some schema compatibility work to be done, and I have created issue #3673 to track that work.
Summary
The following failure is occurring in dbt_example_tests/test_pipeline.py. It started failing on the same day that dbt 0.19.0 was released. Pinning dbt to <0.19.0 seems to fix it.
Weirdly, none of the dagster-dbt tests are failing, but this seems like a failure that should not be specific to the example.
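Because the failure depends on which dbt version is installed, a test suite can branch on the version string. The following is a rough sketch with hypothetical helper names; it assumes the version appears somewhere in the text (for instance in the output of `dbt --version`) as MAJOR.MINOR.PATCH, and the exact output format of dbt is not confirmed here.

```python
import re
from typing import Optional, Tuple

# Assumption: the first MAJOR.MINOR.PATCH triple in the text is the dbt version.
_VERSION_RE = re.compile(r"(\d+)\.(\d+)\.(\d+)")


def parse_version(text: str) -> Optional[Tuple[int, int, int]]:
    """Extract the first MAJOR.MINOR.PATCH triple from text, or None if absent."""
    match = _VERSION_RE.search(text)
    if match is None:
        return None
    return (int(match.group(1)), int(match.group(2)), int(match.group(3)))


def uses_new_artifact_schema(version_text: str) -> bool:
    """Return True when the parsed version is 0.19.0 or later."""
    version = parse_version(version_text)
    return version is not None and version >= (0, 19, 0)
```

Comparing integer tuples avoids the string-comparison pitfall where "0.9.0" sorts after "0.19.0".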
Reproduction
Dagit UI/UX Issue Screenshots
Additional Info about Your Environment
Message from the maintainers:
Impacted by this bug? Give it a 👍. We factor engagement into prioritization.