-
Notifications
You must be signed in to change notification settings - Fork 19
/
callbacks.py
161 lines (147 loc) · 4.88 KB
/
callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import logging
import os
import re
import sys
from threading import RLock
from typing import Any, BinaryIO, ClassVar, Optional, Union
import fsspec
from tqdm import tqdm
# Module-level logger; `Tqdm.__init__` compares its effective level against
# the `level` argument to decide whether a bar should be disabled.
logger = logging.getLogger(__name__)
# Install a `threading.RLock` as the lock shared by all tqdm instances.
# NOTE(review): presumably to avoid tqdm's default lock setup and keep bars
# usable from multiple threads -- confirm against tqdm's `set_lock` docs.
tqdm.set_lock(RLock())
def env2bool(var: str, undefined=False):
    """
    Interpret the environment variable named `var` as a boolean flag.

    var : name of the environment variable to read
    undefined : return value if env var is unset

    Returns `undefined` when the variable is not set. Otherwise returns True
    only if the whole value (ignoring surrounding whitespace and letter case)
    is one of `1`, `y`, `yes` or `true`; any other value yields False.
    """
    value = os.getenv(var, None)
    if value is None:
        return undefined
    # Match the entire value: a substring search would treat unrelated text
    # that merely contains "1" or "y" (e.g. "deny", "2010") as truthy.
    return bool(re.fullmatch(r"1|y|yes|true", value.strip(), flags=re.I))
class Tqdm(tqdm):
    """
    maximum-compatibility tqdm-based progressbars

    Adds on top of `tqdm`: a `bytes` shortcut, automatic disabling based on
    the module logger's level and on whether the output stream is a TTY, and
    bar formats that carry an extra `postfix["info"]` field.
    """

    # format for a top-level bar with a known total
    BAR_FMT_DEFAULT = (
        "{percentage:3.0f}% {desc}|{bar}|"
        "{postfix[info]}{n_fmt}/{total_fmt}"
        " [{elapsed}<{remaining}, {rate_fmt:>11}]"
    )
    # nested bars should have fixed bar widths to align nicely
    BAR_FMT_DEFAULT_NESTED = (
        "{percentage:3.0f}%|{bar:10}|{desc:{ncols_desc}.{ncols_desc}}"
        "{postfix[info]}{n_fmt}/{total_fmt}"
        " [{elapsed}<{remaining}, {rate_fmt:>11}]"
    )
    # format used when the bar has no (or zero) length
    BAR_FMT_NOTOTAL = "{desc}{bar:b}|{postfix[info]}{n_fmt} [{elapsed}, {rate_fmt:>11}]"
    # keyword defaults applied when `bytes=True`; explicit kwargs override them
    BYTES_DEFAULTS: ClassVar[dict[str, Any]] = {
        "unit": "B",
        "unit_scale": True,
        "unit_divisor": 1024,
        "miniters": 1,
    }

    def __init__(
        self,
        iterable=None,
        disable=None,
        level=logging.ERROR,
        desc=None,
        leave=False,
        bar_format=None,
        bytes=False,  # noqa: A002
        file=None,
        total=None,
        postfix=None,
        **kwargs,
    ):
        """
        bytes   : shortcut for
            `unit='B', unit_scale=True, unit_divisor=1024, miniters=1`
        desc  : persists after `close()`
        level  : effective logging level for determining `disable`;
            used only if `disable` is unspecified
        disable  : If (default: None) or False,
            will be determined by logging level.
            May be overridden to `True` due to non-TTY status.
            Skip override by specifying env var `DVC_IGNORE_ISATTY`.
        bar_format  : explicit bar format; if None, one of the `BAR_FMT_*`
            class formats is chosen from the bar's length and position
        file  : output stream (default: `sys.stderr`); used for the TTY
            check and forwarded to `tqdm`
        postfix  : initial postfix mapping (default: `{"info": ""}`);
            the built-in formats read `postfix[info]`
        kwargs  : anything accepted by `tqdm.tqdm()`
        """
        kwargs = kwargs.copy()
        if bytes:
            kwargs = {**self.BYTES_DEFAULTS, **kwargs}
        else:
            kwargs.setdefault("unit_scale", total > 999 if total else True)
        if file is None:
            file = sys.stderr
        # auto-disable based on `logger.level`
        if not disable:
            disable = logger.getEffectiveLevel() > level
        # auto-disable based on TTY
        if (
            not disable
            and not env2bool("DVC_IGNORE_ISATTY")
            and hasattr(file, "isatty")
        ):
            disable = not file.isatty()
        super().__init__(
            iterable=iterable,
            disable=disable,
            leave=leave,
            desc=desc,
            # placeholder: the real format is assigned below, once `self.pos`
            # and the bar length are known
            bar_format="!",
            lock_args=(False,),
            total=total,
            # forward the stream that was used for the TTY check; previously
            # it was dropped and tqdm always fell back to its own default
            file=file,
            **kwargs,
        )
        self.postfix = postfix or {"info": ""}
        if bar_format is None:
            if self.__len__():
                self.bar_format = (
                    self.BAR_FMT_DEFAULT_NESTED if self.pos else self.BAR_FMT_DEFAULT
                )
            else:
                self.bar_format = self.BAR_FMT_NOTOTAL
        else:
            self.bar_format = bar_format
        self.refresh()

    def close(self):
        """Clear the info postfix, simplify the bar format and close the bar."""
        self.postfix["info"] = ""
        # remove ETA (either unknown or zero); remove completed bar
        self.bar_format = self.bar_format.replace("<{remaining}", "").replace(
            "|{bar:10}|", " "
        )
        super().close()

    @property
    def format_dict(self):
        """inject `ncols_desc` to fill the display width (`ncols`)"""
        d = super().format_dict
        ncols = d["ncols"] or 80
        # assumes `bar_format` has max one of ("ncols_desc" & "ncols_info")
        # render once with width-1 fields to measure the rest of the line
        meter = self.format_meter(  # type: ignore[call-arg]
            ncols_desc=1, ncols_info=1, **d
        )
        # `+ 1` gives back the single column the measuring render consumed
        ncols_left = ncols - len(meter) + 1
        ncols_left = max(ncols_left, 0)
        if ncols_left:
            d["ncols_desc"] = d["ncols_info"] = ncols_left
        else:
            # work-around for zero-width description
            d["ncols_desc"] = d["ncols_info"] = 1
            d["prefix"] = ""
        return d
class TqdmCallback(fsspec.callbacks.TqdmCallback):
    """fsspec tqdm callback that defaults to this module's `Tqdm` bar class."""

    def __init__(
        self,
        size: Optional[int] = None,
        value: int = 0,
        progress_bar: Optional["tqdm"] = None,
        tqdm_cls: Optional[type["tqdm"]] = None,
        **tqdm_kwargs,
    ):
        """
        size  : forwarded to the base callback as `size`
        value  : forwarded to the base callback as `value`
        progress_bar  : existing bar to report into; if None, the bar is
            left for the base class to create
            (NOTE(review): assumes fsspec creates it lazily from
            `tqdm_cls`/`tqdm_kwargs` when `self.tqdm` is None -- confirm)
        tqdm_cls  : bar class handed to the base callback (default: `Tqdm`)
        tqdm_kwargs  : keyword arguments for the bar; any `total` is dropped
        """
        # `total` is discarded here rather than forwarded to the bar
        tqdm_kwargs.pop("total", None)
        super().__init__(
            tqdm_kwargs=tqdm_kwargs, tqdm_cls=tqdm_cls or Tqdm, size=size, value=value
        )
        # Use the caller-supplied bar when one was given. The previous check
        # was inverted (`is None`), so a provided bar was never installed.
        if progress_bar is not None:
            self.tqdm = progress_bar

    def branched(self, path_1: "Union[str, BinaryIO]", path_2: str, **kwargs):
        """
        Return a new byte-scaled `TqdmCallback` for a child transfer.

        The description is `path_1` when it is a string, otherwise `path_2`.
        Extra `kwargs` are accepted but not used.
        """
        desc = path_1 if isinstance(path_1, str) else path_2
        return TqdmCallback(bytes=True, desc=desc)