-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make schemas available to on-run-end hooks (#908) #1028
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -247,13 +247,13 @@ def execute(self, compiled_node, manifest): | |
|
||
def compile(self, manifest): | ||
return self._compile_node(self.adapter, self.config, self.node, | ||
manifest) | ||
manifest, {}) | ||
|
||
@classmethod | ||
def _compile_node(cls, adapter, config, node, manifest): | ||
def _compile_node(cls, adapter, config, node, manifest, extra_context): | ||
compiler = dbt.compilation.Compiler(config) | ||
node = compiler.compile_node(node, manifest) | ||
node = cls._inject_runtime_config(adapter, node) | ||
node = compiler.compile_node(node, manifest, extra_context) | ||
node = cls._inject_runtime_config(adapter, node, extra_context) | ||
|
||
if(node.injected_sql is not None and | ||
not (dbt.utils.is_type(node, NodeType.Archive))): | ||
|
@@ -271,9 +271,10 @@ def _compile_node(cls, adapter, config, node, manifest): | |
return node | ||
|
||
@classmethod | ||
def _inject_runtime_config(cls, adapter, node): | ||
def _inject_runtime_config(cls, adapter, node, extra_context): | ||
wrapped_sql = node.wrapped_sql | ||
context = cls._node_context(adapter, node) | ||
context.update(extra_context) | ||
sql = dbt.clients.jinja.get_rendered(wrapped_sql, context) | ||
node.wrapped_sql = sql | ||
return node | ||
|
@@ -310,7 +311,7 @@ def raise_on_first_error(self): | |
return False | ||
|
||
@classmethod | ||
def run_hooks(cls, config, adapter, manifest, hook_type): | ||
def run_hooks(cls, config, adapter, manifest, hook_type, extra_context): | ||
|
||
nodes = manifest.nodes.values() | ||
hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) | ||
|
@@ -328,7 +329,8 @@ def run_hooks(cls, config, adapter, manifest, hook_type): | |
# Also, consider configuring psycopg2 (and other adapters?) to | ||
# ensure that a transaction is only created if dbt initiates it. | ||
adapter.clear_transaction(model_name) | ||
compiled = cls._compile_node(adapter, config, hook, manifest) | ||
compiled = cls._compile_node(adapter, config, hook, manifest, | ||
extra_context) | ||
statement = compiled.wrapped_sql | ||
|
||
hook_index = hook.get('index', len(hooks)) | ||
|
@@ -346,10 +348,10 @@ def run_hooks(cls, config, adapter, manifest, hook_type): | |
adapter.release_connection(model_name) | ||
|
||
@classmethod | ||
def safe_run_hooks(cls, config, adapter, manifest, hook_type): | ||
def safe_run_hooks(cls, config, adapter, manifest, hook_type, | ||
extra_context): | ||
try: | ||
cls.run_hooks(config, adapter, manifest, hook_type) | ||
|
||
cls.run_hooks(config, adapter, manifest, hook_type, extra_context) | ||
except dbt.exceptions.RuntimeException: | ||
logger.info("Database error while running {}".format(hook_type)) | ||
raise | ||
|
@@ -376,7 +378,7 @@ def populate_adapter_cache(cls, config, adapter, manifest): | |
@classmethod | ||
def before_run(cls, config, adapter, manifest): | ||
cls.populate_adapter_cache(config, adapter, manifest) | ||
cls.safe_run_hooks(config, adapter, manifest, RunHookType.Start) | ||
cls.safe_run_hooks(config, adapter, manifest, RunHookType.Start, {}) | ||
cls.create_schemas(config, adapter, manifest) | ||
|
||
@classmethod | ||
|
@@ -397,7 +399,15 @@ def print_results_line(cls, results, execution_time): | |
|
||
@classmethod | ||
def after_run(cls, config, adapter, results, manifest): | ||
cls.safe_run_hooks(config, adapter, manifest, RunHookType.End) | ||
# in on-run-end hooks, provide the value 'schemas', which is a list of | ||
# unique schemas that successfully executed models were in | ||
# errored failed skipped | ||
schemas = list(set( | ||
r.node.schema for r in results | ||
if not any((r.errored, r.failed, r.skipped)) | ||
)) | ||
cls.safe_run_hooks(config, adapter, manifest, RunHookType.End, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it be crazy to just add the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it would be crazy at all, I wrote it as a dict for pretty much that reason. I'll add it in. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. haha, cool, let's do it :) |
||
{'schemas': schemas, 'results': results}) | ||
|
||
@classmethod | ||
def after_hooks(cls, config, adapter, results, manifest, elapsed): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldn't the runtime context generate the list of schemas from the list of nodes? is there a reason to do this in on-run-end hooks based on the list of results, rather than here from the list of nodes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you run with models selected, this way we only get the actually executed models.