-
Notifications
You must be signed in to change notification settings - Fork 47
/
utils.py
278 lines (224 loc) · 9.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import codecs
import os
import platform
import re
import shutil
import socket
import sys
import threading
import time
from collections.abc import Callable, Iterator
from concurrent.futures import ThreadPoolExecutor
from os import environ
from pathlib import Path
from shutil import which
from typing import Any
import click
import dotenv
from algokit.core import proc
CLEAR_LINE = "\033[K"
SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
# From _WIN_DEFAULT_PATHEXT from shutils
WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
def extract_version_triple(version_str: str) -> str:
match = re.search(r"\d+\.\d+\.\d+", version_str)
if not match:
raise ValueError("Unable to parse version number")
return match.group()
def is_minimum_version(system_version: str, minimum_version: str) -> bool:
system_version_as_tuple = tuple(map(int, system_version.split(".")))
minimum_version_as_tuple = tuple(map(int, minimum_version.split(".")))
return system_version_as_tuple >= minimum_version_as_tuple
def is_network_available(host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0) -> bool:
"""
Check if internet is available by trying to establish a socket connection.
"""
try:
socket.setdefaulttimeout(timeout)
with socket.create_connection((host, port), timeout=timeout):
return True
except OSError:
return False
def animate(name: str, stop_event: threading.Event) -> None:
spinner = {
"interval": 100,
"frames": SPINNER_FRAMES,
}
while not stop_event.is_set():
for frame in spinner["frames"]: # type: ignore # noqa: PGH003
if stop_event.is_set():
break
try:
text = codecs.decode(frame, "utf-8")
except Exception:
text = frame
output = f"\r{text} {name}"
sys.stdout.buffer.write(output.encode("utf-8"))
sys.stdout.buffer.write(CLEAR_LINE.encode("utf-8"))
sys.stdout.buffer.flush()
time.sleep(0.001 * spinner["interval"]) # type: ignore # noqa: PGH003
sys.stdout.buffer.write(b"\r")
sys.stdout.buffer.write(b"\n")
sys.stdout.buffer.flush()
def run_with_animation(
target_function: Callable[..., Any], animation_text: str = "Loading", *args: Any, **kwargs: Any
) -> Any: # noqa: ANN401
with ThreadPoolExecutor(max_workers=2) as executor:
stop_event = threading.Event()
animation_future = executor.submit(animate, animation_text, stop_event)
function_future = executor.submit(target_function, *args, **kwargs)
try:
result = function_future.result()
except Exception as e:
stop_event.set()
animation_future.result()
raise e
else:
stop_event.set()
animation_future.result()
return result
def find_valid_pipx_command(error_message: str) -> list[str]:
for pipx_command in get_candidate_pipx_commands():
try:
pipx_version_result = proc.run([*pipx_command, "--version"])
except OSError:
pass # in case of path/permission issues, go to next candidate
else:
if pipx_version_result.exit_code == 0:
return pipx_command
# If pipx isn't found in global path or python -m pipx then bail out
# this is an exceptional circumstance since pipx should always be present with algokit
# since it's installed with brew / choco as a dependency, and otherwise is used to install algokit
raise click.ClickException(error_message)
def get_candidate_pipx_commands() -> Iterator[list[str]]:
# first try is pipx via PATH
yield ["pipx"]
# otherwise try getting an interpreter with pipx installed as a module,
# this won't work if pipx is installed in its own venv but worth a shot
for python_path in get_python_paths():
yield [python_path, "-m", "pipx"]
def get_npm_command(error_message: str, *, is_npx: bool = False) -> list[str]:
command = "npx" if is_npx else "npm"
path = shutil.which(command)
if not path:
raise click.ClickException(error_message)
# Create the npm directory inside %APPDATA% if it doesn't exist, as npx on windows needs this.
# See https://github.com/npm/cli/issues/7089 for more details.
if is_windows():
appdata_dir = os.getenv("APPDATA")
if appdata_dir is not None:
appdata_dir_path = Path(appdata_dir).expanduser()
npm_dir = appdata_dir_path / "npm"
try:
if not npm_dir.exists():
npm_dir.mkdir(parents=True)
except OSError as ex:
raise click.ClickException(
f"Failed to create the `npm` directory in {appdata_dir_path}.\n"
"This command uses `npx`, which requires the `npm` directory to exist "
"in the above path, otherwise an ENOENT 4058 error will occur.\n"
"Please create this directory manually and try again."
) from ex
return [f"{command}.cmd"]
return [command]
def get_python_paths() -> Iterator[str]:
for python_name in ("python3", "python"):
if python_path := which(python_name):
yield python_path
python_base_path = get_base_python_path()
if python_base_path is not None:
yield python_base_path
def get_base_python_path() -> str | None:
this_python: str | None = sys.executable
if not this_python or this_python.endswith("algokit"):
# Not: can be empty or None... yikes! unlikely though
# https://docs.python.org/3.10/library/sys.html#sys.executable
return None
# not in venv... not recommended to install algokit this way, but okay
if sys.prefix == sys.base_prefix:
return this_python
this_python_path = Path(this_python)
# try resolving symlink, this should be default on *nix
try:
if this_python_path.is_symlink():
return str(this_python_path.resolve())
except (OSError, RuntimeError):
pass
# otherwise, try getting an internal value which should be set when running in a .venv
# this will be the value of `home = <path>` in pyvenv.cfg if it exists
if base_home := getattr(sys, "_home", None):
base_home_path = Path(base_home)
for name in ("python", "python3", f"python3.{sys.version_info.minor}"):
candidate_path = base_home_path / name
if is_windows():
candidate_path = candidate_path.with_suffix(".exe")
if candidate_path.is_file():
return str(candidate_path)
# give up, we tried...
return this_python
def is_binary_mode() -> bool:
"""
Check if the current Python interpreter is running in a native binary frozen environment.
return: True if running in a native binary frozen environment, False otherwise.
"""
return getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")
def is_windows() -> bool:
return platform.system() == "Windows"
def split_command_string(command: str) -> list[str]:
"""
Parses a command string into a list of arguments, handling both shell and non-shell commands
"""
if platform.system() == "Windows":
import mslex
return mslex.split(command)
else:
import shlex
return shlex.split(command)
def resolve_command_path(
command: list[str],
) -> list[str]:
"""
Encapsulates custom command resolution, promotes reusability
Args:
command (list[str]): The command to resolve
allow_chained_commands (bool): Whether to allow chained commands (e.g. "&&" or "||")
Returns:
list[str]: The resolved command
"""
cmd, *args = command
# No resolution needed if the command already has a path or is not Windows-specific
if Path(cmd).name != cmd:
return command
# Windows-specific handling if 'shutil.which' fails:
if is_windows():
for ext in environ.get("PATHEXT", WIN_DEFAULT_PATHEXT).split(";"):
potential_path = shutil.which(cmd + ext)
if potential_path:
return [potential_path, *args]
# If resolves directly, return
if resolved_cmd := shutil.which(cmd):
return [resolved_cmd, *args]
# Command not found with any extension
raise click.ClickException(f"Failed to resolve command path, '{cmd}' wasn't found")
def load_env_file(path: Path) -> dict[str, str | None]:
"""Load the general .env configuration.
Args:
path (Path): Path to the .env file or directory containing the .env file.
Returns:
dict[str, str | None]: Dictionary with .env configurations.
"""
# Check if the path is a file, if yes, use it directly
if path.is_file():
env_path = path
else:
# Assume the default .env file name in the given directory
env_path = path / ".env"
if env_path.exists():
return dotenv.dotenv_values(env_path, verbose=True)
return {}
def alphanumeric_sort_key(s: str) -> list[int | str]:
"""
Generate a key for sorting strings that contain both text and numbers.
For instance, ensures that "name_digit_1" comes before "name_digit_2".
"""
return [int(text) if text.isdigit() else text.lower() for text in re.split("([0-9]+)", s)]