Skip to content

Commit

Permalink
[#4069] Use more standard disabled lookup functions
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Oct 15, 2021
1 parent 4307a82 commit 65dc76c
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Under the hood
- Fix intermittent errors in partial parsing tests ([#4060](https://github.com/dbt-labs/dbt-core/issues/4060), [#4068](https://github.com/dbt-labs/dbt-core/pull/4068))
- Make finding disabled nodes more consistent ([#4069](https://github.com/dbt-labs/dbt-core/issues/4069), [#4073](https://github.com/dbt-labas/dbt-core/pull/4073))

## dbt-core 1.0.0b1 (October 11, 2021)

Expand Down
149 changes: 76 additions & 73 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,23 @@ def perform_lookup(

class SourceLookup(dbtClassMixin):
def __init__(self, manifest: 'Manifest'):
self.storage: Dict[Tuple[str, str], Dict[PackageName, UniqueID]] = {}
self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
self.populate(manifest)

def get_unique_id(self, key, package: Optional[PackageName]):
return find_unique_id_for_package(self.storage, key, package)
def get_unique_id(self, search_name, package: Optional[PackageName]):
return find_unique_id_for_package(self.storage, search_name, package)

def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
unique_id = self.get_unique_id(key, package)
def find(self, search_name, package: Optional[PackageName], manifest: 'Manifest'):
unique_id = self.get_unique_id(search_name, package)
if unique_id is not None:
return self.perform_lookup(unique_id, manifest)
return None

def add_source(self, source: ParsedSourceDefinition):
key = (source.source_name, source.name)
if key not in self.storage:
self.storage[key] = {}
if source.search_name not in self.storage:
self.storage[source.search_name] = {}

self.storage[key][source.package_name] = source.unique_id
self.storage[source.search_name][source.package_name] = source.unique_id

def populate(self, manifest):
for source in manifest.sources.values():
Expand Down Expand Up @@ -169,6 +168,43 @@ def perform_lookup(
return manifest.nodes[unique_id]


# This handles both models/seeds/snapshots and sources
class DisabledLookup(dbtClassMixin):

def __init__(self, manifest: 'Manifest'):
self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
self.populate(manifest)

def populate(self, manifest):
for node in list(chain.from_iterable(manifest.disabled.values())):
self.add_node(node)

def add_node(self, node):
if node.search_name not in self.storage:
self.storage[node.search_name] = {}
if node.package_name not in self.storage[node.search_name]:
self.storage[node.search_name][node.package_name] = []
self.storage[node.search_name][node.package_name].append(node)

# This should return a list of disabled nodes. It's different from
# the other Lookup functions in that it returns full nodes, not just unique_ids
def find(self, search_name, package: Optional[PackageName]):
if search_name not in self.storage:
return None

pkg_dct: Mapping[PackageName, List[Any]] = self.storage[search_name]

if package is None:
if not pkg_dct:
return None
else:
return next(iter(pkg_dct.values()))
elif package in pkg_dct:
return pkg_dct[package]
else:
return None


class AnalysisLookup(RefableLookup):
_lookup_types: ClassVar[set] = set([NodeType.Analysis])

Expand Down Expand Up @@ -378,39 +414,6 @@ def search_name(self) -> str:
raise NotImplementedError('search_name not implemented')


N = TypeVar('N', bound=Searchable)


@dataclass
class DisabledNameSearcher():
name: str
package: Optional[str]
nodetypes: List[NodeType]

def _matches(self, model: N) -> bool:
"""Return True if the model matches the given name, package, and type.
If package is None, any package is allowed.
nodetypes should be a container of NodeTypes that implements the 'in'
operator.
"""
if model.resource_type not in self.nodetypes:
return False

if self.name != model.search_name:
return False

return self.package is None or self.package == model.package_name

def search(self, haystack) -> Optional[N]:
"""Find an entry in the given iterable by name."""
for model_list in haystack.values():
for model in model_list:
if self._matches(model):
return model
return None


D = TypeVar('D')


Expand Down Expand Up @@ -576,6 +579,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_ref_lookup: Optional[RefableLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
_analysis_lookup: Optional[AnalysisLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
Expand Down Expand Up @@ -649,26 +655,12 @@ def build_flat_graph(self):
}
}

def find_disabled_by_name(
self, name: str, package: Optional[str] = None
) -> Optional[ManifestNode]:
searcher: DisabledNameSearcher = DisabledNameSearcher(
name, package, NodeType.refable()
)
result = searcher.search(self.disabled)
return result

def find_disabled_source_by_name(
self, source_name: str, table_name: str, package: Optional[str] = None
) -> Optional[ParsedSourceDefinition]:
search_name = f'{source_name}.{table_name}'
searcher: DisabledNameSearcher = DisabledNameSearcher(
search_name, package, [NodeType.Source]
)
result = searcher.search(self.disabled)
if result is not None:
assert isinstance(result, ParsedSourceDefinition)
return result
def build_disabled_by_file_id(self):
disabled_by_file_id = {}
for node_list in self.disabled.values():
for node in node_list:
disabled_by_file_id[node.file_id] = node
return disabled_by_file_id

def _materialization_candidates_for(
self, project_name: str,
Expand Down Expand Up @@ -818,6 +810,15 @@ def ref_lookup(self) -> RefableLookup:
def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self)

@property
def disabled_lookup(self) -> DisabledLookup:
if self._disabled_lookup is None:
self._disabled_lookup = DisabledLookup(self)
return self._disabled_lookup

def rebuild_disabled_lookup(self):
self._disabled_lookup = DisabledLookup(self)

@property
def analysis_lookup(self) -> AnalysisLookup:
if self._analysis_lookup is None:
Expand All @@ -835,7 +836,7 @@ def resolve_ref(
) -> MaybeNonSource:

node: Optional[ManifestNode] = None
disabled: Optional[ManifestNode] = None
disabled: Optional[List[ManifestNode]] = None

candidates = _search_packages(
current_project, node_package, target_model_package
Expand All @@ -848,12 +849,12 @@ def resolve_ref(

# it's possible that the node is disabled
if disabled is None:
disabled = self.find_disabled_by_name(
disabled = self.disabled_lookup.find(
target_model_name, pkg
)

if disabled is not None:
return Disabled(disabled)
if disabled:
return Disabled(disabled[0])
return None

# Called by dbt.parser.manifest._resolve_sources_for_exposure
Expand All @@ -865,24 +866,24 @@ def resolve_source(
current_project: str,
node_package: str
) -> MaybeParsedSource:
key = (target_source_name, target_table_name)
search_name = f'{target_source_name}.{target_table_name}'
candidates = _search_packages(current_project, node_package)

source: Optional[ParsedSourceDefinition] = None
disabled: Optional[ParsedSourceDefinition] = None
disabled: Optional[List[ParsedSourceDefinition]] = None

for pkg in candidates:
source = self.source_lookup.find(key, pkg, self)
source = self.source_lookup.find(search_name, pkg, self)
if source is not None and source.config.enabled:
return source

if disabled is None:
disabled = self.find_disabled_source_by_name(
target_source_name, target_table_name, pkg
disabled = self.disabled_lookup.find(
f'{target_source_name}.{target_table_name}', pkg
)

if disabled is not None:
return Disabled(disabled)
if disabled:
return Disabled(disabled[0])
return None

# Called by DocsRuntimeContext.doc
Expand Down Expand Up @@ -1050,6 +1051,8 @@ def __reduce_ex__(self, protocol):
self._doc_lookup,
self._source_lookup,
self._ref_lookup,
self._disabled_lookup,
self._analysis_lookup,
)
return self.__class__, args

Expand Down
8 changes: 0 additions & 8 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,6 @@ def source_target_not_found(
raise_compiler_error(msg, model)


def ref_disabled_dependency(model, target_model):
raise_compiler_error(
"Model '{}' depends on model '{}' which is disabled in "
"the project config".format(model.unique_id,
target_model.unique_id),
model)


def dependency_not_found(model, target_model_name):
raise_compiler_error(
"'{}' depends on '{}' which is not in the graph!"
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def load(self):
# aren't in place yet
self.manifest.rebuild_ref_lookup()
self.manifest.rebuild_doc_lookup()
self.manifest.rebuild_disabled_lookup()

# Load yaml files
parser_types = [SchemaParser]
Expand All @@ -343,6 +344,8 @@ def load(self):
self._perf_info.patch_sources_elapsed = (
time.perf_counter() - start_patch
)
# We need to rebuild disabled in order to include disabled sources
self.manifest.rebuild_disabled_lookup()

# copy the selectors from the root_project to the manifest
self.manifest.selectors = self.root_project.manifest_selectors
Expand Down
10 changes: 8 additions & 2 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self, saved_manifest: Manifest, new_files: MutableMapping[str, AnyS
self.build_file_diff()
self.processing_file = None
self.deleted_special_override_macro = False
self.disabled_by_file_id = self.saved_manifest.build_disabled_by_file_id()

def skip_parsing(self):
return (
Expand Down Expand Up @@ -267,13 +268,18 @@ def remove_node_in_saved(self, source_file, unique_id):
# delete node in saved
node = self.saved_manifest.nodes.pop(unique_id)
self.deleted_manifest.nodes[unique_id] = node
elif unique_id in self.saved_manifest.disabled:
elif (source_file.file_id in self.disabled_by_file_id and
unique_id in self.saved_manifest.disabled):
# This node is disabled. Find the node and remove it from disabled dictionary.
for dis_index, dis_node in enumerate(self.saved_manifest.disabled[unique_id]):
if dis_node.file_id == source_file.file_id:
node = dis_node
break
if dis_node:
del self.saved_manifest.disabled[dis_index]
# Remove node from disabled and unique_id from disabled dict if necessary
del self.saved_manifest.disabled[unique_id][dis_index]
if not self.saved_manifest.disabled[unique_id]:
self.saved_manifest.disabled.pop(unique_id)
else:
# Has already been deleted by another action
return
Expand Down
8 changes: 4 additions & 4 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,11 +827,11 @@ def parse_patch(
f'file {source_file.path.original_file_path}'
)
if unique_id is None:
# Node might be disabled. Following call returns first matching node encountered.
found_node = self.manifest.find_disabled_by_name(patch.package_name, patch.name)
if found_node:
# Node might be disabled. Following call returns list of matching disabled nodes
found_nodes = self.manifest.disabled_lookup.find(patch.name, patch.package_name)
if found_nodes:
# There might be multiple disabled nodes for this model
for node in self.manifest.disabled[found_node.unique_id]:
for node in found_nodes:
# We're saving the patch_path because we need to schedule
# re-application of the patch in partial parsing.
node.patch_path = source_file.file_id
Expand Down

0 comments on commit 65dc76c

Please sign in to comment.