Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations to parser.py #887

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions prometheus_client/parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import io as StringIO
import re
from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple

from .metrics_core import Metric
from .samples import Sample


def text_string_to_metric_families(text):
def text_string_to_metric_families(text: str) -> Iterable[Metric]:
"""Parse Prometheus text format from a unicode string.

See text_fd_to_metric_families.
Expand All @@ -20,32 +21,32 @@ def text_string_to_metric_families(text):
}


def replace_escape_sequence(match):
def replace_escape_sequence(match: Match[str]) -> str:
return ESCAPE_SEQUENCES[match.group(0)]


HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')


def _replace_help_escaping(s):
def _replace_help_escaping(s: str) -> str:
return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)


def _replace_escaping(s):
def _replace_escaping(s: str) -> str:
return ESCAPING_RE.sub(replace_escape_sequence, s)


def _is_character_escaped(s, charpos):
def _is_character_escaped(s: str, charpos: int) -> bool:
num_bslashes = 0
while (charpos > num_bslashes
and s[charpos - 1 - num_bslashes] == '\\'):
num_bslashes += 1
return num_bslashes % 2 == 1


def _parse_labels(labels_string):
labels = {}
def _parse_labels(labels_string: str) -> Dict[str, str]:
labels: Dict[str, str] = {}
# Return if we don't have valid labels
if "=" not in labels_string:
return labels
Expand Down Expand Up @@ -95,7 +96,7 @@ def _parse_labels(labels_string):


# If we have multiple values only consider the first
def _parse_value_and_timestamp(s):
def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
s = s.lstrip()
separator = " "
if separator not in s:
Expand All @@ -108,7 +109,7 @@ def _parse_value_and_timestamp(s):
return value, timestamp


def _parse_sample(text):
def _parse_sample(text: str) -> Sample:
# Detect the labels in the text
try:
label_start, label_end = text.index("{"), text.rindex("}")
Expand All @@ -133,7 +134,7 @@ def _parse_sample(text):
return Sample(name, {}, value, timestamp)


def text_fd_to_metric_families(fd):
def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
"""Parse Prometheus text format from a file descriptor.

This is a laxer parser than the main Go parser,
Expand All @@ -145,10 +146,10 @@ def text_fd_to_metric_families(fd):
name = ''
documentation = ''
typ = 'untyped'
samples = []
samples: List[Sample] = []
allowed_names = []

def build_metric(name, documentation, typ, samples):
def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
# Munge counters into OpenMetrics representation
# used internally.
if typ == 'counter':
Expand Down