Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mostrecent aggregation to Gauge #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def f():
d.set_function(lambda: len(my_dict))
"""
_type = 'gauge'
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))

def __init__(self,
name: str,
Expand All @@ -357,7 +357,7 @@ def __init__(self,
unit: str = '',
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
):
self._multiprocess_mode = multiprocess_mode
if multiprocess_mode not in self._MULTIPROC_MODES:
Expand Down Expand Up @@ -390,10 +390,15 @@ def dec(self, amount: float = 1) -> None:
self._raise_if_not_observable()
self._value.inc(-amount)

def set(self, value: float) -> None:
"""Set gauge to the given value."""
def set(self, value: float, timestamp_sec: Optional[float] = None) -> None:
"""Set gauge to the given value.

This can take an optional timestamp to indicate when the sample was
taken. This is used for the most_recent aggregation and Prometheus
exposition.
"""
self._raise_if_not_observable()
self._value.set(float(value))
self._value.set(float(value), timestamp_sec=timestamp_sec)

def set_to_current_time(self) -> None:
"""Set gauge to the current unixtime."""
Expand Down
40 changes: 20 additions & 20 deletions prometheus_client/mmap_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@

_INITIAL_MMAP_SIZE = 1 << 16
_pack_integer_func = struct.Struct(b'i').pack
_pack_double_func = struct.Struct(b'd').pack
_pack_two_doubles_func = struct.Struct(b'dd').pack
_unpack_integer = struct.Struct(b'i').unpack_from
_unpack_double = struct.Struct(b'd').unpack_from
_unpack_two_doubles = struct.Struct(b'dd').unpack_from


# struct.pack_into has atomicity issues because it will temporarily write 0 into
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
# Using direct assignment solves this issue.

def _pack_double(data, pos, value):
data[pos:pos + 8] = _pack_double_func(value)

def _pack_two_doubles(data, pos, value, timestamp_sec):
data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp_sec)


def _pack_integer(data, pos, value):
data[pos:pos + 4] = _pack_integer_func(value)


def _read_all_values(data, used=0):
"""Yield (key, value, pos). No locking is performed."""
"""Yield (key, value, timestamp_sec, pos). No locking is performed."""

if used <= 0:
# If not valid `used` value is passed in, read it from the file.
Expand All @@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
encoded_key = data[pos:pos + encoded_len]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded_key.decode('utf-8'), value, pos
pos += 8
value, timestamp_sec = _unpack_two_doubles(data, pos)
yield encoded_key.decode('utf-8'), value, timestamp_sec, pos
pos += 16


class MmapedDict:
Expand All @@ -53,7 +54,8 @@ class MmapedDict:
Then 4 bytes of padding.
There's then a number of entries, consisting of a 4 byte int which is the
size of the next field, a utf-8 encoded string key, padding to a 8 byte
alignment, and then a 8 byte float which is the value.
alignment, and then a 8 byte float which is the value and a 8 byte float
which is a UNIX timestamp in seconds.

Not thread safe.
"""
Expand All @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
_pack_integer(self._m, 0, self._used)
else:
if not read_mode:
for key, _, pos in self._read_all_values():
for key, _, _, pos in self._read_all_values():
self._positions[key] = pos

@staticmethod
Expand All @@ -95,7 +97,7 @@ def _init_value(self, key):
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
self._f.truncate(self._capacity)
Expand All @@ -105,30 +107,28 @@ def _init_value(self, key):
# Update how much space we've used.
self._used += len(value)
_pack_integer(self._m, 0, self._used)
self._positions[key] = self._used - 8
self._positions[key] = self._used - 16

def _read_all_values(self):
"""Yield (key, value, pos). No locking is performed."""
return _read_all_values(data=self._m, used=self._used)

def read_all_values(self):
"""Yield (key, value). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v
"""Yield (key, value, timestamp_sec). No locking is performed."""
for k, v, ts, _ in self._read_all_values():
yield k, v, ts

def read_value(self, key):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that reading from an 8 byte aligned value is atomic
return _unpack_double(self._m, pos)[0]
return _unpack_two_doubles(self._m, pos)

def write_value(self, key, value):
def write_value(self, key, value, timestamp_sec):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that writing to an 8 byte aligned value is atomic
_pack_double(self._m, pos, value)
_pack_two_doubles(self._m, pos, value, timestamp_sec)

def close(self):
if self._f:
Expand Down
13 changes: 10 additions & 3 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _parse_key(key):
# the file is missing
continue
raise
for key, value, _ in file_values:
for key, value, timestamp_sec, _ in file_values:
metric_name, name, labels, labels_key, help_text = _parse_key(key)

metric = metrics.get(metric_name)
Expand All @@ -79,7 +79,7 @@ def _parse_key(key):
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, labels_key + (('pid', pid),), value)
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp_sec)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
Expand All @@ -89,6 +89,7 @@ def _parse_key(key):
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamp_secs = defaultdict(float)
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
Expand All @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_ts_sec = sample_timestamp_secs[without_pid_key]
ts_sec = float(timestamp or 0)
if current_ts_sec < ts_sec:
samples[without_pid_key] = value
sample_timestamp_secs[without_pid_key] = ts_sec
else: # all/liveall
samples[(name, labels)] = value

Expand Down Expand Up @@ -143,7 +150,7 @@ def _accumulate_metrics(metrics, accumulate):
samples[(metric.name + '_count', labels)] = acc

# Convert to correct sample format.
metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()]
metric.samples = [Sample(name_, dict(labels), value, sample_timestamp_secs.get((name_, labels), None)) for (name_, labels), value in samples.items()]
return metrics.values()

def collect(self):
Expand Down
30 changes: 23 additions & 7 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from threading import Lock
import time
import warnings

from .mmap_dict import mmap_key, MmapedDict
Expand All @@ -12,16 +13,23 @@ class MutexValue:

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, **kwargs):
self._value = 0.0
self._timestamp_sec = 0.0
self._exemplar = None
self._lock = Lock()

def inc(self, amount):
def inc(self, amount, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()
with self._lock:
self._value += amount
self._timestamp_sec = timestamp_sec

def set(self, value):
def set(self, value, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()
with self._lock:
self._value = value
self._timestamp_sec = timestamp_sec

def set_exemplar(self, exemplar):
with self._lock:
Expand Down Expand Up @@ -82,7 +90,7 @@ def __reset(self):
files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value = self._file.read_value(self._key)
self._value, self._timestamp_sec = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
Expand All @@ -95,17 +103,25 @@ def __check_for_pid_change(self):
for value in values:
value.__reset()

def inc(self, amount):
def inc(self, amount, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()

with lock:
self.__check_for_pid_change()
self._value += amount
self._file.write_value(self._key, self._value)
self._timestamp_sec = timestamp_sec
self._file.write_value(self._key, self._value, self._timestamp_sec)

def set(self, value, timestamp_sec=None):
if not timestamp_sec:
timestamp_sec = time.time()

def set(self, value):
with lock:
self.__check_for_pid_change()
self._value = value
self._file.write_value(self._key, self._value)
self._timestamp_sec = timestamp_sec
self._file.write_value(self._key, self._value, self._timestamp_sec)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
Expand Down
40 changes: 30 additions & 10 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,26 @@ def test_gauge_livesum(self):
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_gauge_mostrecent(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
values.ValueClass = MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
g1.set(1, 2000)
g2.set(2, 1000)
self.assertEqual(1, self.registry.get_sample_value('g'))
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(1, self.registry.get_sample_value('g'))

def test_gauge_livemostrecent(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
values.ValueClass = MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
g1.set(1, 2000)
g2.set(2, 1000)
self.assertEqual(1, self.registry.get_sample_value('g'))
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_namespace_subsystem(self):
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
c1.inc(1)
Expand Down Expand Up @@ -369,28 +389,28 @@ def setUp(self):
self.d = mmap_dict.MmapedDict(self.tempfile)

def test_process_restart(self):
self.d.write_value('abc', 123.0)
self.d.write_value('abc', 123.0, 987.0)
self.d.close()
self.d = mmap_dict.MmapedDict(self.tempfile)
self.assertEqual(123, self.d.read_value('abc'))
self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
self.assertEqual((123, 987.0), self.d.read_value('abc'))
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))

def test_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
self.d.write_value(key, 123.0)
self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
self.d.write_value(key, 123.0, 987.0)
self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))

def test_multi_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
self.d.write_value('abc', 42.0)
self.d.write_value(key, 123.0)
self.d.write_value('def', 17.0)
self.d.write_value('abc', 42.0, 987.0)
self.d.write_value(key, 123.0, 876.0)
self.d.write_value('def', 17.0, 765.0)
self.assertEqual(
[('abc', 42.0), (key, 123.0), ('def', 17.0)],
[('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
list(self.d.read_all_values()))

def test_corruption_detected(self):
self.d.write_value('abc', 42.0)
self.d.write_value('abc', 42.0, 987.0)
# corrupt the written data
self.d._m[8:16] = b'somejunk'
with self.assertRaises(RuntimeError):
Expand Down