Skip to content

Commit

Permalink
Include update info in logging output (#664)
Browse files Browse the repository at this point in the history
* Make LoggerAdapter behavior identical to merge_extra=True
* Add update info to logging `extra`
  • Loading branch information
dandavison authored Nov 6, 2024
1 parent 0b327b0 commit 723d234
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
36 changes: 28 additions & 8 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,14 @@ class UpdateInfo:
name: str
"""Update type name."""

@property
def _logger_details(self) -> Mapping[str, Any]:
    """Identifying fields for this update, merged into workflow log output.

    Returns:
        A mapping with the update's ``id`` (as ``update_id``) and its
        type ``name`` (as ``update_name``).
    """
    return dict(update_id=self.id, update_name=self.name)


class _Runtime(ABC):
@staticmethod
Expand Down Expand Up @@ -1211,6 +1219,10 @@ class LoggerAdapter(logging.LoggerAdapter):
use by others. Default is False.
log_during_replay: Boolean for whether logs should occur during replay.
Default is False.
Values added to ``extra`` are merged with the ``extra`` dictionary from a
logging call, with values from the logging call taking precedence. I.e. the
behavior is that of `merge_extra=True` in Python >= 3.13.
"""

def __init__(
Expand All @@ -1232,20 +1244,28 @@ def process(
or self.workflow_info_on_extra
or self.full_workflow_info_on_extra
):
extra: Dict[str, Any] = {}
msg_extra: Dict[str, Any] = {}
runtime = _Runtime.maybe_current()
if runtime:
workflow_details = runtime.logger_details
if self.workflow_info_on_message:
msg = f"{msg} ({runtime.logger_details})"
msg_extra.update(workflow_details)
if self.workflow_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["temporal_workflow"] = runtime.logger_details
kwargs["extra"] = extra
extra["temporal_workflow"] = workflow_details
if self.full_workflow_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["workflow_info"] = runtime.workflow_info()
kwargs["extra"] = extra
update_info = current_update_info()
if update_info:
update_details = update_info._logger_details
if self.workflow_info_on_message:
msg_extra.update(update_details)
if self.workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
msg = f"{msg} ({msg_extra})"
return (msg, kwargs)

def isEnabledFor(self, level: int) -> bool:
Expand Down
27 changes: 24 additions & 3 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,10 @@ def my_signal(self, value: str) -> None:
self._last_signal = value
workflow.logger.info(f"Signal: {value}")

@workflow.update
def my_update(self, value: str) -> None:
    """Update handler that records the received value via the workflow logger."""
    message = f"Update: {value}"
    workflow.logger.info(message)

@workflow.query
def last_signal(self) -> str:
    """Query returning the most recently stored signal value."""
    value = self._last_signal
    return value
Expand Down Expand Up @@ -1955,14 +1959,22 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Send a couple signals
# Send some signals and updates
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
await handle.execute_update(
LoggingWorkflow.my_update, "update 1", id="update-1"
)
await handle.execute_update(
LoggingWorkflow.my_update, "update 2", id="update-2"
)
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)

# Confirm two logs happened
# Confirm logs were produced
assert capturer.find_log("Signal: signal 1 ({'attempt':")
assert capturer.find_log("Signal: signal 2")
assert capturer.find_log("Update: update 1")
assert capturer.find_log("Update: update 2")
assert not capturer.find_log("Signal: signal 3")
# Also make sure it has some workflow info and correct funcName
record = capturer.find_log("Signal: signal 1")
Expand All @@ -1974,6 +1986,15 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
)
# Since we enabled full info, make sure it's there
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
# Check the log emitted by the update execution.
record = capturer.find_log("Update: update 1")
assert (
record
and record.__dict__["temporal_workflow"]["update_id"] == "update-1"
and record.__dict__["temporal_workflow"]["update_name"] == "my_update"
and "'update_id': 'update-1'" in record.message
and "'update_name': 'my_update'" in record.message
)

# Clear queue and start a new one with more signals
capturer.log_queue.queue.clear()
Expand All @@ -1983,7 +2004,7 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
task_queue=worker.task_queue,
max_cached_workflows=0,
) as worker:
# Send a couple signals
# Send signals and updates
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
await handle.signal(LoggingWorkflow.my_signal, "finish")
await handle.result()
Expand Down

0 comments on commit 723d234

Please sign in to comment.