-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
macros.py
110 lines (92 loc) · 3.81 KB
/
macros.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from typing import Iterable, List
import jinja2
from dbt.clients import jinja
from dbt.contracts.graph.unparsed import UnparsedMacro
from dbt.contracts.graph.parsed import ParsedMacro
from dbt.contracts.files import FilePath, SourceFile
from dbt.exceptions import ParsingException
from dbt.events.functions import fire_event
from dbt.events.types import MacroFileParse
from dbt.node_types import NodeType
from dbt.parser.base import BaseParser
from dbt.parser.search import FileBlock, filesystem_search
from dbt.utils import MACRO_PREFIX
class MacroParser(BaseParser[ParsedMacro]):
# This is only used when creating a MacroManifest separate
# from the normal parsing flow.
def get_paths(self) -> List[FilePath]:
return filesystem_search(
project=self.project, relative_dirs=self.project.macro_paths, extension=".sql"
)
@property
def resource_type(self) -> NodeType:
return NodeType.Macro
@classmethod
def get_compiled_path(cls, block: FileBlock):
return block.path.relative_path
def parse_macro(
self, block: jinja.BlockTag, base_node: UnparsedMacro, name: str
) -> ParsedMacro:
unique_id = self.generate_unique_id(name)
return ParsedMacro(
path=base_node.path,
macro_sql=block.full_block,
original_file_path=base_node.original_file_path,
package_name=base_node.package_name,
resource_type=base_node.resource_type,
name=name,
unique_id=unique_id,
)
def parse_unparsed_macros(self, base_node: UnparsedMacro) -> Iterable[ParsedMacro]:
try:
blocks: List[jinja.BlockTag] = [
t
for t in jinja.extract_toplevel_blocks(
base_node.raw_code,
allowed_blocks={"macro", "materialization", "test"},
collect_raw_data=False,
)
if isinstance(t, jinja.BlockTag)
]
except ParsingException as exc:
exc.add_node(base_node)
raise
for block in blocks:
try:
ast = jinja.parse(block.full_block)
except ParsingException as e:
e.add_node(base_node)
raise
macro_nodes = list(ast.find_all(jinja2.nodes.Macro))
if len(macro_nodes) != 1:
# things have gone disastrously wrong, we thought we only
# parsed one block!
raise ParsingException(
f"Found multiple macros in {block.full_block}, expected 1", node=base_node
)
macro = macro_nodes[0]
if not macro.name.startswith(MACRO_PREFIX):
continue
name: str = macro.name.replace(MACRO_PREFIX, "")
node = self.parse_macro(block, base_node, name)
# get supported_languages for materialization macro
if "materialization" in name:
node.supported_languages = jinja.get_supported_languages(macro)
yield node
def parse_file(self, block: FileBlock):
assert isinstance(block.file, SourceFile)
source_file = block.file
assert isinstance(source_file.contents, str)
original_file_path = source_file.path.original_file_path
fire_event(MacroFileParse(path=original_file_path))
# this is really only used for error messages
base_node = UnparsedMacro(
path=original_file_path,
original_file_path=original_file_path,
package_name=self.project.project_name,
raw_code=source_file.contents,
resource_type=NodeType.Macro,
language="sql",
)
for node in self.parse_unparsed_macros(base_node):
self.manifest.add_macro(block.file, node)