Skip to content

Commit

Permalink
Merge pull request #104 from dbt-labs/connect-command
Browse files Browse the repository at this point in the history
Feature: implement the `dbt-meshify connect` command
  • Loading branch information
dave-connors-3 authored Aug 8, 2023
2 parents 97f43bb + d29b431 commit d66e4fc
Show file tree
Hide file tree
Showing 10 changed files with 606 additions and 138 deletions.
26 changes: 26 additions & 0 deletions dbt_meshify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,32 @@
help="The path to the dbt project to operate on. Defaults to the current directory.",
)

# --project-paths: explicit set of dbt project paths to connect.
# Declared with multiple=True and the MultiOption class, so several values
# can be supplied for the one flag.
project_paths = click.option(
    "--project-paths",
    help="The paths to the set of dbt projects to connect. Must supply 2+ paths.",
    default=None,
    type=tuple,
    multiple=True,
    cls=MultiOption,
)

# --projects-dir: a single directory that holds the dbt projects.
# click.Path(exists=True) makes click reject a path that does not exist.
projects_dir = click.option(
    "--projects-dir",
    help="The path to a directory containing multiple dbt projects. Directory must contain 2+ projects.",
    default=None,
    type=click.Path(exists=True),
)

# --exclude-projects / -e: projects to leave out when --projects-dir is used
# (per the help text below). Same multi-value declaration as --project-paths.
exclude_projects = click.option(
    "--exclude-projects",
    "-e",
    help="The set of dbt projects to exclude from the operation when using the --projects-dir option.",
    default=None,
    type=tuple,
    multiple=True,
    cls=MultiOption,
)

create_path = click.option(
"--create-path",
type=click.Path(exists=False),
Expand Down
190 changes: 173 additions & 17 deletions dbt_meshify/linker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from dataclasses import dataclass
from enum import Enum
from typing import Set
from typing import Set, Union

from dbt_meshify.dbt_projects import BaseDbtProject
from dbt.node_types import AccessType
from loguru import logger

from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject
from dbt_meshify.exceptions import FatalMeshifyException, FileEditorException
from dbt_meshify.storage.dbt_project_editors import DbtProjectEditor, YMLOperationType
from dbt_meshify.storage.file_content_editors import DbtMeshConstructor


class ProjectDependencyType(str, Enum):
Expand All @@ -16,12 +22,14 @@ class ProjectDependencyType(str, Enum):
class ProjectDependency:
"""ProjectDependencies define shared resources between two different projects"""

upstream: str
downstream: str
upstream_resource: str
upstream_project_name: str
downstream_resource: str
downstream_project_name: str
type: ProjectDependencyType

def __key(self):
return self.upstream, self.downstream, self.type
return self.upstream_resource, self.downstream_resource, self.type

def __hash__(self):
return hash(self.__key())
Expand All @@ -45,12 +53,14 @@ def _find_relation_dependencies(
source_relations: Set[str], target_relations: Set[str]
) -> Set[str]:
"""
Identify dependencies between projects using shared relations.
Identify dependencies between projects using shared relation names.
"""
return source_relations.intersection(target_relations)

def _source_dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""
Identify source-hack dependencies between projects.
Expand All @@ -74,8 +84,10 @@ def _source_dependencies(

forward_dependencies = {
ProjectDependency(
upstream=project.model_relation_names[relation],
downstream=other_project.source_relation_names[relation],
upstream_resource=project.model_relation_names[relation],
upstream_project_name=project.name,
downstream_resource=other_project.source_relation_names[relation],
downstream_project_name=other_project.name,
type=ProjectDependencyType.Source,
)
for relation in relations
Expand All @@ -96,8 +108,10 @@ def _source_dependencies(

backward_dependencies = {
ProjectDependency(
upstream=other_project.model_relation_names[relation],
downstream=project.source_relation_names[relation],
upstream_resource=other_project.model_relation_names[relation],
upstream_project_name=other_project.name,
downstream_resource=project.source_relation_names[relation],
downstream_project_name=project.name,
type=ProjectDependencyType.Source,
)
for relation in backwards_relations
Expand All @@ -106,7 +120,9 @@ def _source_dependencies(
return forward_dependencies | backward_dependencies

def _package_dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""
Identify package-imported dependencies between projects.
Expand All @@ -115,9 +131,13 @@ def _package_dependencies(
project (Project B) imports Project A and references the model.
"""

if project.project_id not in other_project.installed_packages():
if (
project.project_id not in other_project.installed_packages()
and other_project.project_id not in project.installed_packages()
):
return set()

# find which models are in both manifests
relations = self._find_relation_dependencies(
source_relations={
model.relation_name
Expand All @@ -131,17 +151,54 @@ def _package_dependencies(
},
)

return {
# find the children of the shared models in the downstream project
package_children = [
{
'upstream_resource': project.model_relation_names[relation],
'downstream_resource': child,
}
for relation in relations
for child in other_project.manifest.child_map[project.model_relation_names[relation]]
]

forward_dependencies = {
ProjectDependency(
upstream=project.model_relation_names[relation],
downstream=other_project.model_relation_names[relation],
upstream_resource=child['upstream_resource'],
upstream_project_name=project.name,
downstream_resource=child['downstream_resource'],
downstream_project_name=other_project.name,
type=ProjectDependencyType.Package,
)
for child in package_children
}

# reverse direction: find the children, in `project`, of the shared models owned by `other_project`
backward_package_children = [
{
'upstream_resource': other_project.model_relation_names[relation],
'downstream_resource': child,
}
for relation in relations
for child in project.manifest.child_map[other_project.model_relation_names[relation]]
]

backward_dependencies = {
ProjectDependency(
upstream_resource=child['upstream_resource'],
upstream_project_name=other_project.name,
downstream_resource=child['downstream_resource'],
downstream_project_name=project.name,
type=ProjectDependencyType.Package,
)
for child in backward_package_children
}

return forward_dependencies | backward_dependencies

def dependencies(
self, project: BaseDbtProject, other_project: BaseDbtProject
self,
project: Union[BaseDbtProject, DbtProject],
other_project: Union[BaseDbtProject, DbtProject],
) -> Set[ProjectDependency]:
"""Detect dependencies between two projects and return a list of resources shared."""

Expand All @@ -156,3 +213,102 @@ def dependencies(
dependencies.update(package_dependencies)

return dependencies

def resolve_dependency(
    self,
    dependency: ProjectDependency,
    upstream_project: DbtProject,
    downstream_project: DbtProject,
):
    """
    Resolve one cross-project dependency by editing both projects.

    Steps, in order:

    1. Mark the upstream resource as ``AccessType.Public`` and add a model
       contract to it (done for every dependency type).
    2. For ``ProjectDependencyType.Source`` dependencies: for every child of
       the downstream source in the downstream manifest's ``child_map``,
       replace the source call with a ref to the upstream model, then delete
       the downstream source's YML entry.
    3. For ``ProjectDependencyType.Package`` dependencies: update the model
       refs of the downstream resource so they name the upstream project.
    4. Add the upstream project to the downstream project's dependencies.yml.

    Args:
        dependency: The dependency to resolve.
        upstream_project: Project that owns ``dependency.upstream_resource``.
        downstream_project: Project that owns ``dependency.downstream_resource``.

    Raises:
        ValueError: If the upstream resource has no manifest node in
            ``upstream_project``.
        FatalMeshifyException: Logged and re-raised when updating access,
            adding the contract, or deleting the source entry fails.
        FileEditorException: Logged and re-raised when rewriting sources or
            refs, or updating dependencies.yml, fails.
    """
    upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource)
    if not upstream_manifest_entry:
        raise ValueError(
            f"Could not find upstream resource {dependency.upstream_resource} in project {upstream_project.name}"
        )
    # NOTE(review): unlike the upstream entry, the downstream entry is not
    # checked for a missing node before being handed to the constructor --
    # confirm get_manifest_node always resolves it.
    downstream_manifest_entry = downstream_project.get_manifest_node(
        dependency.downstream_resource
    )
    upstream_catalog_entry = upstream_project.get_catalog_entry(dependency.upstream_resource)
    upstream_mesh_constructor = DbtMeshConstructor(
        project_path=upstream_project.path,
        node=upstream_manifest_entry,  # type: ignore
        catalog=upstream_catalog_entry,
    )

    downstream_mesh_constructor = DbtMeshConstructor(
        project_path=downstream_project.path,
        node=downstream_manifest_entry,  # type: ignore
        catalog=None,
    )
    downstream_editor = DbtProjectEditor(downstream_project)

    # for either dependency type, add contracts and make model public
    try:
        upstream_mesh_constructor.add_model_access(AccessType.Public)
        logger.success(
            f"Successfully update model access : {dependency.upstream_resource} is now {AccessType.Public}"
        )
    except FatalMeshifyException:
        logger.error(f"Failed to update model access: {dependency.upstream_resource}")
        # bare raise re-raises the active exception with its traceback intact
        raise
    try:
        upstream_mesh_constructor.add_model_contract()
        logger.success(f"Successfully added contract to model: {dependency.upstream_resource}")
    except FatalMeshifyException:
        logger.error(f"Failed to add contract to model: {dependency.upstream_resource}")
        raise

    if dependency.type == ProjectDependencyType.Source:
        # Rewrite each child of the downstream source so it refs the
        # upstream model directly.
        for child in downstream_project.manifest.child_map[dependency.downstream_resource]:
            constructor = DbtMeshConstructor(
                project_path=downstream_project.path,
                node=downstream_project.get_manifest_node(child),  # type: ignore
                catalog=None,
            )
            try:
                constructor.replace_source_with_refs(
                    source_unique_id=dependency.downstream_resource,
                    model_unique_id=dependency.upstream_resource,
                )
                logger.success(
                    f"Successfully replaced source function with ref to upstream resource: {dependency.downstream_resource} now calls {dependency.upstream_resource} directly"
                )
            except FileEditorException:
                logger.error(
                    "Failed to replace source function with ref to upstream resource"
                )
                raise
        # Once the children no longer use it, remove the source definition.
        try:
            downstream_editor.update_resource_yml_entry(
                downstream_mesh_constructor, operation_type=YMLOperationType.Delete
            )
            logger.success(
                f"Successfully deleted unnecessary source: {dependency.downstream_resource}"
            )
        except FatalMeshifyException:
            logger.error("Failed to delete unnecessary source")
            raise

    if dependency.type == ProjectDependencyType.Package:
        try:
            downstream_mesh_constructor.update_model_refs(
                model_name=upstream_manifest_entry.name,
                project_name=dependency.upstream_project_name,
            )
            logger.success(
                f"Successfully updated model refs: {dependency.downstream_resource} now references {dependency.upstream_resource}"
            )
        except FileEditorException:
            logger.error("Failed to update model refs")
            raise

    # for both types, add upstream project to downstream project's dependencies.yml
    # NOTE(review): the entry is written with upstream_project.name while the
    # log messages use dependency.upstream_project_name -- presumably equal;
    # verify against callers.
    try:
        downstream_editor.update_dependencies_yml(name=upstream_project.name)
        logger.success(
            f"Successfully added {dependency.upstream_project_name} to {dependency.downstream_project_name}'s dependencies.yml"
        )
    except FileEditorException:
        logger.error(
            f"Failed to add {dependency.upstream_project_name} to {dependency.downstream_project_name}'s dependencies.yml"
        )
        raise
Loading

0 comments on commit d66e4fc

Please sign in to comment.