Children of multiple boundary nodes #148

Merged · 6 commits · Aug 25, 2023
Changes from all commits
1 change: 1 addition & 0 deletions dbt_meshify/main.py
@@ -179,6 +179,7 @@ def connect(
dependency,
project_map[dependency.upstream_project_name],
project_map[dependency.downstream_project_name],
change_set,
)
change_set.extend(changes)
except Exception as e:
5 changes: 4 additions & 1 deletion dbt_meshify/storage/dbt_project_editors.py
@@ -137,6 +137,7 @@ def initialize(self) -> ChangeSet:
resource = subproject.get_manifest_node(unique_id)
if not resource:
raise KeyError(f"Resource {unique_id} not found in manifest")

if resource.resource_type in ["model", "test", "snapshot", "seed"]:
# ignore generic tests, as moving the yml entry will move the test too
if resource.resource_type == "test" and len(resource.unique_id.split(".")) == 4:
@@ -156,7 +157,9 @@ def initialize(self) -> ChangeSet:
logger.debug(
f"Updating ref functions for children of {resource.unique_id}..."
)
change_set.extend(reference_updater.update_child_refs(resource))
change_set.extend(
reference_updater.update_child_refs(resource, change_set)
)

logger.debug(
f"Moving {resource.unique_id} and associated YML to subproject {subproject.name}..."
28 changes: 24 additions & 4 deletions dbt_meshify/utilities/linker.py
@@ -1,16 +1,22 @@
from dataclasses import dataclass
from enum import Enum
from typing import Set, Union
from typing import List, Optional, Set, Union

from dbt.contracts.graph.nodes import CompiledNode, ModelNode, SourceDefinition
from dbt.node_types import AccessType

from dbt_meshify.change import ChangeSet, EntityType, Operation, ResourceChange
from dbt_meshify.change import (
ChangeSet,
EntityType,
FileChange,
Operation,
ResourceChange,
)
from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject, PathedProject
from dbt_meshify.utilities.contractor import Contractor
from dbt_meshify.utilities.dependencies import DependenciesUpdater
from dbt_meshify.utilities.grouper import ResourceGrouper
from dbt_meshify.utilities.references import ReferenceUpdater
from dbt_meshify.utilities.references import ReferenceUpdater, get_latest_file_change


class ProjectDependencyType(str, Enum):
@@ -235,6 +241,7 @@ def resolve_dependency(
dependency: ProjectDependency,
upstream_project: DbtProject,
downstream_project: DbtProject,
current_change_set: Optional[ChangeSet] = None,
) -> ChangeSet:
upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource)
if not upstream_manifest_entry:
@@ -314,12 +321,25 @@ def resolve_dependency(
f"{upstream_manifest_entry.unique_id}"
)

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=downstream_manifest_entry.name,
path=downstream_project.resolve_file_path(downstream_manifest_entry),
)

code_to_update = (
previous_change.data
if (previous_change and previous_change.data)
else downstream_manifest_entry.raw_code
)

Review comment on lines +324 to +336 (Collaborator): I really like how this flows! Thanks for making the change.

change_set.add(
reference_updater.generate_reference_update(
project_name=upstream_project.name,
downstream_node=downstream_manifest_entry,
upstream_node=upstream_manifest_entry,
code=downstream_manifest_entry.raw_code,
code=code_to_update,
downstream_project=downstream_project,
)
)
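The hunk above is the heart of the fix: when a downstream model depends on more than one upstream boundary model, each reference update has to start from the code produced by the previous update rather than from the manifest's raw_code, otherwise the last rewrite wins and earlier ones are silently dropped. A minimal, self-contained sketch of that chaining pattern, using simplified stand-in types rather than the real dbt_meshify classes:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FileChange:
    """Hypothetical stand-in for dbt_meshify's FileChange."""
    identifier: str
    data: str


@dataclass
class ChangeSet:
    """Hypothetical stand-in for dbt_meshify's ChangeSet."""
    changes: List[FileChange] = field(default_factory=list)

    def latest_for(self, identifier: str) -> Optional[FileChange]:
        # Return the most recent change targeting this identifier, if any.
        matches = [c for c in self.changes if c.identifier == identifier]
        return matches[-1] if matches else None


def update_ref(code: str, model: str, project: str) -> str:
    # Rewrite a package-local ref into a cross-project ref.
    return code.replace(f"ref('{model}')", f"ref('{project}', '{model}')")


raw_code = (
    "select * from {{ ref('shared_model') }} "
    "join {{ ref('new_model') }} using (id)"
)
change_set = ChangeSet()

for upstream in ["shared_model", "new_model"]:
    previous = change_set.latest_for("downstream_model_2")
    # Start from the previous change's data if one exists; fall back to raw_code.
    code = previous.data if previous and previous.data else raw_code
    change_set.changes.append(
        FileChange(
            identifier="downstream_model_2",
            data=update_ref(code, upstream, "src_proj_a"),
        )
    )

print(change_set.changes[-1].data)
# Both refs are now cross-project refs, because the second pass built on the first.
```

Running the sketch prints a model body in which both refs have become cross-project refs; without the `previous.data` lookup, the second iteration would overwrite the first rewrite.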
57 changes: 51 additions & 6 deletions dbt_meshify/utilities/references.py
@@ -1,5 +1,6 @@
import re
from typing import Union
from pathlib import Path
from typing import List, Optional, Union

from dbt.contracts.graph.nodes import CompiledNode
from loguru import logger
@@ -8,6 +9,27 @@
from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject


def get_latest_file_change(
changeset: ChangeSet,
path: Path,
identifier: str,
entity_type: EntityType = EntityType.Code,
operation: Operation = Operation.Update,
) -> Optional[FileChange]:
previous_changes: List[FileChange] = [
change
for change in changeset.changes
if (
isinstance(change, FileChange)
and change.identifier == identifier
and change.operation == operation
and change.entity_type == entity_type
and change.path == path
)
]
return previous_changes[-1] if previous_changes else None


class ReferenceUpdater:
def __init__(self, project: Union[DbtSubProject, DbtProject]):
self.project = project
@@ -123,12 +145,14 @@ def generate_reference_update(
data=updated_code,
)

def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
def update_child_refs(
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None
) -> ChangeSet:
"""Generate a set of FileChanges to update child references"""

if not isinstance(self.project, DbtSubProject):
raise Exception(
"The `update_parent_refs` method requires the calling project to have a parent project to update."
"The `update_child_refs` method requires the calling project to have a parent project to update."
)

change_set = ChangeSet()
@@ -142,11 +166,23 @@ def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=model_node.name,
path=self.project.parent_project.resolve_file_path(model_node),
)
code = (
previous_change.data
if (previous_change and previous_change.data)
else model_node.raw_code
)

change = self.generate_reference_update(
project_name=self.project.name,
upstream_node=resource,
downstream_node=model_node,
code=model_node.raw_code,
code=code,
downstream_project=self.project.parent_project,
)

@@ -170,8 +206,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:

change_set = ChangeSet()

code = resource.raw_code

for model in upstream_models:
logger.debug(f"Updating reference to {model} in {resource.name}.")
model_node = self.project.get_manifest_node(model)
@@ -181,6 +215,17 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:
# Don't process Resources missing a language attribute
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue
previous_change = change_set.changes[-1] if change_set.changes else None

code = (
previous_change.data
if (
previous_change
and isinstance(previous_change, FileChange)
and previous_change.data
)
else resource.raw_code
)

change = self.generate_reference_update(
project_name=self.project.parent_project.name,
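For `update_child_refs` and the linker, the accumulated ChangeSet can contain changes for many different files, so simply taking `changes[-1]` (as `update_parent_refs` can, since all of its changes rewrite the same resource) would be wrong. `get_latest_file_change` therefore filters by identifier, path, operation, and entity type, returning the newest match or `None` when the file has not been touched yet. A rough illustration of that selection rule, again with hypothetical stand-in records rather than the real classes:

```python
from pathlib import Path
from typing import List, NamedTuple, Optional


class Change(NamedTuple):
    """Hypothetical stand-in for a FileChange carrying an update to one file."""
    identifier: str
    path: Path
    data: str


def latest_file_change(
    changes: List[Change], identifier: str, path: Path
) -> Optional[Change]:
    # Keep only changes that target exactly this file, then take the newest one.
    matches = [c for c in changes if c.identifier == identifier and c.path == path]
    return matches[-1] if matches else None


changes = [
    Change("orders", Path("models/marts/orders.sql"), "-- first rewrite of orders"),
    Change("customers", Path("models/marts/customers.sql"), "-- rewrite of customers"),
    Change("orders", Path("models/marts/orders.sql"), "-- second rewrite of orders"),
]

assert (
    latest_file_change(changes, "orders", Path("models/marts/orders.sql")).data
    == "-- second rewrite of orders"
)
assert latest_file_change(changes, "stg_orders", Path("models/staging/stg_orders.sql")) is None
```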
@@ -0,0 +1,11 @@
with

upstream as (
select * from {{ ref('shared_model') }}
),

upstream1 as (
select * from {{ ref('new_model') }}
)

select 1 as id
7 changes: 7 additions & 0 deletions tests/integration/test_connect_command.py
@@ -86,11 +86,18 @@ def test_connect_package(self, producer_project):

# assert that the source is replaced with a ref
x_proj_ref = "{{ ref('src_proj_a', 'shared_model') }}"
x_proj_ref_2 = "{{ ref('src_proj_a', 'new_model') }}"
child_sql = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model.sql"
).read_text()
assert x_proj_ref in child_sql

child_sql_2 = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model_2.sql"
).read_text()
assert x_proj_ref in child_sql_2
assert x_proj_ref_2 in child_sql_2

# assert that the dependencies yml was created with a pointer to the upstream project
assert (
"src_proj_a"
16 changes: 15 additions & 1 deletion tests/integration/test_dependency_detection.py
@@ -91,7 +91,21 @@ def test_linker_detects_package_import_dependencies(self, src_proj_a, dest_proj_
downstream_resource="model.dest_proj_a.downstream_model",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
)
),
ProjectDependency(
upstream_resource="model.src_proj_a.shared_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
ProjectDependency(
upstream_resource="model.src_proj_a.new_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
}

# This doesn't exist yet as of 1.5.0. We'll test it out once it's a thing.
73 changes: 73 additions & 0 deletions tests/integration/test_split_command.py
@@ -244,3 +244,76 @@ def test_split_public_leaf_nodes(self, project):
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "__models.yml"
).read_text()
)

def test_split_upstream_multiple_boundary_parents(self):
"""
Test that splitting out a project upstream of the base project updates all cross-project refs in a child with multiple boundary parents
"""
setup_test_project(src_project_path, dest_project_path)
runner = CliRunner()
result = runner.invoke(
cli,
[
"split",
"my_new_project",
"--project-path",
dest_project_path,
"--select",
"+stg_orders",
"+stg_order_items",
],
)

assert result.exit_code == 0
# selected model is moved to subdirectory
assert (
Path(dest_project_path) / "my_new_project" / "models" / "staging" / "stg_orders.sql"
).exists()
x_proj_ref_1 = "{{ ref('my_new_project', 'stg_orders') }}"
x_proj_ref_2 = "{{ ref('my_new_project', 'stg_order_items') }}"
child_sql = (Path(dest_project_path) / "models" / "marts" / "orders.sql").read_text()
# downstream model has all cross project refs updated
assert x_proj_ref_1 in child_sql
assert x_proj_ref_2 in child_sql

teardown_test_project(dest_project_path)

def test_split_downstream_multiple_boundary_parents(self):
"""
Test that splitting out a project downstream of the base project updates all cross-project refs back to the base project
"""
setup_test_project(src_project_path, dest_project_path)
runner = CliRunner()
result = runner.invoke(
cli,
[
"split",
"my_new_project",
"--project-path",
dest_project_path,
"--select",
"orders+",
],
)

assert result.exit_code == 0
# selected model is moved to subdirectory
assert (
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql"
).exists()
x_proj_ref_1 = "{{ ref('split_proj', 'stg_orders') }}"
x_proj_ref_2 = "{{ ref('split_proj', 'stg_order_items') }}"
x_proj_ref_3 = "{{ ref('split_proj', 'stg_products') }}"
x_proj_ref_4 = "{{ ref('split_proj', 'stg_locations') }}"
x_proj_ref_5 = "{{ ref('split_proj', 'stg_supplies') }}"
child_sql = (
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql"
).read_text()
# downstream model has all cross project refs updated
assert x_proj_ref_1 in child_sql
assert x_proj_ref_2 in child_sql
assert x_proj_ref_3 in child_sql
assert x_proj_ref_4 in child_sql
assert x_proj_ref_5 in child_sql

teardown_test_project(dest_project_path)