# main.py
# Forked from JetBrains-Research/python-change-miner.
import ast
import os
import pickle
import sys
import stackimpact
import datetime
import argparse
import multiprocessing
from log import logger
from patterns import Miner
from patterns.models import Fragment, Pattern
from vcs.traverse import GitAnalyzer, RepoInfo, Method
import pyflowgraph
import changegraph
import settings
class RunModes:
    """String identifiers of the run modes accepted as the ``mode`` CLI argument."""

    # Build a flow graph image from a single source file.
    BUILD_PY_FLOW_GRAPH = 'pfg'
    # Build a change graph image from a "before" and an "after" source file.
    BUILD_CHANGE_GRAPH = 'cg'
    # Run GitAnalyzer to build change graphs.
    COLLECT_CHANGE_GRAPHS = 'collect-cgs'
    # Mine patterns from change graphs.
    MINE_PATTERNS = 'patterns'

    # Every supported mode; interpolated into the CLI help text.
    ALL = [BUILD_PY_FLOW_GRAPH, BUILD_CHANGE_GRAPH, COLLECT_CHANGE_GRAPHS, MINE_PATTERNS]
def _build_change_graphs_from_files(src_paths, dest_paths):
    """Build one change graph per (before, after) pair of source files.

    Each file is read and parsed, and its first top-level statement is wrapped
    in a ``Method``; the pair is attached to a ``RepoInfo`` filled with mock
    repository metadata.

    :param src_paths: paths to the source files before changes
    :param dest_paths: paths to the source files after changes, same length
    :return: list of change graphs, in the order of the given pairs
    """
    change_graphs = []
    for old_path, new_path in zip(src_paths, dest_paths):
        methods = []
        for path in (old_path, new_path):
            # Read-only access is sufficient; 'r+' would fail on read-only files.
            with open(path, 'r') as f:
                src = f.read()
            methods.append(Method(path, 'test_name', ast.parse(src, mode='exec').body[0], src))

        mock_commit_dtm = datetime.datetime.now(tz=datetime.timezone.utc)
        repo_info = RepoInfo(
            'mock repo path', 'mock repo name', 'mock repo url', 'mock hash', mock_commit_dtm,
            'mock old file path', 'mock new file path', methods[0], methods[1])

        cg = changegraph.build_from_files(old_path, new_path, repo_info=repo_info)
        change_graphs.append(cg)
    return change_graphs


def _load_stored_change_graphs(storage_dir):
    """Load all change graphs pickled in the files of ``storage_dir``.

    Every file is expected to hold a pickled iterable whose items are
    themselves pickled graphs. Files that cannot be loaded are reported and
    skipped; graphs already loaded from such a file are kept.

    :param storage_dir: directory containing the pickled change graph files
    :return: list of unpickled change graphs
    """
    file_names = os.listdir(storage_dir)
    logger.warning(f'Found {len(file_names)} files in storage directory')

    change_graphs = []
    for file_num, file_name in enumerate(file_names):
        file_path = os.path.join(storage_dir, file_name)
        try:
            # NOTE(security): unpickling can execute arbitrary code, so the
            # storage directory must only contain trusted files.
            with open(file_path, 'rb') as f:
                graphs = pickle.load(f)
            for graph in graphs:
                change_graphs.append(pickle.loads(graph))
        except Exception:
            # Deliberately not a bare `except`: KeyboardInterrupt/SystemExit
            # must still be able to stop the loading loop.
            logger.warning(f'Incorrect file {file_path}')

        if file_num % 1000 == 0:
            logger.warning(f'Loaded [{1+file_num}/{len(file_names)}] files')
    return change_graphs


def main():
    """Command-line entry point: parse the run mode and execute it.

    The positional ``mode`` argument is parsed first; mode-specific options are
    then registered on the same parser and the command line is parsed again.

    :raises ValueError: if the mode is unknown, or if in ``patterns`` mode
        ``--src``/``--dest`` are unset or differ in length while any of
        ``--src``, ``--dest`` or ``--fake-mining`` was given
    """
    logger.info('------------------------------ Starting ------------------------------')

    if settings.get('use_stackimpact', required=False):
        _ = stackimpact.start(
            agent_key=settings.get('stackimpact_agent_key'),
            app_name='CodeChangesMiner',
            debug=True,
            app_version=str(datetime.datetime.now())
        )

    # NOTE(review): this effectively disables Python's recursion guard, so
    # runaway recursion will overflow the C stack instead of raising
    # RecursionError. Kept as-is because it is existing behavior.
    sys.setrecursionlimit(2**31-1)
    multiprocessing.set_start_method('spawn', force=True)

    parser = argparse.ArgumentParser()
    parser.add_argument('mode', help=f'One of {RunModes.ALL}', type=str)
    # Only the mode is known at this point; the remaining options depend on it.
    args, _ = parser.parse_known_args()

    current_mode = args.mode

    if current_mode == RunModes.BUILD_PY_FLOW_GRAPH:
        parser.add_argument('-i', '--input', help='Path to source code file', type=str, required=True)
        parser.add_argument('-o', '--output', help='Path to output file', type=str, default='pyflowgraph.dot')
        parser.add_argument('--no-closure', action='store_true')
        parser.add_argument('--show-deps', action='store_true')
        parser.add_argument('--hide-op-kinds', action='store_true')
        parser.add_argument('--show-data-keys', action='store_true')
        args = parser.parse_args()

        fg = pyflowgraph.build_from_file(
            args.input, show_dependencies=args.show_deps, build_closure=not args.no_closure)
        pyflowgraph.export_graph_image(
            fg, args.output, show_op_kinds=not args.hide_op_kinds, show_data_keys=args.show_data_keys)

    elif current_mode == RunModes.BUILD_CHANGE_GRAPH:
        parser.add_argument('-s', '--src', help='Path to source code before changes', type=str, required=True)
        parser.add_argument('-d', '--dest', help='Path to source code after changes', type=str, required=True)
        parser.add_argument('-o', '--output', help='Path to output file', type=str, default='changegraph.dot')
        args = parser.parse_args()

        fg = changegraph.build_from_files(args.src, args.dest)
        changegraph.export_graph_image(fg, args.output)

    elif current_mode == RunModes.COLLECT_CHANGE_GRAPHS:
        GitAnalyzer().build_change_graphs()

    elif current_mode == RunModes.MINE_PATTERNS:
        parser.add_argument('-s', '--src', help='Path to source code before changes', type=str, nargs='+')
        parser.add_argument('-d', '--dest', help='Path to source code after changes', type=str, nargs='+')
        parser.add_argument('--fake-mining', action='store_true')
        args = parser.parse_args()

        if args.src or args.dest or args.fake_mining:
            # Both lists must be present before their lengths are compared;
            # otherwise a missing --dest would surface as a TypeError from len(None).
            if not args.src or not args.dest or len(args.src) != len(args.dest):
                raise ValueError('src and dest have different size or unset')

            change_graphs = _build_change_graphs_from_files(args.src, args.dest)

            miner = Miner()
            if args.fake_mining:
                # Skip real mining: register every change graph as its own pattern.
                for cg in change_graphs:
                    fragment = Fragment()
                    fragment.graph = cg
                    fragment.nodes = cg.nodes
                    pattern = Pattern([fragment])
                    miner.add_pattern(pattern)
            else:
                miner.mine_patterns(change_graphs)
            miner.print_patterns()
        else:
            storage_dir = settings.get('change_graphs_storage_dir')
            change_graphs = _load_stored_change_graphs(storage_dir)

            logger.warning('Pattern mining has started')

            miner = Miner()
            try:
                miner.mine_patterns(change_graphs)
            except KeyboardInterrupt:
                # Fall through so that the patterns mined so far are still printed.
                logger.warning('KeyboardInterrupt: mined patterns will be stored before exit')

            miner.print_patterns()

    else:
        raise ValueError(f'Unknown mode {current_mode!r}, expected one of {RunModes.ALL}')
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()