Skip to content

Commit

Permalink
Allow passing custo logging config
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 29, 2022
1 parent a347eb6 commit 131c498
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 69 deletions.
41 changes: 33 additions & 8 deletions docs/implementation/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,42 @@

Metrics logging is specified in the
[Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically
emit two types of metrics `record_count` and `http_request_duration`.
emit two types of metrics `record_count`, `http_request_duration` and `sync_duration`.

Customization options:
## Customization options

Developers may optionally add a `metrics_log_level` config option to their taps,
which will automatically allow this metrics logging to be customized at runtime.
### `metrics_log_level`

When `metrics_log_level` is supported, users can then
set one of these values (case insensitive), `INFO`, `DEBUG`, `NONE`, to override the
default logging level for metrics. This can be helpful for REST-type sources which use
make a large number of REST calls can therefor have very noisy metrics.
Metrics are logged at the `INFO` level. Developers may optionally add a
`metrics_log_level` config option to their taps, `WARNING` or `ERROR` to disable
metrics logging.

### `SINGER_SDK_LOG_CONFIG`

Users of a tap can configure the SDK logging by setting the `SINGER_SDK_LOG_CONFIG`
environment variable. The value of this variable should be a path to a YAML file in the
[Python logging dict format](https://docs.python.org/3/library/logging.config.html#dictionary-schema-details).

For example, to direct metrics records to a file, you could use the following config:

```yaml
version: 1
disable_existing_loggers: false
formatters:
metrics:
format: "{asctime} {message}"
style: "{"
handlers:
metrics:
class: logging.FileHandler
formatter: metrics
filename: metrics.log
loggers:
singer_sdk.metrics:
level: INFO
handlers: [ metrics ]
propagate: yes
```
## Additional Singer Metrics References
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def mypy(session: Session) -> None:
"types-requests",
"types-pytz",
"types-simplejson",
"types-PyYAML",
)
session.run("mypy", *args)
if not session.posargs:
Expand Down
25 changes: 15 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ PyJWT = "~=2.4"
requests = "^2.25.1"
cryptography = ">=3.4.6,<39.0.0"
importlib-metadata = {version = "*", markers = "python_version < \"3.8\""}
importlib-resources = {version = "^5.9.0", markers = "python_version < \"3.9\""}
memoization = ">=0.3.2,<0.5.0"
jsonpath-ng = "^1.5.3"
joblib = "^1.0.1"
Expand All @@ -54,6 +55,7 @@ typing-extensions = "^4.2.0"
simplejson = "^3.17.6"
jsonschema = "^4.16.0"
pytz = "^2022.2.1"
PyYAML = "^6.0"

# Sphinx dependencies installed as optional 'docs' extras
# https://github.com/readthedocs/readthedocs.org/issues/4912#issuecomment-664002569
Expand Down Expand Up @@ -89,6 +91,7 @@ types-python-dateutil = "^2.8.19"
types-pytz = "^2022.2.1.0"
types-requests = "^2.28.11"
types-simplejson = "^3.17.7"
types-PyYAML = "^6.0.12"
coverage = {extras = ["toml"], version = "^6.4"}

# Cookiecutter tests
Expand Down
15 changes: 15 additions & 0 deletions singer_sdk/default_logging.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: 1
disable_existing_loggers: false
formatters:
console:
format: "{asctime} {message}"
style: "{"
handlers:
default:
class: logging.StreamHandler
formatter: console
stream: ext://sys.stderr
root:
level: INFO
propagate: true
handlers: [default]
24 changes: 24 additions & 0 deletions singer_sdk/helpers/_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

import sys
from types import ModuleType

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
from importlib.abc import Traversable
else:
import importlib_resources
from importlib_resources.abc import Traversable


def get_package_files(package: str | ModuleType) -> Traversable:
"""Load a file from a package.
Args:
package: The package to load the file from.
file: The file to load.
Returns:
The file as a Traversable object.
"""
return importlib_resources.files(package)
69 changes: 34 additions & 35 deletions singer_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
import json
import logging
import logging.config
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from time import time
from types import TracebackType
from typing import Any, Generic, Mapping, TypeVar

import yaml

from singer_sdk.helpers._resources import Traversable, get_package_files

DEFAULT_LOG_INTERVAL = 60.0
METRICS_LOGGER_NAME = "singer.metrics"
METRICS_LOGGER_NAME = __name__
METRICS_LOG_LEVEL_SETTING = "metrics_log_level"

_TVal = TypeVar("_TVal")
Expand Down Expand Up @@ -279,7 +285,7 @@ def record_counter(
) -> Counter:
"""Use for counting records retrieved from the source.
with singer.metrics.record_counter(endpoint="users") as counter:
with record_counter("my_stream", endpoint="/users") as counter:
for record in my_records:
# Do something with the record
counter.increment()
Expand All @@ -302,7 +308,7 @@ def record_counter(
def batch_counter(stream: str, **tags: Any) -> Counter:
"""Use for counting batches sent to the target.
with singer.metrics.batch_counter() as counter:
with batch_counter("my_stream") as counter:
for batch in my_batches:
# Do something with the batch
counter.increment()
Expand All @@ -326,7 +332,7 @@ def http_request_counter(
) -> Counter:
"""Use for counting HTTP requests.
with singer.metrics.http_request_counter() as counter:
with http_request_counter() as counter:
for record in my_records:
# Do something with the record
counter.increment()
Expand Down Expand Up @@ -362,42 +368,27 @@ def sync_timer(stream: str, **tags: Any) -> Timer:
return Timer(Metric.SYNC_DURATION, tags)


def _get_logging_config(config: Mapping[str, Any] | None = None) -> dict:
"""Get a logging configuration.
def _load_yaml_logging_config(path: Traversable | Path) -> Any: # noqa: ANN401
"""Load the logging config from the YAML file.
Args:
config: A logging configuration.
path: A path to the YAML file.
Returns:
The logging config.
"""
with path.open() as f:
return yaml.safe_load(f)


def _get_default_config() -> Any: # noqa: ANN401
"""Get a logging configuration.
Returns:
A logging configuration.
"""
config = config or {}
metrics_log_level = config.get(METRICS_LOG_LEVEL_SETTING, "INFO")

return {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"metrics": {
"format": "{asctime} {message}",
"style": "{",
},
},
"handlers": {
"metrics": {
"class": "logging.FileHandler",
"formatter": "metrics",
"filename": "metrics.log",
},
},
"loggers": {
METRICS_LOGGER_NAME: {
"level": metrics_log_level,
"handlers": ["metrics"],
"propagate": True,
},
},
}
log_config_path = get_package_files("singer_sdk").joinpath("default_logging.yml")
return _load_yaml_logging_config(log_config_path)


def _setup_logging(config: Mapping[str, Any]) -> None:
Expand All @@ -406,4 +397,12 @@ def _setup_logging(config: Mapping[str, Any]) -> None:
Args:
config: A plugin configuration dictionary.
"""
logging.config.dictConfig(_get_logging_config(config))
logging.config.dictConfig(_get_default_config())

config = config or {}
metrics_log_level = config.get(METRICS_LOG_LEVEL_SETTING, "INFO").upper()
logging.getLogger(METRICS_LOGGER_NAME).setLevel(metrics_log_level)

if "SINGER_SDK_LOG_CONFIG" in os.environ:
log_config_path = Path(os.environ["SINGER_SDK_LOG_CONFIG"])
logging.config.dictConfig(_load_yaml_logging_config(log_config_path))
34 changes: 18 additions & 16 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,20 +967,22 @@ def _sync_records(
context_list = [context] if context is not None else self.partitions
selected = self.selected

for current_context in context_list or [{}]:
record_counter.context = current_context
timer.context = current_context

partition_record_count = 0
current_context = current_context or None
state = self.get_context_state(current_context)
state_partition_context = self._get_state_partition_context(current_context)
self._write_starting_replication_value(current_context)
child_context: dict | None = (
None if current_context is None else copy.copy(current_context)
)
with record_counter, timer:
for current_context in context_list or [{}]:
record_counter.context = current_context
timer.context = current_context

partition_record_count = 0
current_context = current_context or None
state = self.get_context_state(current_context)
state_partition_context = self._get_state_partition_context(
current_context
)
self._write_starting_replication_value(current_context)
child_context: dict | None = (
None if current_context is None else copy.copy(current_context)
)

with record_counter, timer:
for record_result in self.get_records(current_context):
if isinstance(record_result, tuple):
# Tuple items should be the record and the child context
Expand Down Expand Up @@ -1022,9 +1024,9 @@ def _sync_records(
record_count += 1
partition_record_count += 1

if current_context == state_partition_context:
# Finalize per-partition state only if 1:1 with context
finalize_state_progress_markers(state)
if current_context == state_partition_context:
# Finalize per-partition state only if 1:1 with context
finalize_state_progress_markers(state)
if not context:
# Finalize total stream only if we have the full full context.
# Otherwise will be finalized by tap at end of sync.
Expand Down

0 comments on commit 131c498

Please sign in to comment.