From 6d74490fa6f6e8b0f55f7b6773ea059c22ca6c50 Mon Sep 17 00:00:00 2001
From: "Edgar R. M"
Date: Tue, 4 Oct 2022 17:10:44 -0500
Subject: [PATCH] feat: Allow configuring a dedicated metrics logger (#978)

Co-authored-by: Aaron ("AJ") Steers
---
 docs/implementation/logging.md   |  48 +++-
 docs/implementation/metrics.md   |  20 +-
 noxfile.py                       |   1 +
 poetry.lock                      |  25 +-
 pyproject.toml                   |   3 +
 singer_sdk/default_logging.yml   |  15 ++
 singer_sdk/helpers/_resources.py |  24 ++
 singer_sdk/metrics.py            | 408 +++++++++++++++++++++++++++++++
 singer_sdk/plugin_base.py        |   4 +
 singer_sdk/streams/core.py       | 211 ++++++----------
 singer_sdk/streams/rest.py       |  78 ++++--
 tests/core/test_metrics.py       |  87 +++++++
 12 files changed, 744 insertions(+), 180 deletions(-)
 create mode 100644 singer_sdk/default_logging.yml
 create mode 100644 singer_sdk/helpers/_resources.py
 create mode 100644 singer_sdk/metrics.py
 create mode 100644 tests/core/test_metrics.py

diff --git a/docs/implementation/logging.md b/docs/implementation/logging.md
index 3f6740333..4f5142be9 100644
--- a/docs/implementation/logging.md
+++ b/docs/implementation/logging.md
@@ -1,5 +1,49 @@
 # Logging
 
-Logs are configurable by the environment variables `<PLUGIN_NAME>_LOGLEVEL` (preferred) or `LOGLEVEL`. Use `LOGLEVEL` when you intend to control the log output for all taps and targets running within the environment. In contrast, we recommend setting `<PLUGIN_NAME>_LOGLEVEL` for more granual control of each tap or target individually.
+Logs are configurable by the environment variables `<PLUGIN_NAME>_LOGLEVEL` (preferred)
+or `LOGLEVEL`. Use `LOGLEVEL` when you intend to control the log output for all taps
+and targets running within the environment. In contrast, we recommend setting
+`<PLUGIN_NAME>_LOGLEVEL` for more granular control of each tap or target individually.
 
-From most verbose to least verbose, the accepted values for logging level are `debug`, `info`, `warning`, and `error`. Logging level inputs are case insensitive.
+From most verbose to least verbose, the accepted values for logging level are `debug`,
+`info`, `warning`, and `error`. Logging level inputs are case-insensitive.
+
+## Custom logging configuration
+
+Users of a tap can configure the SDK logging by setting the `SINGER_SDK_LOG_CONFIG`
+environment variable. The value of this variable should be a path to a YAML file in the
+[Python logging dict format](https://docs.python.org/3/library/logging.config.html#dictionary-schema-details).
+
+For example, to send [metrics](./metrics.md) (with logger name `singer_sdk.metrics`) to a file, you could use the following config:
+
+```yaml
+version: 1
+disable_existing_loggers: false
+formatters:
+  metrics:
+    format: "{asctime} {message}"
+    style: "{"
+handlers:
+  metrics:
+    class: logging.FileHandler
+    formatter: metrics
+    filename: metrics.log
+loggers:
+  singer_sdk.metrics:
+    level: INFO
+    handlers: [ metrics ]
+    propagate: yes
+```
+
+This will send metrics to a `metrics.log` file:
+
+```
+2022-09-29 00:48:52,746 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.501743, "tags": {"stream": "continents", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
+2022-09-29 00:48:52,775 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "continents", "endpoint": ""}}
+2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.7397160530090332, "tags": {"stream": "continents", "context": {}, "status": "succeeded"}}
+2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents", "context": {}}}
+2022-09-29 00:48:53,225 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.392148, "tags": {"stream": "countries", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
+2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "countries", "endpoint": ""}}
+2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.5258760452270508, "tags": {"stream": "countries", "context": {}, "status": "succeeded"}}
+2022-09-29 00:48:53,303 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 250, "tags": {"stream": "countries", "context": {}}}
+```
diff --git a/docs/implementation/metrics.md b/docs/implementation/metrics.md
index c6a0a13f2..fede38532 100644
--- a/docs/implementation/metrics.md
+++ b/docs/implementation/metrics.md
@@ -2,17 +2,21 @@
 
 Metrics logging is specified in the
 [Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically
-emit two types of metrics `record_count` and `http_request_duration`.
+emit metrics for `record_count`, `http_request_duration`, and `sync_duration`.
 
-Customization options:
+## Customization options
 
-Developers may optionally add a `metrics_log_level` config option to their taps,
-which will automatically allow this metrics logging to be customized at runtime.
+### `metrics_log_level`
 
-When `metrics_log_level` is supported, users can then
-set one of these values (case insensitive), `INFO`, `DEBUG`, `NONE`, to override the
-default logging level for metrics. This can be helpful for REST-type sources which use
-make a large number of REST calls can therefor have very noisy metrics.
+Metrics are logged at the `INFO` level. Developers may optionally add a
+`metrics_log_level` config option to their taps, which users can then set to
+`WARNING` or `ERROR` to disable metrics logging.
+
+### `SINGER_SDK_LOG_CONFIG`
+
+Metrics are written by the `singer_sdk.metrics` logger, so the end user can set
+`SINGER_SDK_LOG_CONFIG` to a logging config file that defines the format and output
+for metrics. See the [logging docs](./logging.md) for an example file.
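For reference, the SDK applies these two settings roughly as follows. This is a simplified sketch of the `_setup_logging` helper this patch adds in `singer_sdk/metrics.py`, not the verbatim implementation; it assumes PyYAML is installed and omits the bundled `default_logging.yml` defaults that the SDK applies first:

```python
from __future__ import annotations

import logging
import logging.config
import os

import yaml  # PyYAML, added as a dependency by this patch

METRICS_LOGGER_NAME = "singer_sdk.metrics"


def setup_logging(config: dict | None) -> None:
    # `metrics_log_level` from the plugin config adjusts only the metrics logger.
    level = (config or {}).get("metrics_log_level", "INFO").upper()
    logging.getLogger(METRICS_LOGGER_NAME).setLevel(level)

    # `SINGER_SDK_LOG_CONFIG` points at a YAML file in Python's logging
    # dict-config format; it is applied last, so it can override the defaults.
    if "SINGER_SDK_LOG_CONFIG" in os.environ:
        with open(os.environ["SINGER_SDK_LOG_CONFIG"]) as f:
            logging.config.dictConfig(yaml.safe_load(f))
```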
## Additional Singer Metrics References diff --git a/noxfile.py b/noxfile.py index f7cc2be2c..00f4801cd 100644 --- a/noxfile.py +++ b/noxfile.py @@ -42,6 +42,7 @@ def mypy(session: Session) -> None: "types-requests", "types-pytz", "types-simplejson", + "types-PyYAML", ) session.run("mypy", *args) if not session.posargs: diff --git a/poetry.lock b/poetry.lock index 602acfdbb..72371b986 100644 --- a/poetry.lock +++ b/poetry.lock @@ -894,7 +894,7 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] -name = "pyyaml" +name = "PyYAML" version = "6.0" description = "YAML parser and emitter for Python" category = "main" @@ -1213,6 +1213,14 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "types-PyYAML" +version = "6.0.12" +description = "Typing stubs for PyYAML" +category = "dev" +optional = false +python-versions = "*" + [[package]] name = "types-requests" version = "2.28.11.1" @@ -1323,7 +1331,7 @@ docs = ["sphinx", "sphinx-rtd-theme", "sphinx-copybutton", "myst-parser", "sphin [metadata] lock-version = "1.1" python-versions = "<3.11,>=3.7.1" -content-hash = "43a1b35db11df30a93263ae2704960d474797b830190524ffb9c4c23c2a9cbdf" +content-hash = "432ebb55808a3164695cc7dca560290aa95635693ea15cb04c5ff4db6f01f4a2" [metadata.files] alabaster = [ @@ -1993,7 +2001,7 @@ pytzdata = [ {file = "pytzdata-2020.1-py2.py3-none-any.whl", hash = "sha256:e1e14750bcf95016381e4d472bad004eef710f2d6417240904070b3d6654485f"}, {file = "pytzdata-2020.1.tar.gz", hash = "sha256:3efa13b335a00a8de1d345ae41ec78dd11c9f8807f522d39850f2dd828681540"}, ] -pyyaml = [ +PyYAML = [ {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"}, {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"}, {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc"}, @@ -2001,13 +2009,6 @@ pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"}, {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"}, {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"}, - {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"}, - {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"}, - {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"}, - {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = 
"sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"}, {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, @@ -2260,6 +2261,10 @@ types-pytz = [ {file = "types-pytz-2022.2.1.0.tar.gz", hash = "sha256:47cfb19c52b9f75896440541db392fd312a35b279c6307a531db71152ea63e2b"}, {file = "types_pytz-2022.2.1.0-py3-none-any.whl", hash = "sha256:50ead2254b524a3d4153bc65d00289b66898060d2938e586170dce918dbaf3b3"}, ] +types-PyYAML = [ + {file = "types-PyYAML-6.0.12.tar.gz", hash = "sha256:f6f350418125872f3f0409d96a62a5a5ceb45231af5cc07ee0034ec48a3c82fa"}, + {file = "types_PyYAML-6.0.12-py3-none-any.whl", hash = "sha256:29228db9f82df4f1b7febee06bbfb601677882e98a3da98132e31c6874163e15"}, +] types-requests = [ {file = "types-requests-2.28.11.1.tar.gz", hash = "sha256:02b1806c5b9904edcd87fa29236164aea0e6cdc4d93ea020cd615ef65cb43d65"}, {file = "types_requests-2.28.11.1-py3-none-any.whl", hash = "sha256:1ff2c1301f6fe58b5d1c66cdf631ca19734cb3b1a4bbadc878d75557d183291a"}, diff --git a/pyproject.toml b/pyproject.toml index 3c4456702..6cc09c4ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ PyJWT = "~=2.4" requests = "^2.25.1" cryptography = ">=3.4.6,<39.0.0" importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} +importlib-resources = {version = "^5.9.0", markers = "python_version < \"3.9\""} memoization = ">=0.3.2,<0.5.0" jsonpath-ng = "^1.5.3" joblib = "^1.0.1" @@ -54,6 +55,7 @@ typing-extensions = "^4.2.0" simplejson = "^3.17.6" jsonschema = "^4.16.0" pytz = "^2022.2.1" +PyYAML = "^6.0" # Sphinx dependencies installed as optional 'docs' extras # https://github.com/readthedocs/readthedocs.org/issues/4912#issuecomment-664002569 @@ -89,6 +91,7 @@ types-python-dateutil = "^2.8.19" types-pytz = "^2022.2.1.0" types-requests = "^2.28.11" types-simplejson = "^3.17.7" +types-PyYAML = "^6.0.12" coverage = {extras = ["toml"], version = "^6.5"} # Cookiecutter tests diff --git a/singer_sdk/default_logging.yml b/singer_sdk/default_logging.yml new file mode 100644 index 000000000..c121fb47d --- /dev/null +++ b/singer_sdk/default_logging.yml @@ -0,0 +1,15 @@ +version: 1 +disable_existing_loggers: false +formatters: + console: + format: "{asctime} {message}" + style: "{" +handlers: + default: + class: logging.StreamHandler + formatter: console + stream: ext://sys.stderr +root: + level: INFO + propagate: true + handlers: [default] diff --git a/singer_sdk/helpers/_resources.py b/singer_sdk/helpers/_resources.py new file mode 100644 index 000000000..a458e0a55 --- /dev/null +++ b/singer_sdk/helpers/_resources.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import sys +from types import ModuleType + +if sys.version_info >= (3, 9): + import importlib.resources as importlib_resources + from importlib.abc import Traversable +else: + import importlib_resources + from importlib_resources.abc import Traversable + + +def get_package_files(package: str | ModuleType) -> Traversable: + """Load a file from a package. + + Args: + package: The package to load the file from. + file: The file to load. + + Returns: + The file as a Traversable object. 
+    """
+    return importlib_resources.files(package)
diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py
new file mode 100644
index 000000000..b7f2da023
--- /dev/null
+++ b/singer_sdk/metrics.py
@@ -0,0 +1,408 @@
+"""Singer metrics logging."""
+
+from __future__ import annotations
+
+import abc
+import enum
+import json
+import logging
+import logging.config
+import os
+from dataclasses import asdict, dataclass, field
+from pathlib import Path
+from time import time
+from types import TracebackType
+from typing import Any, Generic, Mapping, TypeVar
+
+import yaml
+
+from singer_sdk.helpers._resources import Traversable, get_package_files
+
+DEFAULT_LOG_INTERVAL = 60.0
+METRICS_LOGGER_NAME = __name__
+METRICS_LOG_LEVEL_SETTING = "metrics_log_level"
+
+_TVal = TypeVar("_TVal")
+
+
+class Status(str, enum.Enum):
+    """Constants for commonly used status values."""
+
+    SUCCEEDED = "succeeded"
+    FAILED = "failed"
+
+
+class Tag(str, enum.Enum):
+    """Constants for commonly used tags."""
+
+    STREAM = "stream"
+    CONTEXT = "context"
+    ENDPOINT = "endpoint"
+    JOB_TYPE = "job_type"
+    HTTP_STATUS_CODE = "http_status_code"
+    STATUS = "status"
+
+
+class Metric(str, enum.Enum):
+    """Common metric types."""
+
+    RECORD_COUNT = "record_count"
+    BATCH_COUNT = "batch_count"
+    HTTP_REQUEST_DURATION = "http_request_duration"
+    HTTP_REQUEST_COUNT = "http_request_count"
+    JOB_DURATION = "job_duration"
+    SYNC_DURATION = "sync_duration"
+
+
+@dataclass
+class Point(Generic[_TVal]):
+    """An individual metric measurement."""
+
+    metric_type: str
+    metric: Metric
+    value: _TVal
+    tags: dict[str, Any] = field(default_factory=dict)
+
+    def __str__(self) -> str:
+        """Get string representation of this measurement.
+
+        Returns:
+            A string representation of this measurement.
+        """
+        return self.to_json()
+
+    def to_json(self) -> str:
+        """Convert this measurement to a JSON string.
+
+        Returns:
+            A JSON string.
+        """
+        return json.dumps(asdict(self))
+
+
+def log(logger: logging.Logger, point: Point) -> None:
+    """Log a measurement.
+
+    Args:
+        logger: A logger instance.
+        point: A measurement.
+    """
+    logger.info("INFO METRIC: %s", point)
+
+
+class Meter(metaclass=abc.ABCMeta):
+    """Base class for all meters."""
+
+    def __init__(self, metric: Metric, tags: dict | None = None) -> None:
+        """Initialize a meter.
+
+        Args:
+            metric: The metric type.
+            tags: Tags to add to the measurement.
+        """
+        self.metric = metric
+        self.tags = tags or {}
+        self.logger = get_metrics_logger()
+
+    @property
+    def context(self) -> dict | None:
+        """Get the context for this meter.
+
+        Returns:
+            A context dictionary.
+        """
+        return self.tags.get(Tag.CONTEXT)
+
+    @context.setter
+    def context(self, value: dict | None) -> None:
+        """Set the context for this meter.
+
+        Args:
+            value: A context dictionary.
+        """
+        if value is None:
+            self.tags.pop(Tag.CONTEXT, None)
+        else:
+            self.tags[Tag.CONTEXT] = value
+
+    @abc.abstractmethod
+    def __enter__(self) -> Meter:
+        """Enter the meter context."""
+        ...
+
+    @abc.abstractmethod
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        """Exit the meter context.
+
+        Args:
+            exc_type: The exception type.
+            exc_val: The exception value.
+            exc_tb: The exception traceback.
+        """
+        ...
+
+
+class Counter(Meter):
+    """A meter for counting things."""
+
+    def __init__(
+        self,
+        metric: Metric,
+        tags: dict | None = None,
+        log_interval: float = DEFAULT_LOG_INTERVAL,
+    ) -> None:
+        """Initialize a counter.
+ + Args: + metric: The metric type. + tags: Tags to add to the measurement. + log_interval: The interval at which to log the count. + """ + super().__init__(metric, tags) + self.value = 0 + self.log_interval = log_interval + self.last_log_time = time() + + def __enter__(self) -> Counter: + """Enter the counter context. + + Returns: + The counter instance. + """ + self.last_log_time = time() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the counter context. + + Args: + exc_type: The exception type. + exc_val: The exception value. + exc_tb: The exception traceback. + """ + self._pop() + + def _pop(self) -> None: + """Log and reset the counter.""" + log(self.logger, Point("counter", self.metric, self.value, self.tags)) + self.value = 0 + self.last_log_time = time() + + def increment(self, value: int = 1) -> None: + """Increment the counter. + + Args: + value: The value to increment by. + """ + self.value += value + if self._ready_to_log(): + self._pop() + + def _ready_to_log(self) -> bool: + """Check if the counter is ready to log. + + Returns: + True if the counter is ready to log. + """ + return time() - self.last_log_time > self.log_interval + + +class Timer(Meter): + """A meter for timing things.""" + + def __init__(self, metric: Metric, tags: dict | None = None) -> None: + """Initialize a timer. + + Args: + metric: The metric type. + tags: Tags to add to the measurement. + """ + super().__init__(metric, tags) + self.start_time = time() + + def __enter__(self) -> Timer: + """Enter the timer context. + + Returns: + The timer instance. + """ + self.start_time = time() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the timer context. + + Args: + exc_type: The exception type. + exc_val: The exception value. + exc_tb: The exception traceback. + """ + if Tag.STATUS not in self.tags: + if exc_type is None: + self.tags[Tag.STATUS] = Status.SUCCEEDED + else: + self.tags[Tag.STATUS] = Status.FAILED + log(self.logger, Point("timer", self.metric, self.elapsed(), self.tags)) + + def elapsed(self) -> float: + """Get the elapsed time. + + Returns: + The elapsed time. + """ + return time() - self.start_time + + +def get_metrics_logger() -> logging.Logger: + """Get a logger for emitting metrics. + + Returns: + A logger that can be used to emit metrics. + """ + return logging.getLogger(METRICS_LOGGER_NAME) + + +def record_counter( + stream: str, + endpoint: str | None = None, + log_interval: float = DEFAULT_LOG_INTERVAL, + **tags: Any, +) -> Counter: + """Use for counting records retrieved from the source. + + with record_counter("my_stream", endpoint="/users") as counter: + for record in my_records: + # Do something with the record + counter.increment() + + Args: + stream: The stream name. + endpoint: The endpoint name. + log_interval: The interval at which to log the count. + tags: Tags to add to the measurement. + + Returns: + A counter for counting records. + """ + tags[Tag.STREAM] = stream + if endpoint: + tags[Tag.ENDPOINT] = endpoint + return Counter(Metric.RECORD_COUNT, tags, log_interval=log_interval) + + +def batch_counter(stream: str, **tags: Any) -> Counter: + """Use for counting batches sent to the target. 
+
+    with batch_counter("my_stream") as counter:
+        for batch in my_batches:
+            # Do something with the batch
+            counter.increment()
+
+    Args:
+        stream: The stream name.
+        tags: Tags to add to the measurement.
+
+    Returns:
+        A counter for counting batches.
+    """
+    tags[Tag.STREAM] = stream
+    return Counter(Metric.BATCH_COUNT, tags)
+
+
+def http_request_counter(
+    stream: str,
+    endpoint: str,
+    log_interval: float = DEFAULT_LOG_INTERVAL,
+    **tags: Any,
+) -> Counter:
+    """Use for counting HTTP requests.
+
+    with http_request_counter("my_stream", "/users") as counter:
+        for record in my_records:
+            # Do something with the record
+            counter.increment()
+
+    Args:
+        stream: The stream name.
+        endpoint: The endpoint name.
+        log_interval: The interval at which to log the count.
+        tags: Tags to add to the measurement.
+
+    Returns:
+        A counter for counting HTTP requests.
+    """
+    tags.update({Tag.STREAM: stream, Tag.ENDPOINT: endpoint})
+    return Counter(Metric.HTTP_REQUEST_COUNT, tags, log_interval=log_interval)
+
+
+def sync_timer(stream: str, **tags: Any) -> Timer:
+    """Use for timing the sync of a stream.
+
+    with sync_timer("my_stream") as timer:
+        # Do something
+        print(f"Sync took {timer.elapsed()} seconds")
+
+    Args:
+        stream: The stream name.
+        tags: Tags to add to the measurement.
+
+    Returns:
+        A timer for timing the sync of a stream.
+    """
+    tags[Tag.STREAM] = stream
+    return Timer(Metric.SYNC_DURATION, tags)
+
+
+def _load_yaml_logging_config(path: Traversable | Path) -> Any:  # noqa: ANN401
+    """Load the logging config from the YAML file.
+
+    Args:
+        path: A path to the YAML file.
+
+    Returns:
+        The logging config.
+    """
+    with path.open() as f:
+        return yaml.safe_load(f)
+
+
+def _get_default_config() -> Any:  # noqa: ANN401
+    """Get a logging configuration.
+
+    Returns:
+        A logging configuration.
+    """
+    log_config_path = get_package_files("singer_sdk").joinpath("default_logging.yml")
+    return _load_yaml_logging_config(log_config_path)
+
+
+def _setup_logging(config: Mapping[str, Any]) -> None:
+    """Set up logging.
+
+    Args:
+        config: A plugin configuration dictionary.
+    """
+    logging.config.dictConfig(_get_default_config())
+
+    config = config or {}
+    metrics_log_level = config.get(METRICS_LOG_LEVEL_SETTING, "INFO").upper()
+    logging.getLogger(METRICS_LOGGER_NAME).setLevel(metrics_log_level)
+
+    if "SINGER_SDK_LOG_CONFIG" in os.environ:
+        log_config_path = Path(os.environ["SINGER_SDK_LOG_CONFIG"])
+        logging.config.dictConfig(_load_yaml_logging_config(log_config_path))
diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py
index 7ef4d4857..9c2258baa 100644
--- a/singer_sdk/plugin_base.py
+++ b/singer_sdk/plugin_base.py
@@ -23,6 +23,7 @@
 import click
 from jsonschema import Draft4Validator, SchemaError, ValidationError
 
+from singer_sdk import metrics
 from singer_sdk.configuration._dict_config import parse_environment_config
 from singer_sdk.exceptions import ConfigValidationError
 from singer_sdk.helpers._classproperty import classproperty
@@ -120,6 +121,9 @@ def __init__(
         self._validate_config(raise_errors=validate_config)
         self.mapper: PluginMapper
 
+        metrics._setup_logging(self.config)
+        self.metrics_logger = metrics.get_metrics_logger()
+
     @classproperty
     def capabilities(self) -> List[CapabilitiesEnum]:
         """Get capabilities.
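Taken together, the helpers above give plugins context-managed meters: counters flush a point on an interval and on exit, and timers tag their final point with a success or failure status. A minimal usage sketch (the stream name, endpoint, and records here are hypothetical; this mirrors how the stream classes below consume the API):

```python
from singer_sdk import metrics

records = [{"id": 1}, {"id": 2}, {"id": 3}]  # stand-in for a real source

with metrics.sync_timer("my_stream") as timer:
    with metrics.record_counter("my_stream", endpoint="/users") as counter:
        for record in records:
            # ... process each record ...
            counter.increment()

# Exiting the blocks emits a final `record_count` point, then a
# `sync_duration` point tagged "succeeded" (or "failed" if an
# exception escaped the timer block).
print(f"Sync took {timer.elapsed():.3f} seconds")
```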
diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 9c7d2e2d6..bc41601be 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -12,13 +12,13 @@ from os import PathLike from pathlib import Path from types import MappingProxyType -from typing import Any, Callable, Generator, Iterable, Iterator, Mapping, TypeVar, cast +from typing import Any, Generator, Iterable, Iterator, Mapping, TypeVar, cast from uuid import uuid4 import pendulum -import requests import singer_sdk._singerlib as singer +from singer_sdk import metrics from singer_sdk.exceptions import InvalidStreamSortException, MaxRecordsLimitException from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, @@ -52,8 +52,6 @@ FactoryType = TypeVar("FactoryType", bound="Stream") _T = TypeVar("_T") -METRICS_LOG_LEVEL_SETTING = "metrics_log_level" - def lazy_chunked_generator( iterable: Iterable[_T], @@ -116,6 +114,7 @@ def __init__( raise ValueError("Missing argument or class variable 'name'.") self.logger: logging.Logger = tap.logger + self.metrics_logger = tap.metrics_logger self.tap_name: str = tap.name self._config: dict = dict(tap.config) self._tap = tap @@ -832,95 +831,13 @@ def _write_batch_message( ) ) - @property - def _metric_logging_function(self) -> Callable | None: - """Return the metrics logging function. - - Returns: - The logging function for emitting metrics. - - Raises: - ValueError: If logging level setting is an unsupported value. - """ - if METRICS_LOG_LEVEL_SETTING not in self.config: - return self.logger.info - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "INFO": - return self.logger.info - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "DEBUG": - return self.logger.debug - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "NONE": - return None - - raise ValueError( - "Unexpected logging level for metrics: " - + self.config[METRICS_LOG_LEVEL_SETTING] - ) - - def _write_metric_log(self, metric: dict, extra_tags: dict | None) -> None: - """Emit a metric log. Optionally with appended tag info. - - Args: - metric: TODO - extra_tags: TODO - - Returns: - None - """ - if not self._metric_logging_function: - return None - - if extra_tags: - metric["tags"].update(extra_tags) - self._metric_logging_function(f"INFO METRIC: {json.dumps(metric)}") - - def _write_record_count_log(self, record_count: int, context: dict | None) -> None: - """Emit a metric log. Optionally with appended tag info. + def _log_metric(self, point: metrics.Point) -> None: + """Log a single measurement. Args: - record_count: TODO - context: Stream partition or context dictionary. + point: A single measurement value. """ - extra_tags = {} if not context else {"context": context} - counter_metric: dict[str, Any] = { - "type": "counter", - "metric": "record_count", - "value": record_count, - "tags": {"stream": self.name}, - } - self._write_metric_log(counter_metric, extra_tags=extra_tags) - - def _write_request_duration_log( - self, - endpoint: str, - response: requests.Response, - context: dict | None, - extra_tags: dict | None, - ) -> None: - """TODO. - - Args: - endpoint: TODO - response: TODO - context: Stream partition or context dictionary. 
- extra_tags: TODO - """ - request_duration_metric: dict[str, Any] = { - "type": "timer", - "metric": "http_request_duration", - "value": response.elapsed.total_seconds(), - "tags": { - "endpoint": endpoint, - "http_status_code": response.status_code, - "status": "succeeded" if response.status_code < 400 else "failed", - }, - } - extra_tags = extra_tags or {} - if context: - extra_tags["context"] = context - self._write_metric_log(metric=request_duration_metric, extra_tags=extra_tags) + metrics.log(self.metrics_logger, point=point) def log_sync_costs(self) -> None: """Log a summary of Sync costs. @@ -1040,70 +957,80 @@ def _sync_records( Yields: Each record from the source. """ + # Initialize metrics + record_counter = metrics.record_counter(self.name) + timer = metrics.sync_timer(self.name) + record_count = 0 current_context: dict | None context_list: list[dict] | None context_list = [context] if context is not None else self.partitions selected = self.selected - for current_context in context_list or [{}]: - partition_record_count = 0 - current_context = current_context or None - state = self.get_context_state(current_context) - state_partition_context = self._get_state_partition_context(current_context) - self._write_starting_replication_value(current_context) - child_context: dict | None = ( - None if current_context is None else copy.copy(current_context) - ) + with record_counter, timer: + for current_context in context_list or [{}]: + record_counter.context = current_context + timer.context = current_context - for record_result in self.get_records(current_context): - if isinstance(record_result, tuple): - # Tuple items should be the record and the child context - record, child_context = record_result - else: - record = record_result - try: - self._process_record( - record, - child_context=child_context, - partition_context=state_partition_context, - ) - except InvalidStreamSortException as ex: - log_sort_error( - log_fn=self.logger.error, - ex=ex, - record_count=record_count + 1, - partition_record_count=partition_record_count + 1, - current_context=current_context, - state_partition_context=state_partition_context, - stream_name=self.name, - ) - raise ex + partition_record_count = 0 + current_context = current_context or None + state = self.get_context_state(current_context) + state_partition_context = self._get_state_partition_context( + current_context + ) + self._write_starting_replication_value(current_context) + child_context: dict | None = ( + None if current_context is None else copy.copy(current_context) + ) - self._check_max_record_limit(record_count) + for record_result in self.get_records(current_context): + if isinstance(record_result, tuple): + # Tuple items should be the record and the child context + record, child_context = record_result + else: + record = record_result + try: + self._process_record( + record, + child_context=child_context, + partition_context=state_partition_context, + ) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex - if selected: - if ( - record_count - 1 - ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: - self._write_state_message() - if write_messages: - self._write_record_message(record) - self._increment_stream_state(record, context=current_context) + 
self._check_max_record_limit(record_count) - yield record + if selected: + if ( + record_count - 1 + ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: + self._write_state_message() + if write_messages: + self._write_record_message(record) + self._increment_stream_state(record, context=current_context) - record_count += 1 - partition_record_count += 1 + yield record - if current_context == state_partition_context: - # Finalize per-partition state only if 1:1 with context - finalize_state_progress_markers(state) + record_counter.increment() + record_count += 1 + partition_record_count += 1 + + if current_context == state_partition_context: + # Finalize per-partition state only if 1:1 with context + finalize_state_progress_markers(state) if not context: # Finalize total stream only if we have the full full context. # Otherwise will be finalized by tap at end of sync. finalize_state_progress_markers(self.stream_state) - self._write_record_count_log(record_count=record_count, context=context) if write_messages: # Reset interim bookmarks before emitting final STATE message: @@ -1120,9 +1047,11 @@ def _sync_batches( batch_config: The batch configuration. context: Stream partition or context dictionary. """ - for encoding, manifest in self.get_batches(batch_config, context): - self._write_batch_message(encoding=encoding, manifest=manifest) - self._write_state_message() + with metrics.batch_counter(self.name, context=context) as counter: + for encoding, manifest in self.get_batches(batch_config, context): + counter.increment() + self._write_batch_message(encoding=encoding, manifest=manifest) + self._write_state_message() # Public methods ("final", not recommended to be overridden) diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index cd7709edc..65f134cc0 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -13,6 +13,7 @@ import backoff import requests +from singer_sdk import metrics from singer_sdk._singerlib import Schema from singer_sdk.authenticators import APIAuthenticatorBase, SimpleAuthenticator from singer_sdk.exceptions import FatalAPIError, RetriableAPIError @@ -241,16 +242,14 @@ def _request( TODO """ response = self.requests_session.send(prepared_request, timeout=self.timeout) - if self._LOG_REQUEST_METRICS: - extra_tags = {} - if self._LOG_REQUEST_METRIC_URLS: - extra_tags["url"] = prepared_request.path_url - self._write_request_duration_log( - endpoint=self.path, - response=response, - context=context, - extra_tags=extra_tags, - ) + self._write_request_duration_log( + endpoint=self.path, + response=response, + context=context, + extra_tags={"url": prepared_request.path_url} + if self._LOG_REQUEST_METRIC_URLS + else None, + ) self.validate_response(response) logging.debug("Response received successfully.") return response @@ -347,16 +346,57 @@ def request_records(self, context: dict | None) -> Iterable[dict]: paginator = self.get_new_paginator() decorated_request = self.request_decorator(self._request) - while not paginator.finished: - prepared_request = self.prepare_request( - context, - next_page_token=paginator.current_value, - ) - resp = decorated_request(prepared_request, context) - self.update_sync_costs(prepared_request, resp, context) - yield from self.parse_response(resp) + with metrics.http_request_counter(self.name, self.path) as request_counter: + request_counter.context = context + + while not paginator.finished: + prepared_request = self.prepare_request( + context, + next_page_token=paginator.current_value, + ) + resp = 
decorated_request(prepared_request, context)
+                request_counter.increment()
+                self.update_sync_costs(prepared_request, resp, context)
+                yield from self.parse_response(resp)
 
-            paginator.advance(resp)
+                paginator.advance(resp)
+
+    def _write_request_duration_log(
+        self,
+        endpoint: str,
+        response: requests.Response,
+        context: dict | None,
+        extra_tags: dict | None,
+    ) -> None:
+        """Write a log entry with the request duration metric.
+
+        Args:
+            endpoint: The endpoint name.
+            response: The HTTP response object.
+            context: Stream partition or context dictionary.
+            extra_tags: Extra tags to add to the measurement.
+        """
+        extra_tags = extra_tags or {}
+        if context:
+            extra_tags[metrics.Tag.CONTEXT] = context
+
+        point = metrics.Point(
+            "timer",
+            metric=metrics.Metric.HTTP_REQUEST_DURATION,
+            value=response.elapsed.total_seconds(),
+            tags={
+                metrics.Tag.STREAM: self.name,
+                metrics.Tag.ENDPOINT: self.path,
+                metrics.Tag.HTTP_STATUS_CODE: response.status_code,
+                metrics.Tag.STATUS: (
+                    metrics.Status.SUCCEEDED
+                    if response.status_code < 400
+                    else metrics.Status.FAILED
+                ),
+                **extra_tags,
+            },
+        )
+        self._log_metric(point)
 
     def update_sync_costs(
         self,
diff --git a/tests/core/test_metrics.py b/tests/core/test_metrics.py
new file mode 100644
index 000000000..7bd8c7481
--- /dev/null
+++ b/tests/core/test_metrics.py
@@ -0,0 +1,87 @@
+import logging
+import time
+
+import pytest
+
+from singer_sdk import metrics
+
+
+def test_meter():
+    class _MyMeter(metrics.Meter):
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            pass
+
+    meter = _MyMeter(metrics.Metric.RECORD_COUNT)
+
+    assert meter.tags == {}
+
+    stream_context = {"parent_id": 1}
+    meter.context = stream_context
+    assert meter.tags == {metrics.Tag.CONTEXT: stream_context}
+
+    meter.context = None
+    assert metrics.Tag.CONTEXT not in meter.tags
+
+
+def test_record_counter(caplog: pytest.LogCaptureFixture):
+    caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
+    with metrics.record_counter(
+        "test_stream",
+        endpoint="test_endpoint",
+        custom_tag="pytest",
+    ) as counter:
+        for _ in range(100):
+            counter.last_log_time = 0
+            assert counter._ready_to_log()
+
+            counter.increment()
+
+    total = 0
+
+    assert len(caplog.records) == 100 + 1
+
+    for record in caplog.records:
+        assert record.levelname == "INFO"
+        assert record.msg == "INFO METRIC: %s"
+
+        point: metrics.Point[int] = record.args[0]
+        assert point.metric_type == "counter"
+        assert point.metric == "record_count"
+        assert point.tags == {
+            metrics.Tag.STREAM: "test_stream",
+            metrics.Tag.ENDPOINT: "test_endpoint",
+            "custom_tag": "pytest",
+        }
+
+        total += point.value
+
+    assert total == 100
+
+
+def test_sync_timer(caplog: pytest.LogCaptureFixture):
+    caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
+    with metrics.sync_timer("test_stream", custom_tag="pytest") as timer:
+        start_time = timer.start_time
+        for _ in range(1000):
+            time.sleep(0.001)
+        end_time = time.time()
+
+    assert len(caplog.records) == 1
+
+    record = caplog.records[0]
+    assert record.levelname == "INFO"
+    assert record.msg == "INFO METRIC: %s"
+
+    point: metrics.Point[float] = record.args[0]
+    assert point.metric_type == "timer"
+    assert point.metric == "sync_duration"
+    assert point.tags == {
+        metrics.Tag.STREAM: "test_stream",
+        metrics.Tag.STATUS: "succeeded",
+        "custom_tag": "pytest",
+    }
+
+    assert pytest.approx(point.value, rel=0.001) == end_time - start_time
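As `test_record_counter` above demonstrates by forcing `counter.last_log_time = 0`, a `Counter` emits an intermediate point whenever `log_interval` elapses between increments, plus one final point when the context exits. A small standalone sketch of that behavior (assumes this patch's `singer_sdk.metrics` module is importable; the stream name is illustrative):

```python
import logging

from singer_sdk import metrics

# Route metric points through the root logger for this demo.
logging.basicConfig(level=logging.INFO)

# A short interval forces intermediate `record_count` points mid-loop;
# any remaining count is flushed as a final point when the block exits.
with metrics.record_counter("my_stream", log_interval=0.25) as counter:
    for _ in range(1_000_000):
        counter.increment()
```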