Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Children of multiple boundary nodes #148

Merged
merged 6 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt_meshify/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def connect(
dependency,
project_map[dependency.upstream_project_name],
project_map[dependency.downstream_project_name],
change_set,
)
change_set.extend(changes)
except Exception as e:
Expand Down
5 changes: 4 additions & 1 deletion dbt_meshify/storage/dbt_project_editors.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def initialize(self) -> ChangeSet:
resource = subproject.get_manifest_node(unique_id)
if not resource:
raise KeyError(f"Resource {unique_id} not found in manifest")

if resource.resource_type in ["model", "test", "snapshot", "seed"]:
# ignore generic tests, as moving the yml entry will move the test too
if resource.resource_type == "test" and len(resource.unique_id.split(".")) == 4:
Expand All @@ -157,7 +158,9 @@ def initialize(self) -> ChangeSet:
logger.debug(
f"Updating ref functions for children of {resource.unique_id}..."
)
change_set.extend(reference_updater.update_child_refs(resource))
change_set.extend(
reference_updater.update_child_refs(resource, change_set)
)

logger.debug(
f"Moving {resource.unique_id} and associated YML to subproject {subproject.name}..."
Expand Down
34 changes: 31 additions & 3 deletions dbt_meshify/utilities/linker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from dataclasses import dataclass
from enum import Enum
from typing import Set, Union
from typing import List, Optional, Set, Union

from dbt.contracts.graph.nodes import CompiledNode, ModelNode, SourceDefinition
from dbt.node_types import AccessType

from dbt_meshify.change import ChangeSet, EntityType, Operation, ResourceChange
from dbt_meshify.change import (
ChangeSet,
EntityType,
FileChange,
Operation,
ResourceChange,
)
from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject, PathedProject
from dbt_meshify.utilities.contractor import Contractor
from dbt_meshify.utilities.dependencies import DependenciesUpdater
Expand Down Expand Up @@ -235,6 +241,7 @@ def resolve_dependency(
dependency: ProjectDependency,
upstream_project: DbtProject,
downstream_project: DbtProject,
current_change_set: Optional[ChangeSet] = None,
) -> ChangeSet:
upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource)
if not upstream_manifest_entry:
Expand Down Expand Up @@ -314,12 +321,33 @@ def resolve_dependency(
f"{upstream_manifest_entry.unique_id}"
)

if current_change_set:
previous_changes: List[FileChange] = [
change
for change in current_change_set.changes
if (
isinstance(change, FileChange)
and change.identifier == downstream_manifest_entry.name
and change.operation == Operation.Update
and change.entity_type == EntityType.Code
and change.path
== downstream_project.resolve_file_path(downstream_manifest_entry)
)
]
previous_change = previous_changes[-1] if previous_changes else None

code_to_update = (
previous_change.data
if (previous_change and previous_change.data)
else downstream_manifest_entry.raw_code
)

change_set.add(
reference_updater.generate_reference_update(
project_name=upstream_project.name,
downstream_node=downstream_manifest_entry,
upstream_node=upstream_manifest_entry,
code=downstream_manifest_entry.raw_code,
code=code_to_update,
downstream_project=downstream_project,
)
)
Expand Down
40 changes: 34 additions & 6 deletions dbt_meshify/utilities/references.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from typing import Union
from typing import List, Optional, Union

from dbt.contracts.graph.nodes import CompiledNode
from loguru import logger
Expand Down Expand Up @@ -123,12 +123,14 @@ def generate_reference_update(
data=updated_code,
)

def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
def update_child_refs(
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None
) -> ChangeSet:
"""Generate a set of FileChanges to update child references"""

if not isinstance(self.project, DbtSubProject):
raise Exception(
"The `update_parent_refs` method requires the calling project to have a parent project to update."
"The `update_child_refs` method requires the calling project to have a parent project to update."
)

change_set = ChangeSet()
Expand All @@ -142,11 +144,28 @@ def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue

if current_change_set:
previous_changes: List[FileChange] = [
change
for change in current_change_set.changes
if (
isinstance(change, FileChange)
and change.identifier == model_node.name
and change.operation == Operation.Update
and change.entity_type == EntityType.Code
and change.path
== self.project.parent_project.resolve_file_path(model_node)
)
]
previous_change = previous_changes[-1] if previous_changes else None

change = self.generate_reference_update(
project_name=self.project.name,
upstream_node=resource,
downstream_node=model_node,
code=model_node.raw_code,
code=previous_change.data
if (previous_change and previous_change.data)
else model_node.raw_code,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach seems reasonable. Since this filtering is being done here and in linker.py, I think it might be appropriate to extract this logic into a function in this file (references.py) that both this method and the linker.py method can leverage.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totally agree - would this be a method on the ReferenceUpdater Class, or just a function outside the scope of the class?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking outside the ReferenceUpdater class scope.

downstream_project=self.project.parent_project,
)

Expand All @@ -170,8 +189,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:

change_set = ChangeSet()

code = resource.raw_code

for model in upstream_models:
logger.debug(f"Updating reference to {model} in {resource.name}.")
model_node = self.project.get_manifest_node(model)
Expand All @@ -181,6 +198,17 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:
# Don't process Resources missing a language attribute
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue
previous_change = change_set.changes[-1] if change_set.changes else None

code = (
previous_change.data
if (
previous_change
and isinstance(previous_change, FileChange)
and previous_change.data
)
else resource.raw_code
)

change = self.generate_reference_update(
project_name=self.project.parent_project.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Test fixture: a child model that refs TWO upstream models ('shared_model'
-- and 'new_model'). Used to verify that meshify updates every cross-project
-- ref in a single child file, not just the first one encountered.
with

-- CTEs are deliberately unused by the final select; they exist only so dbt
-- records the ref() dependencies for this model.
upstream as (
select * from {{ ref('shared_model') }}
),

upstream1 as (
select * from {{ ref('new_model') }}
)

select 1 as id
7 changes: 7 additions & 0 deletions tests/integration/test_connect_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,18 @@ def test_connect_package(self, producer_project):

# assert that the source is replaced with a ref
x_proj_ref = "{{ ref('src_proj_a', 'shared_model') }}"
x_proj_ref_2 = "{{ ref('src_proj_a', 'new_model') }}"
child_sql = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model.sql"
).read_text()
assert x_proj_ref in child_sql

child_sql_2 = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model_2.sql"
).read_text()
assert x_proj_ref in child_sql_2
assert x_proj_ref_2 in child_sql_2

# assert that the dependencies yml was created with a pointer to the upstream project
assert (
"src_proj_a"
Expand Down
16 changes: 15 additions & 1 deletion tests/integration/test_dependency_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,21 @@ def test_linker_detects_package_import_dependencies(self, src_proj_a, dest_proj_
downstream_resource="model.dest_proj_a.downstream_model",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
)
),
ProjectDependency(
upstream_resource="model.src_proj_a.shared_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
ProjectDependency(
upstream_resource="model.src_proj_a.new_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
}

# This doesn't exist yet as of 1.5.0. We'll test it out once it's a thing.
Expand Down
73 changes: 73 additions & 0 deletions tests/integration/test_split_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,76 @@ def test_split_project_cycle(self):
assert result.exit_code == 1

teardown_test_project(dest_project_path)

def test_split_upstream_multiple_boundary_parents(self):
    """
    Test that splitting out multiple boundary models *upstream* of the base
    project updates every cross-project ref in the remaining child model.
    """
    # NOTE(review): the original docstring said "downstream of the base
    # project", but this test selects upstream staging models — corrected.
    setup_test_project(src_project_path, dest_project_path)
    runner = CliRunner()
    # Split out stg_orders and stg_order_items (plus their ancestors) into a
    # new subproject; orders remains behind and depends on both of them.
    result = runner.invoke(
        cli,
        [
            "split",
            "my_new_project",
            "--project-path",
            dest_project_path,
            "--select",
            "+stg_orders",
            "+stg_order_items",
        ],
    )

    assert result.exit_code == 0
    # selected model is moved to subdirectory
    assert (
        Path(dest_project_path) / "my_new_project" / "models" / "staging" / "stg_orders.sql"
    ).exists()
    x_proj_ref_1 = "{{ ref('my_new_project', 'stg_orders') }}"
    x_proj_ref_2 = "{{ ref('my_new_project', 'stg_order_items') }}"
    child_sql = (Path(dest_project_path) / "models" / "marts" / "orders.sql").read_text()
    # downstream model has all cross project refs updated
    assert x_proj_ref_1 in child_sql
    assert x_proj_ref_2 in child_sql

    teardown_test_project(dest_project_path)

def test_split_downstream_multiple_boundary_parents(self):
    """
    Test that splitting out a project downstream of the base project splits
    as expected, rewriting every boundary-crossing ref in the moved model.
    """
    setup_test_project(src_project_path, dest_project_path)

    cli_runner = CliRunner()
    # Split orders and everything downstream of it into a new subproject;
    # the moved orders model must then ref its staging parents cross-project.
    invocation = cli_runner.invoke(
        cli,
        [
            "split",
            "my_new_project",
            "--project-path",
            dest_project_path,
            "--select",
            "orders+",
        ],
    )

    assert invocation.exit_code == 0

    moved_model = (
        Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql"
    )
    # selected model is moved to subdirectory
    assert moved_model.exists()

    expected_refs = [
        "{{ ref('split_proj', 'stg_orders') }}",
        "{{ ref('split_proj', 'stg_order_items') }}",
        "{{ ref('split_proj', 'stg_products') }}",
        "{{ ref('split_proj', 'stg_locations') }}",
        "{{ ref('split_proj', 'stg_supplies') }}",
    ]
    child_sql = moved_model.read_text()
    # downstream model has all cross project refs updated
    for expected_ref in expected_refs:
        assert expected_ref in child_sql

    teardown_test_project(dest_project_path)