Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restrict python submission #5822

Merged
merged 11 commits into from
Sep 20, 2022
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ exclude: ^test/

# Force all unspecified python hooks to run python 3.8
default_language_version:
python: python3.8
python: python3

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down
1 change: 0 additions & 1 deletion core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,6 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
def default_python_submission_method(self) -> str:
raise NotImplementedError("default_python_submission_method is not specified")

@available.parse_none
@log_code_execution
def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterResponse:
submission_method = parsed_model["config"].get(
Expand Down
7 changes: 4 additions & 3 deletions core/dbt/clients/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dbt.clients._jinja_blocks import BlockIterator, BlockData, BlockTag
from dbt.contracts.graph.compiled import CompiledGenericTestNode
from dbt.contracts.graph.parsed import ParsedGenericTestNode

from dbt.exceptions import (
InternalException,
raise_compiler_error,
Expand Down Expand Up @@ -305,13 +306,13 @@ def exception_handler(self) -> Iterator[None]:
@contextmanager
def track_call(self):
# This is only called from __call__
if self.stack is None or self.node is None:
if self.stack is None:
yield
else:
unique_id = self.macro.unique_id
depth = self.stack.depth
# only mark depth=0 as a dependency
if depth == 0:
# only mark depth=0 as a dependency, when creating this dependency we don't pass in stack
if depth == 0 and self.node:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gshank the depth is being kept as 0 because of the comment in the code that I updated

self.node.depends_on.add_macro(unique_id)
self.stack.push(unique_id)
try:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def _create_node_context(

context = generate_runtime_model_context(node, self.config, manifest)
context.update(extra_context)

if isinstance(node, CompiledGenericTestNode):
# for test nodes, add a special keyword args value to the context
jinja.add_rendered_test_kwargs(context, node)
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/context/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dbt.contracts.connection import AdapterRequiredConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.context.macro_resolver import TestMacroNamespace
from .base import contextproperty


from .configured import ConfiguredContext
Expand Down Expand Up @@ -66,6 +67,10 @@ def to_dict(self):
dct.update(self.namespace)
return dct

@contextproperty
def context_macro_stack(self):
    """Expose this context's macro call stack to the Jinja rendering context.

    Presumably consumed by callers (e.g. materialization execution) that need
    to inspect which macros are currently executing — verify against callers.
    """
    return self.macro_stack


class QueryHeaderContext(ManifestContext):
def __init__(self, config: AdapterRequiredConfig, manifest: Manifest) -> None:
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,19 @@ def selected_resources(self) -> List[str]:
"""
return selected_resources.SELECTED_RESOURCES

@contextmember
def submit_python_job(self, parsed_model: Dict, compiled_code: str) -> AdapterResponse:
    """Submit a python model's compiled code for execution via the adapter.

    Restricted entry point: it only permits calls made from inside a
    materialization macro, via the `statement` macro, by inspecting the
    macro call stack before delegating to the adapter.

    :param parsed_model: the parsed model's config dict; must contain an
        "alias" key (used in the error message).
    :param compiled_code: the compiled python code to submit.
    :raises RuntimeException: if the call did not originate from a
        materialization macro calling `macro.dbt.statement`.
    """
    # Check macro_stack and that the unique id is for a materialization macro.
    # Expected shape at call time: depth == 2, with
    # call_stack[0] a materialization macro and call_stack[1] the dbt
    # `statement` macro — any other shape means a disallowed caller.
    if not (
        self.context_macro_stack.depth == 2
        and self.context_macro_stack.call_stack[1] == "macro.dbt.statement"
        and "materialization" in self.context_macro_stack.call_stack[0]
    ):
        raise RuntimeException(
            f"submit_python_job is not intended to be called here, at model {parsed_model['alias']}, with macro call_stack {self.context_macro_stack.call_stack}."
        )
    return self.adapter.submit_python_job(parsed_model, compiled_code)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: Will adapter.submit_python_job still be callable (as a classmethod) from within the Jinja context? Would we need to decorate it as "unavailable"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is no longer available from within the Jinja context: we make functions available using the @available decorator, and I removed that decorator from the submit_python_job function. I also have a snowflake PR opened for it (given that snowflake actually submits things slightly differently). I'm also going to sync with the dbt-databricks maintainer to make sure they don't add the @available decorator back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



class MacroContext(ProviderContext):
"""Internally, macros can be executed like nodes, with some restrictions:
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/include/global_project/macros/etc/statement.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The macro override naming method (spark__statement) only works for macros which
{%- if language == 'sql'-%}
{%- set res, table = adapter.execute(compiled_code, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- elif language == 'python' -%}
{%- set res = adapter.submit_python_job(model, compiled_code) -%}
{%- set res = submit_python_job(model, compiled_code) -%}
{#-- TODO: What should table be for python models? --#}
{%- set table = None -%}
{%- else -%}
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ def execute(self, model, manifest):

hook_ctx = self.adapter.pre_model_hook(context_config)
try:
result = MacroGenerator(materialization_macro, context)()
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

Expand Down
5 changes: 3 additions & 2 deletions test/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str
REQUIRED_TARGET_KEYS = REQUIRED_BASE_KEYS | {"target"}
REQUIRED_DOCS_KEYS = REQUIRED_TARGET_KEYS | {"project_name"} | {"doc"}
MACROS = frozenset({"macro_a", "macro_b", "root", "dbt"})
REQUIRED_QUERY_HEADER_KEYS = REQUIRED_TARGET_KEYS | {"project_name"} | MACROS
REQUIRED_QUERY_HEADER_KEYS = REQUIRED_TARGET_KEYS | {"project_name", "context_macro_stack"} | MACROS
REQUIRED_MACRO_KEYS = REQUIRED_QUERY_HEADER_KEYS | {
"_sql_results",
"load_result",
Expand Down Expand Up @@ -239,8 +239,9 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str
"adapter_macro",
"selected_resources",
"invocation_args_dict",
"submit_python_job"
}
REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code",}
REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code"}
MAYBE_KEYS = frozenset({"debug"})


Expand Down