-
Notifications
You must be signed in to change notification settings - Fork 69
/
fixup_keywords.py.j2
176 lines (148 loc) · 6.11 KB
/
fixup_keywords.py.j2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
{% extends '_base.py.j2' %}
{% block content %}
import argparse
import os
import libcst as cst
import pathlib
import sys
from typing import (Any, Callable, Dict, List, Sequence, Tuple)
def partition(
predicate: Callable[[Any], bool],
iterator: Sequence[Any]
) -> Tuple[List[Any], List[Any]]:
"""A stable, out-of-place partition."""
results = ([], [])
for i in iterator:
results[int(predicate(i))].append(i)
# Returns trueList, falseList
return results[1], results[0]
class {{ api.naming.module_name }}CallTransformer(cst.CSTTransformer):
CTRL_PARAMS: Tuple[str] = ('retry', 'timeout', 'metadata')
{% with all_methods = [] -%}
{% for service in api.services.values() %}{% for method in service.methods.values() -%}
{% do all_methods.append(method) -%}
{% endfor %}{% endfor -%}
METHOD_TO_PARAMS: Dict[str, Tuple[str]] = {
{% for method in all_methods|sort(attribute='name')|unique(attribute='name') -%}
'{{ method.name|snake_case }}': ({% for field in method.legacy_flattened_fields.values() %}'{{ field.name }}', {% endfor %}),
{% endfor -%}
}
{% endwith %}
def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
try:
key = original.func.attr.value
kword_params = self.METHOD_TO_PARAMS[key]
except (AttributeError, KeyError):
# Either not a method from the API or too convoluted to be sure.
return updated
# If the existing code is valid, keyword args come after positional args.
# Therefore, all positional args must map to the first parameters.
args, kwargs = partition(lambda a: not bool(a.keyword), updated.args)
if any(k.keyword.value == "request" for k in kwargs):
# We've already fixed this file, don't fix it again.
return updated
kwargs, ctrl_kwargs = partition(
lambda a: not a.keyword.value in self.CTRL_PARAMS,
kwargs
)
args, ctrl_args = args[:len(kword_params)], args[len(kword_params):]
ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl))
for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS))
request_arg = cst.Arg(
value=cst.Dict([
cst.DictElement(
cst.SimpleString("'{}'".format(name)),
{# Inline comments and formatting are currently stripped out. -#}
{# My current attempts at preverving comments and formatting -#}
{# keep the comments, but the formatting is run through a log -#}
{# chipper, and an extra comma gets added, which causes a -#}
{# parse error. -#}
cst.Element(value=arg.value)
)
# Note: the args + kwargs looks silly, but keep in mind that
# the control parameters had to be stripped out, and that
# those could have been passed positionally or by keyword.
for name, arg in zip(kword_params, args + kwargs)]),
keyword=cst.Name("request")
)
return updated.with_changes(
args=[request_arg] + ctrl_kwargs
)
def fix_files(
in_dir: pathlib.Path,
out_dir: pathlib.Path,
*,
transformer={{ api.naming.module_name }}CallTransformer(),
):
"""Duplicate the input dir to the output dir, fixing file method calls.
Preconditions:
* in_dir is a real directory
* out_dir is a real, empty directory
"""
pyfile_gen = (
pathlib.Path(os.path.join(root, f))
for root, _, files in os.walk(in_dir)
for f in files if os.path.splitext(f)[1] == ".py"
)
for fpath in pyfile_gen:
with open(fpath, 'r') as f:
src = f.read()
# Parse the code and insert method call fixes.
tree = cst.parse_module(src)
updated = tree.visit(transformer)
# Create the path and directory structure for the new file.
updated_path = out_dir.joinpath(fpath.relative_to(in_dir))
updated_path.parent.mkdir(parents=True, exist_ok=True)
# Generate the updated source file at the corresponding path.
with open(updated_path, 'w') as f:
f.write(updated.code)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="""Fix up source that uses the {{ api.naming.module_name }} client library.
The existing sources are NOT overwritten but are copied to output_dir with changes made.
Note: This tool operates at a best-effort level at converting positional
parameters in client method calls to keyword based parameters.
Cases where it WILL FAIL include
A) * or ** expansion in a method call.
B) Calls via function or method alias (includes free function calls)
C) Indirect or dispatched calls (e.g. the method is looked up dynamically)
These all constitute false negatives. The tool will also detect false
positives when an API method shares a name with another method.
""")
parser.add_argument(
'-d',
'--input-directory',
required=True,
dest='input_dir',
help='the input directory to walk for python files to fix up',
)
parser.add_argument(
'-o',
'--output-directory',
required=True,
dest='output_dir',
help='the directory to output files fixed via un-flattening',
)
args = parser.parse_args()
input_dir = pathlib.Path(args.input_dir)
output_dir = pathlib.Path(args.output_dir)
if not input_dir.is_dir():
print(
f"input directory '{input_dir}' does not exist or is not a directory",
file=sys.stderr,
)
sys.exit(-1)
if not output_dir.is_dir():
print(
f"output directory '{output_dir}' does not exist or is not a directory",
file=sys.stderr,
)
sys.exit(-1)
if os.listdir(output_dir):
print(
f"output directory '{output_dir}' is not empty",
file=sys.stderr,
)
sys.exit(-1)
fix_files(input_dir, output_dir)
{% endblock %}