Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: implement the dbt-meshify connect command #104

Merged
merged 19 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions dbt_meshify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,32 @@
help="The path to the dbt project to operate on. Defaults to the current directory.",
)

# CLI flag for naming the dbt projects to connect explicitly; per the help
# text, callers are expected to pass at least two paths.
# NOTE(review): MultiOption is declared outside this view -- presumably it lets
# one flag occurrence accept several values; confirm against its definition.
project_paths = click.option(
    "--project-paths",
    help="The paths to the set of dbt projects to connect. Must supply 2+ paths.",
    default=None,
    type=tuple,
    multiple=True,
    cls=MultiOption,
)

# CLI flag pointing at a single directory that contains several dbt projects.
# click.Path(exists=True) makes click reject a path that does not exist.
projects_dir = click.option(
    "--projects-dir",
    help="The path to a directory containing multiple dbt projects. Directory must contain 2+ projects.",
    default=None,
    type=click.Path(exists=True),
)

# CLI flag (long form --exclude-projects, short form -e) naming projects to
# leave out when --projects-dir is used, as described by the help text.
# NOTE(review): MultiOption is declared outside this view -- presumably it lets
# one flag occurrence accept several values; confirm against its definition.
exclude_projects = click.option(
    "--exclude-projects",
    "-e",
    help="The set of dbt projects to exclude from the operation when using the --projects-dir option.",
    default=None,
    type=tuple,
    multiple=True,
    cls=MultiOption,
)

create_path = click.option(
"--create-path",
type=click.Path(exists=False),
Expand Down
206 changes: 188 additions & 18 deletions dbt_meshify/linker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from dataclasses import dataclass
from enum import Enum
from typing import Set
from typing import Set, Union

from dbt_meshify.dbt_projects import BaseDbtProject
from dbt.node_types import AccessType
from loguru import logger

from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject
from dbt_meshify.exceptions import FatalMeshifyException, FileEditorException
from dbt_meshify.storage.dbt_project_editors import DbtProjectEditor, YMLOperationType
from dbt_meshify.storage.file_content_editors import DbtMeshConstructor


class ProjectDependencyType(str, Enum):
Expand All @@ -16,12 +22,14 @@ class ProjectDependencyType(str, Enum):
class ProjectDependency:
"""ProjectDependencies define shared resources between two different projects"""

upstream: str
downstream: str
upstream_resource: str
upstream_project_name: str
downstream_resource: str
downstream_project_name: str
type: ProjectDependencyType

def __key(self):
return self.upstream, self.downstream, self.type
return self.upstream_resource, self.downstream_resource, self.type

def __hash__(self):
return hash(self.__key())
Expand All @@ -45,12 +53,14 @@ def _find_relation_dependencies(
source_relations: Set[str], target_relations: Set[str]
) -> Set[str]:
"""
Identify dependencies between projects using shared relations.
Identify dependencies between projects using shared relation names.
"""
return source_relations.intersection(target_relations)

def _source_dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""
Identify source-hack dependencies between projects.
Expand All @@ -74,8 +84,10 @@ def _source_dependencies(

forward_dependencies = {
ProjectDependency(
upstream=project.model_relation_names[relation],
downstream=other_project.source_relation_names[relation],
upstream_resource=project.model_relation_names[relation],
upstream_project_name=project.name,
downstream_resource=other_project.source_relation_names[relation],
downstream_project_name=other_project.name,
type=ProjectDependencyType.Source,
)
for relation in relations
Expand All @@ -96,8 +108,10 @@ def _source_dependencies(

backward_dependencies = {
ProjectDependency(
upstream=other_project.model_relation_names[relation],
downstream=project.source_relation_names[relation],
upstream_resource=other_project.model_relation_names[relation],
upstream_project_name=other_project.name,
downstream_resource=project.source_relation_names[relation],
downstream_project_name=project.name,
type=ProjectDependencyType.Source,
)
for relation in backwards_relations
Expand All @@ -106,7 +120,9 @@ def _source_dependencies(
return forward_dependencies | backward_dependencies

def _package_dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""
Identify package-imported dependencies between projects.
Expand All @@ -115,9 +131,13 @@ def _package_dependencies(
project (Project B) imports Project A and references the model.
"""

if project.project_id not in other_project.installed_packages():
if (
project.project_id not in other_project.installed_packages()
and other_project.project_id not in project.installed_packages()
):
return set()

# find which models are in both manifests
relations = self._find_relation_dependencies(
source_relations={
model.relation_name
Expand All @@ -131,17 +151,68 @@ def _package_dependencies(
},
)

return {
# find the children of the shared models in the downstream project
package_children = [
{
'upstream_resource': project.model_relation_names[relation],
'downstream_resource': child,
}
for relation in relations
for child in other_project.manifest.child_map[project.model_relation_names[relation]]
]

forward_dependencies = {
ProjectDependency(
upstream=project.model_relation_names[relation],
downstream=other_project.model_relation_names[relation],
upstream_resource=child['upstream_resource'],
upstream_project_name=project.name,
downstream_resource=child['downstream_resource'],
downstream_project_name=other_project.name,
type=ProjectDependencyType.Package,
)
for relation in relations
for child in package_children
}

# find which models are in both manifests
backward_relations = self._find_relation_dependencies(
source_relations={
model.relation_name
for model in other_project.models.values()
if model.relation_name is not None
},
target_relations={
model.relation_name
for model in project.models.values()
if model.relation_name is not None
},
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not confident that we need this backward_relations calculation. Given that we're working with models for both the sources and the targets, and given that set intersection is commutative (A ∩ B = B ∩ A), this should yield the same results as the relations call on line 141.

Is my understanding accurate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! That's absolutely correct -- we can perform the reverse operation on the same set of relations already computed in `relations`.


# find the children of the shared models in the downstream project
backward_package_children = [
{
'upstream_resource': other_project.model_relation_names[relation],
'downstream_resource': child,
}
for relation in backward_relations
for child in project.manifest.child_map[other_project.model_relation_names[relation]]
]

backward_dependencies = {
ProjectDependency(
upstream_resource=child['upstream_resource'],
upstream_project_name=other_project.name,
downstream_resource=child['downstream_resource'],
downstream_project_name=project.name,
type=ProjectDependencyType.Package,
)
for child in backward_package_children
}

return forward_dependencies | backward_dependencies

def dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""Detect dependencies between two projects and return a list of resources shared."""

Expand All @@ -156,3 +227,102 @@ def dependencies(
dependencies.update(package_dependencies)

return dependencies

def resolve_dependency(
self,
dependency: ProjectDependency,
upstream_project: DbtProject,
downstream_project: DbtProject,
):
Comment on lines +217 to +222
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dave-connors-3: We previously discussed the appropriate place for this type of logic, and after some thinking I believe that this approach is the right one. Specifically, having the Linker class present a resolve_dependency method that uses ProjectDependency objects to identify file updates. In a ChangeSet world, this method will return a ChangeSet that describes what file operations need to be made for each Ref update, which is beautiful to work with. ✅

upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource)
if not upstream_manifest_entry:
raise ValueError(
f"Could not find upstream resource {dependency.upstream_resource} in project {upstream_project.name}"
)
downstream_manifest_entry = downstream_project.get_manifest_node(
dependency.downstream_resource
)
upstream_catalog_entry = upstream_project.get_catalog_entry(dependency.upstream_resource)
upstream_mesh_constructor = DbtMeshConstructor(
project_path=upstream_project.path,
node=upstream_manifest_entry, # type: ignore
catalog=upstream_catalog_entry,
)
# upstream_editor = DbtProjectEditor(upstream_project)
dave-connors-3 marked this conversation as resolved.
Show resolved Hide resolved
downstream_mesh_constructor = DbtMeshConstructor(
project_path=downstream_project.path,
node=downstream_manifest_entry, # type: ignore
catalog=None,
)
downstream_editor = DbtProjectEditor(downstream_project)
# for either dependency type, add contracts and make model public
try:
upstream_mesh_constructor.add_model_access(AccessType.Public)
logger.success(
f"Successfully update model access : {dependency.upstream_resource} is now {AccessType.Public}"
)
except FatalMeshifyException as e:
logger.error(f"Failed to update model access: {dependency.upstream_resource}")
raise e
try:
upstream_mesh_constructor.add_model_contract()
logger.success(f"Successfully added contract to model: {dependency.upstream_resource}")
except FatalMeshifyException as e:
logger.error(f"Failed to add contract to model: {dependency.upstream_resource}")
raise e

if dependency.type == ProjectDependencyType.Source:
for child in downstream_project.manifest.child_map[dependency.downstream_resource]:
constructor = DbtMeshConstructor(
project_path=downstream_project.path,
node=downstream_project.get_manifest_node(child), # type: ignore
catalog=None,
)
try:
constructor.replace_source_with_refs(
source_unique_id=dependency.downstream_resource,
model_unique_id=dependency.upstream_resource,
)
logger.success(
f"Successfully replaced source function with ref to upstream resource: {dependency.downstream_resource} now calls {dependency.upstream_resource} directly"
)
except FileEditorException as e:
logger.error(
f"Failed to replace source function with ref to upstream resource"
)
raise e
try:
downstream_editor.update_resource_yml_entry(
downstream_mesh_constructor, operation_type=YMLOperationType.Delete
)
logger.success(
f"Successfully deleted unnecessary source: {dependency.downstream_resource}"
)
except FatalMeshifyException as e:
logger.error(f"Failed to delete unnecessary source")
raise e

if dependency.type == ProjectDependencyType.Package:
try:
downstream_mesh_constructor.update_model_refs(
model_name=upstream_manifest_entry.name,
project_name=dependency.upstream_project_name,
)
logger.success(
f"Successfully updated model refs: {dependency.downstream_resource} now references {dependency.upstream_resource}"
)
except FileEditorException as e:
logger.error(f"Failed to update model refs")
raise e

# for both types, add upstream project to downstream project's dependencies.yml
try:
downstream_editor.update_dependencies_yml(name=upstream_project.name)
logger.success(
f"Successfully added {dependency.upstream_project_name} to {dependency.downstream_project_name}'s dependencies.yml"
)
except FileEditorException as e:
logger.error(
f"Failed to add {dependency.upstream_project_name} to {dependency.downstream_project_name}'s dependencies.yml"
)
raise e
Loading