Skip to content

Commit

Permalink
Merge pull request #542 from mdekstrand/feature/fix-process-logging
Browse files Browse the repository at this point in the history
Fix subprocess logging
  • Loading branch information
mdekstrand authored Dec 7, 2024
2 parents bf64152 + 87abf4c commit 2d66234
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lenskit/lenskit/logging/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,6 @@ def _handle_log_message(self):
logger = structlog.get_logger(name)
data = json.loads(data)
method = getattr(logger, data["method"])
method(data["event"])
method(**data["event"])
else:
_log.error("invalid log backend")
15 changes: 15 additions & 0 deletions lenskit/lenskit/logging/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
LensKit logging processors.
"""

import multiprocessing as mp
from datetime import datetime
from typing import Any

Expand Down Expand Up @@ -32,3 +33,17 @@ def format_timestamp(logger: Any, method: str, event_dict: EventDict) -> EventDi
return event_dict
else:
return event_dict


def add_process_info(logger: Any, method: str, event_dict: EventDict) -> EventDict:
"""
Add process info if it does not exist.
"""

proc = mp.current_process()
if "pid" not in event_dict:
event_dict["pid"] = proc.pid
if "pname" not in event_dict:
event_dict["pname"] = proc.name

return event_dict
9 changes: 8 additions & 1 deletion lenskit/lenskit/logging/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

from .config import CORE_PROCESSORS, active_logging_config
from .monitor import get_monitor
from .processors import add_process_info

_active_context: WorkerContext | None = None
_log = structlog.stdlib.get_logger(__name__)


@dataclass
Expand Down Expand Up @@ -82,9 +84,13 @@ def start(self):
root.setLevel(self.config.level)

structlog.configure(
CORE_PROCESSORS + [self._log_handler.send_structlog],
[add_process_info]
+ CORE_PROCESSORS
+ [structlog.processors.ExceptionPrettyPrinter(), self._log_handler.send_structlog],
wrapper_class=structlog.make_filtering_bound_logger(self.config.level),
logger_factory=structlog.stdlib.LoggerFactory(),
)
_log.debug("log context activated")

def shutdown(self):
root = getLogger()
Expand Down Expand Up @@ -119,6 +125,7 @@ def handle(self, record: LogRecord) -> LogRecord | bool: # type: ignore
# update messages for copyability
if not hasattr(record, "message"):
record.message = record.msg % record.args

record.exc_info = None
record.exc_text = None
record.stack_info = None
Expand Down
2 changes: 1 addition & 1 deletion lenskit/lenskit/parallel/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def __init__(self, *args: Any, **kwargs: Any):

@override
def run(self):
log = _log.bind(pid=self.pid, pname=self.name)
with WorkerContext(self._log_config):
log = _log.bind(pid=self.pid, pname=self.name)
log.info("multiprocessing worker started")
super().run()

Expand Down
2 changes: 1 addition & 1 deletion lenskit/lenskit/parallel/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def initalize(cfg: WorkerConfig, ctx: ModelData) -> None:
log.error("deserialization failed: %s", e)
raise e

log.debug("worker %d ready (process %s)", os.getpid(), mp.current_process())
log.debug("worker %d ready (process %s)", os.getpid(), mp.current_process().name)


def worker(arg: Any) -> Any:
Expand Down

0 comments on commit 2d66234

Please sign in to comment.