From 172733fd9a78c4db8a0f2d0c57a2c216bcd79fd8 Mon Sep 17 00:00:00 2001 From: Masaya Suzuki Date: Mon, 16 Oct 2023 19:21:48 -0700 Subject: [PATCH] Add mostrecent aggregation to Gauge In the multiprocess mode, the process that exposes the metrics needs to aggregate the samples from other processes. The Gauge metric allows users to choose the aggregation mode. This implements the 'mostrecent' (and 'livemostrecent') mode, where the last observed value is exposed. In order to support this, the file format is expanded to store timestamps in addition to the values. The stored timestamps are read by the reader process and are used to find the latest value. Closes https://github.com/prometheus/client_python/issues/847 Consideration on the atomicity: Previously, mmap_dict.py had a comment saying "We assume that reading from an 8 byte aligned value is atomic". With this change, the value write becomes a 16-byte write at an 8-byte-aligned offset. The code author tried to find a basis for the original assumption, but couldn't find any. According to write(2), **if a file descriptor is shared**, the write becomes atomic. However, we do not share the file descriptors in the current architecture. Considering that the Ruby implementation does the same and has not seen an issue with it, this write atomicity problem might not be an issue in practice. See also: * https://github.com/prometheus/client_ruby/pull/172 The approach and naming are taken from client_ruby. * https://github.com/prometheus/client_golang/blob/v1.17.0/prometheus/metric.go#L149-L161 client_golang already has an API for setting the timestamp. It explains the use case for the timestamp beyond the client-local aggregation. In order to support the same use case in Python, further changes are needed. 
Signed-off-by: Masaya Suzuki --- prometheus_client/metrics.py | 21 ++++++++++++---- prometheus_client/mmap_dict.py | 40 +++++++++++++++---------------- prometheus_client/multiprocess.py | 11 +++++++-- prometheus_client/values.py | 12 ++++++---- tests/test_multiprocess.py | 40 +++++++++++++++++++++++-------- 5 files changed, 83 insertions(+), 41 deletions(-) diff --git a/prometheus_client/metrics.py b/prometheus_client/metrics.py index 66f51fa6..b7071612 100644 --- a/prometheus_client/metrics.py +++ b/prometheus_client/metrics.py @@ -346,7 +346,8 @@ def f(): d.set_function(lambda: len(my_dict)) """ _type = 'gauge' - _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum')) + _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent')) + _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent')) def __init__(self, name: str, @@ -357,7 +358,7 @@ def __init__(self, unit: str = '', registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, - multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all', + multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all', ): self._multiprocess_mode = multiprocess_mode if multiprocess_mode not in self._MULTIPROC_MODES: @@ -382,18 +383,30 @@ def _metric_init(self) -> None: def inc(self, amount: float = 1) -> None: """Increment gauge by the given amount.""" + if self._multiprocess_mode in self._MOST_RECENT_MODES: + raise RuntimeError("inc must not be used with the mostrecent mode") self._raise_if_not_observable() self._value.inc(amount) def dec(self, amount: float = 1) -> None: """Decrement gauge by the given amount.""" + if self._multiprocess_mode in self._MOST_RECENT_MODES: + raise RuntimeError("dec must not be used with the mostrecent mode") 
self._raise_if_not_observable() self._value.inc(-amount) def set(self, value: float) -> None: - """Set gauge to the given value.""" + """Set gauge to the given value. + + This can take an optional timestamp to indicate when the sample was + taken. This is used for the most_recent aggregation and Prometheus + exposition. + """ self._raise_if_not_observable() - self._value.set(float(value)) + if self._multiprocess_mode in self._MOST_RECENT_MODES: + self._value.set(float(value), timestamp=time.time()) + else: + self._value.set(float(value)) def set_to_current_time(self) -> None: """Set gauge to the current unixtime.""" diff --git a/prometheus_client/mmap_dict.py b/prometheus_client/mmap_dict.py index c3de38fa..edd895cd 100644 --- a/prometheus_client/mmap_dict.py +++ b/prometheus_client/mmap_dict.py @@ -6,17 +6,18 @@ _INITIAL_MMAP_SIZE = 1 << 16 _pack_integer_func = struct.Struct(b'i').pack -_pack_double_func = struct.Struct(b'd').pack +_pack_two_doubles_func = struct.Struct(b'dd').pack _unpack_integer = struct.Struct(b'i').unpack_from -_unpack_double = struct.Struct(b'd').unpack_from +_unpack_two_doubles = struct.Struct(b'dd').unpack_from # struct.pack_into has atomicity issues because it will temporarily write 0 into # the mmap, resulting in false reads to 0 when experiencing a lot of writes. # Using direct assignment solves this issue. -def _pack_double(data, pos, value): - data[pos:pos + 8] = _pack_double_func(value) + +def _pack_two_doubles(data, pos, value, timestamp): + data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp) def _pack_integer(data, pos, value): @@ -24,7 +25,7 @@ def _pack_integer(data, pos, value): def _read_all_values(data, used=0): - """Yield (key, value, pos). No locking is performed.""" + """Yield (key, value, timestamp, pos). No locking is performed.""" if used <= 0: # If not valid `used` value is passed in, read it from the file. 
@@ -41,9 +42,9 @@ def _read_all_values(data, used=0): encoded_key = data[pos:pos + encoded_len] padded_len = encoded_len + (8 - (encoded_len + 4) % 8) pos += padded_len - value = _unpack_double(data, pos)[0] - yield encoded_key.decode('utf-8'), value, pos - pos += 8 + value, timestamp = _unpack_two_doubles(data, pos) + yield encoded_key.decode('utf-8'), value, timestamp, pos + pos += 16 class MmapedDict: @@ -53,7 +54,8 @@ class MmapedDict: Then 4 bytes of padding. There's then a number of entries, consisting of a 4 byte int which is the size of the next field, a utf-8 encoded string key, padding to a 8 byte - alignment, and then a 8 byte float which is the value. + alignment, and then a 8 byte float which is the value and a 8 byte float + which is a UNIX timestamp in seconds. Not thread safe. """ @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False): _pack_integer(self._m, 0, self._used) else: if not read_mode: - for key, _, pos in self._read_all_values(): + for key, _, _, pos in self._read_all_values(): self._positions[key] = pos @staticmethod @@ -95,7 +97,7 @@ def _init_value(self, key): encoded = key.encode('utf-8') # Pad to be 8-byte aligned. padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8)) - value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0) + value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0) while self._used + len(value) > self._capacity: self._capacity *= 2 self._f.truncate(self._capacity) @@ -105,30 +107,28 @@ def _init_value(self, key): # Update how much space we've used. self._used += len(value) _pack_integer(self._m, 0, self._used) - self._positions[key] = self._used - 8 + self._positions[key] = self._used - 16 def _read_all_values(self): """Yield (key, value, pos). No locking is performed.""" return _read_all_values(data=self._m, used=self._used) def read_all_values(self): - """Yield (key, value). 
No locking is performed.""" - for k, v, _ in self._read_all_values(): - yield k, v + """Yield (key, value, timestamp). No locking is performed.""" + for k, v, ts, _ in self._read_all_values(): + yield k, v, ts def read_value(self, key): if key not in self._positions: self._init_value(key) pos = self._positions[key] - # We assume that reading from an 8 byte aligned value is atomic - return _unpack_double(self._m, pos)[0] + return _unpack_two_doubles(self._m, pos) - def write_value(self, key, value): + def write_value(self, key, value, timestamp): if key not in self._positions: self._init_value(key) pos = self._positions[key] - # We assume that writing to an 8 byte aligned value is atomic - _pack_double(self._m, pos, value) + _pack_two_doubles(self._m, pos, value, timestamp) def close(self): if self._f: diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py index dd343913..7021b49a 100644 --- a/prometheus_client/multiprocess.py +++ b/prometheus_client/multiprocess.py @@ -68,7 +68,7 @@ def _parse_key(key): # the file is missing continue raise - for key, value, _ in file_values: + for key, value, timestamp, _ in file_values: metric_name, name, labels, labels_key, help_text = _parse_key(key) metric = metrics.get(metric_name) @@ -79,7 +79,7 @@ def _parse_key(key): if typ == 'gauge': pid = parts[2][:-3] metric._multiprocess_mode = parts[1] - metric.add_sample(name, labels_key + (('pid', pid),), value) + metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp) else: # The duplicates and labels are fixed in the next for. 
metric.add_sample(name, labels_key, value) @@ -89,6 +89,7 @@ def _parse_key(key): def _accumulate_metrics(metrics, accumulate): for metric in metrics.values(): samples = defaultdict(float) + sample_timestamps = defaultdict(float) buckets = defaultdict(lambda: defaultdict(float)) samples_setdefault = samples.setdefault for s in metric.samples: @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate): samples[without_pid_key] = value elif metric._multiprocess_mode in ('sum', 'livesum'): samples[without_pid_key] += value + elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'): + current_timestamp = sample_timestamps[without_pid_key] + timestamp = float(timestamp or 0) + if current_timestamp < timestamp: + samples[without_pid_key] = value + sample_timestamps[without_pid_key] = timestamp else: # all/liveall samples[(name, labels)] = value diff --git a/prometheus_client/values.py b/prometheus_client/values.py index 3373379b..6ff85e3b 100644 --- a/prometheus_client/values.py +++ b/prometheus_client/values.py @@ -19,7 +19,7 @@ def inc(self, amount): with self._lock: self._value += amount - def set(self, value): + def set(self, value, timestamp=None): with self._lock: self._value = value @@ -82,7 +82,7 @@ def __reset(self): files[file_prefix] = MmapedDict(filename) self._file = files[file_prefix] self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text) - self._value = self._file.read_value(self._key) + self._value, self._timestamp = self._file.read_value(self._key) def __check_for_pid_change(self): actual_pid = process_identifier() @@ -99,13 +99,15 @@ def inc(self, amount): with lock: self.__check_for_pid_change() self._value += amount - self._file.write_value(self._key, self._value) + self._timestamp = 0.0 + self._file.write_value(self._key, self._value, self._timestamp) - def set(self, value): + def set(self, value, timestamp=None): with lock: self.__check_for_pid_change() self._value = value - self._file.write_value(self._key, 
self._value) + self._timestamp = timestamp or 0.0 + self._file.write_value(self._key, self._value, self._timestamp) def set_exemplar(self, exemplar): # TODO: Implement exemplars for multiprocess mode. diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index 10990ad3..6e188e51 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -185,6 +185,26 @@ def test_gauge_livesum(self): mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(2, self.registry.get_sample_value('g')) + def test_gauge_mostrecent(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent') + values.ValueClass = MultiProcessValue(lambda: 456) + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent') + g2.set(2) + g1.set(1) + self.assertEqual(1, self.registry.get_sample_value('g')) + mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) + self.assertEqual(1, self.registry.get_sample_value('g')) + + def test_gauge_livemostrecent(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent') + values.ValueClass = MultiProcessValue(lambda: 456) + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent') + g2.set(2) + g1.set(1) + self.assertEqual(1, self.registry.get_sample_value('g')) + mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) + self.assertEqual(2, self.registry.get_sample_value('g')) + def test_namespace_subsystem(self): c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss') c1.inc(1) @@ -369,28 +389,28 @@ def setUp(self): self.d = mmap_dict.MmapedDict(self.tempfile) def test_process_restart(self): - self.d.write_value('abc', 123.0) + self.d.write_value('abc', 123.0, 987.0) self.d.close() self.d = mmap_dict.MmapedDict(self.tempfile) - self.assertEqual(123, self.d.read_value('abc')) - self.assertEqual([('abc', 123.0)], list(self.d.read_all_values())) + self.assertEqual((123, 987.0), self.d.read_value('abc')) + 
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values())) def test_expansion(self): key = 'a' * mmap_dict._INITIAL_MMAP_SIZE - self.d.write_value(key, 123.0) - self.assertEqual([(key, 123.0)], list(self.d.read_all_values())) + self.d.write_value(key, 123.0, 987.0) + self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values())) def test_multi_expansion(self): key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4 - self.d.write_value('abc', 42.0) - self.d.write_value(key, 123.0) - self.d.write_value('def', 17.0) + self.d.write_value('abc', 42.0, 987.0) + self.d.write_value(key, 123.0, 876.0) + self.d.write_value('def', 17.0, 765.0) self.assertEqual( - [('abc', 42.0), (key, 123.0), ('def', 17.0)], + [('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)], list(self.d.read_all_values())) def test_corruption_detected(self): - self.d.write_value('abc', 42.0) + self.d.write_value('abc', 42.0, 987.0) # corrupt the written data self.d._m[8:16] = b'somejunk' with self.assertRaises(RuntimeError):