Skip to content

Commit

Permalink
Add option for timestamps from output of Node.get_logs (#4932)
Browse files Browse the repository at this point in the history
  • Loading branch information
charlesbluca authored Jan 17, 2022
1 parent 0a064e7 commit 922d987
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 14 deletions.
22 changes: 17 additions & 5 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import operator
import os
from collections import OrderedDict, defaultdict
from datetime import datetime
from numbers import Number

import numpy as np
Expand Down Expand Up @@ -3284,13 +3285,24 @@ def update(self):


class SchedulerLogs:
def __init__(self, scheduler):
logs = Log(
"\n".join(line for level, line in scheduler.get_logs())
)._repr_html_()
def __init__(self, scheduler, start=None):
logs = scheduler.get_logs(start=start, timestamps=True)

if not logs:
logs_html = (
'<p style="font-family: monospace; margin: 0;">No logs to report</p>'
)
else:
logs_html = Log(
"\n".join(
"%s - %s"
% (datetime.fromtimestamp(time).strftime("%H:%M:%S.%f"), line)
for time, level, line in logs
)
)._repr_html_()

self.root = Div(
text=logs,
text=logs_html,
style={
"width": "100%",
"height": "100%",
Expand Down
35 changes: 28 additions & 7 deletions distributed/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,35 @@ def _setup_logging(self, logger):
logger.addHandler(self._deque_handler)
weakref.finalize(self, logger.removeHandler, self._deque_handler)

def get_logs(self, comm=None, n=None):
def get_logs(self, comm=None, start=None, n=None, timestamps=False):
"""
Fetch log entries for this node
Parameters
----------
start : float, optional
A time (in seconds) to begin filtering log entries from
n : int, optional
Maximum number of log entries to return from filtered results
timestamps : bool, default False
Do we want log entries to include the time they were generated?
Returns
-------
List of tuples containing the log level, message, and (optional) timestamp for each filtered entry
"""
deque_handler = self._deque_handler
if n is None:
L = list(deque_handler.deque)
else:
L = deque_handler.deque
L = [L[-i] for i in range(min(n, len(L)))]
return [(msg.levelname, deque_handler.format(msg)) for msg in L]
if start is None:
start = -1
L = []
for count, msg in enumerate(deque_handler.deque):
if n and count >= n or msg.created < start:
break
if timestamps:
L.append((msg.created, msg.levelname, deque_handler.format(msg)))
else:
L.append((msg.levelname, deque_handler.format(msg)))
return L

def start_http_server(
self, routes, dashboard_address, default_port=0, ssl_options=None
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7635,7 +7635,7 @@ def profile_to_figure(state):
# Scheduler logs
from distributed.dashboard.components.scheduler import SchedulerLogs

logs = SchedulerLogs(self)
logs = SchedulerLogs(self, start=start)

from bokeh.models import Div, Panel, Tabs

Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6434,7 +6434,7 @@ async def f(stacklevel, mode=None):
assert "Dask Performance Report" in data
assert "x = da.random" in data
assert "Threads: 4" in data
assert "distributed.scheduler - INFO - Clear task state" in data
assert "No logs to report" in data
assert dask.__version__ in data

# stacklevel=2 captures code two frames back -- which in this case
Expand Down

0 comments on commit 922d987

Please sign in to comment.