This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track DB scheduling delay per-request #2775

Closed
wants to merge 5 commits
16 changes: 15 additions & 1 deletion synapse/http/server.py
@@ -68,10 +68,21 @@
"response_db_txn_count", labels=["method", "servlet", "tag"]
)

# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_distribution(
"response_db_txn_duration", labels=["method", "servlet", "tag"]
)

# seconds spent waiting for a db connection, when processing this request
#
# it's a counter rather than a distribution, because the count would always
# be the same as that of all the other distributions.

Member

It is a distribution here.

Honestly I think it makes things clearer to always include the count, as otherwise it looks a bit odd to do: rate(metric_one:total)/rate(another_metric:count)

Member Author

> It is a distribution here.

er, oops.

> Honestly I think it makes things clearer to always include the count, as otherwise it looks a bit odd to do: rate(metric_one:total)/rate(another_metric:count)

It seems really silly to me to maintain six identical copies of the same counter here. That's a lot of pointless objects, hash lookups, and integer increments. IMHO what we ought to be doing is rate(synapse_http_server_response_db_sched_duration)/rate(synapse_http_server_response_count), which feels much more intuitive, but will take a bit of work to get there.

Member

> It seems really silly to me to maintain six identical copies of the same counter here. That's a lot of pointless objects, hash lookups, and integer increments.

I would be surprised if they're not completely dwarfed by transaction overhead.

> IMHO what we ought to be doing is rate(synapse_http_server_response_db_sched_duration)/rate(synapse_http_server_response_count), which feels much more intuitive, but will take a bit of work to get there.

Possibly, but having things consistent seems more intuitive than having a couple that don't fit.

#
# FIXME: use a floating-point rather than integer metric
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration", labels=["method", "servlet", "tag"]
)

_next_request_id = 0

@@ -341,7 +352,10 @@ def stop(self, clock, request):
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration, request.method, self.name, tag
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)


10 changes: 6 additions & 4 deletions synapse/http/site.py
@@ -66,22 +66,24 @@ def finished_processing(self):
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
db_txn_duration = context.db_txn_duration
db_txn_duration_ms = context.db_txn_duration_ms
db_sched_duration_ms = context.db_sched_duration_ms
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0)

self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%d)"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
int(time.time() * 1000) - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
int(db_txn_duration * 1000),
db_sched_duration_ms,
db_txn_duration_ms,
int(db_txn_count),
self.sentLength,
self.code,
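For context, the new "(%dms/%dms/%d)" group in the access log reads as (db scheduling time / db transaction time / transaction count). As a rough illustration, this is what a line rendered with that format string would look like; every value below (IP, user, path, timings) is a made-up example, not output from Synapse:

```python
# Illustrative only: renders the updated access-log format with made-up values.
# In Synapse the real values come from the request's LoggingContext.
fmt = (
    "%s - %s - {%s}"
    " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
    " %sB %s \"%s %s %s\" \"%s\""
)
print(fmt % (
    "192.0.2.10",            # client IP (example)
    "main",                  # site tag (example)
    "@alice:example.com",    # authenticated entity (example)
    153,                     # total wall-clock time for the request, in ms
    40, 10,                  # ru_utime, ru_stime (CPU time), in ms
    5,                       # db_sched_duration_ms: time waiting for a db connection
    87,                      # db_txn_duration_ms: time spent inside db transactions
    3,                       # db_txn_count
    "1024", 200,             # response size in bytes, status code
    "GET", "/_matrix/client/r0/sync", "HTTP/1.1",
    "Mozilla/5.0",           # user agent (example)
))
```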
53 changes: 35 additions & 18 deletions synapse/storage/_base.py
@@ -217,7 +217,6 @@ def loop():

def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
logging_context, func, *args, **kwargs):
start = time.time() * 1000
txn_id = self._TXN_ID

# We don't really need these to be unique, so lets stop it from
@@ -277,21 +276,24 @@ def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
end = time.time() * 1000
duration = end - start
transaction_logger.debug("[TXN END] {%s}", name)

if logging_context is not None:
logging_context.add_database_transaction(duration)
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
"""Starts a transaction on the database and runs a given function

transaction_logger.debug("[TXN END] {%s} %f", name, duration)
Arguments:
desc (str): description of the transaction, for logging and metrics
func (func): callback function, which will be called with a
database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`.

self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
sql_txn_timer.inc_by(duration, desc)
args (list): positional args to pass to `func`
kwargs (dict): named args to pass to `func`

@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
Returns:
Deferred: The result of func
"""
current_context = LoggingContext.current_context()

start_time = time.time() * 1000
@@ -301,17 +303,32 @@ def runInteraction(self, desc, func, *args, **kwargs):

def inner_func(conn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
sched_delay_ms = time.time() * 1000 - start_time
sql_scheduling_timer.inc_by(sched_delay_ms)

if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

current_context.copy_to(context)
return self._new_transaction(
conn, desc, after_callbacks, final_callbacks, current_context,
func, *args, **kwargs
)
txn_start_time_ms = time.time() * 1000
try:
return self._new_transaction(
conn, desc, after_callbacks, final_callbacks, current_context,
func, *args, **kwargs
)
finally:
txn_end_time_ms = time.time() * 1000
txn_duration = txn_end_time_ms - txn_start_time_ms

current_context.add_database_transaction(
txn_duration, sched_delay_ms,
)
self._current_txn_total_time += txn_duration
self._txn_perf_counters.update(
desc, txn_start_time_ms, txn_end_time_ms,
)
sql_txn_timer.inc_by(txn_duration, desc)

Member

Won't this mean that any transactions created by calling runWithConnection directly won't be measured?

Member Author

well, yes, but they aren't currently measured as part of db_txn_duration.

Looks like the only thing that uses runWithConnection other than the background updates is the event_fetch code, which is not specific to any http requests. I'd like for time spent waiting for events to be fetched to be tracked somehow, but tracking transaction duration here wouldn't help.

Member

They may not be picked up by db_txn_duration, but they are picked up by sql_txn_timer. Losing metrics for fetching event txn duration doesn't sound ideal.


try:
with PreserveLoggingContext():
Expand All @@ -329,7 +346,7 @@ def inner_func(conn, *args, **kwargs):

@defer.inlineCallbacks
def runWithConnection(self, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
"""Wraps the .runWithConnection() method on the underlying db_pool."""
current_context = LoggingContext.current_context()

start_time = time.time() * 1000
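To make the split above concrete: runInteraction now times two things separately, the wait for the connection pool to schedule the work, and the transaction itself once it runs. A minimal, self-contained sketch of that pattern follows; it is not Synapse code, and the thread pool and report() callback are stand-ins for adbapi's connection pool and the LoggingContext:

```python
# Sketch of the timing split: "scheduling delay" is how long the work sat queued
# before a pool thread picked it up; "transaction duration" is how long the
# function took once it actually started running.
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # stand-in for the db connection pool


def run_interaction(func, record):
    start_ms = time.time() * 1000

    def inner():
        # time spent waiting to be scheduled onto a pool thread
        sched_delay_ms = time.time() * 1000 - start_ms
        txn_start_ms = time.time() * 1000
        try:
            return func()
        finally:
            txn_duration_ms = time.time() * 1000 - txn_start_ms
            # both figures are attributed to the calling request's context
            record(txn_duration_ms, sched_delay_ms)

    return pool.submit(inner)


def report(txn_ms, sched_ms):
    print("txn took %.1fms after waiting %.1fms to be scheduled" % (txn_ms, sched_ms))


run_interaction(lambda: time.sleep(0.01), report).result()
```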
41 changes: 30 additions & 11 deletions synapse/util/logcontext.py
@@ -52,13 +52,17 @@ def get_thread_resource_usage():
class LoggingContext(object):
"""Additional context for log formatting. Contexts are scoped within a
"with" block.

Args:
name (str): Name for the context for debugging.
"""

__slots__ = [
"previous_context", "name", "usage_start", "usage_end", "main_thread",
"__dict__", "tag", "alive",
"previous_context", "name", "ru_stime", "ru_utime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"usage_start", "usage_end",
"main_thread", "alive",
"request", "tag",
]

thread_local = threading.local()
Expand All @@ -80,7 +84,7 @@ def start(self):
def stop(self):
pass

def add_database_transaction(self, duration_ms):
def add_database_transaction(self, duration_ms, sched_ms):
pass

def __nonzero__(self):
Expand All @@ -94,9 +98,17 @@ def __init__(self, name=None):
self.ru_stime = 0.
self.ru_utime = 0.
self.db_txn_count = 0
self.db_txn_duration = 0.

# ms spent waiting for db txns, excluding scheduling time
self.db_txn_duration_ms = 0

# ms spent waiting for db txns to be scheduled
self.db_sched_duration_ms = 0

self.usage_start = None
self.usage_end = None
self.main_thread = threading.current_thread()
self.request = None
self.tag = ""
self.alive = True

@@ -105,7 +117,11 @@ def __str__(self):

@classmethod
def current_context(cls):
"""Get the current logging context from thread local storage"""
"""Get the current logging context from thread local storage

Returns:
LoggingContext: the current logging context
"""
return getattr(cls.thread_local, "current_context", cls.sentinel)

@classmethod
@@ -155,11 +171,13 @@ def __exit__(self, type, value, traceback):
self.alive = False

def copy_to(self, record):
"""Copy fields from this context to the record"""
for key, value in self.__dict__.items():
setattr(record, key, value)
"""Copy logging fields from this context to a log record or
another LoggingContext
"""

record.ru_utime, record.ru_stime = self.get_resource_usage()
# 'request' is the only field we currently use in the logger, so that's
# all we need to copy
record.request = self.request

def start(self):
if threading.current_thread() is not self.main_thread:
@@ -192,9 +210,10 @@ def get_resource_usage(self):

return ru_utime, ru_stime

def add_database_transaction(self, duration_ms):
def add_database_transaction(self, duration_ms, sched_ms):
self.db_txn_count += 1
self.db_txn_duration += duration_ms / 1000.
self.db_txn_duration_ms += duration_ms
self.db_sched_duration_ms += sched_ms


class LoggingContextFilter(logging.Filter):
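The accumulation contract here is small enough to model directly. A toy version (a deliberately simplified stand-in, not the real LoggingContext) of how per-request DB timings add up via the two-argument add_database_transaction(), and how they are later converted to seconds for the metrics:

```python
# Toy model of the per-request accumulation: each call records one transaction's
# duration plus the time that transaction spent waiting for a db connection.
class RequestTimings(object):
    def __init__(self):
        self.db_txn_count = 0
        self.db_txn_duration_ms = 0    # ms spent inside db transactions
        self.db_sched_duration_ms = 0  # ms spent waiting for a db connection

    def add_database_transaction(self, duration_ms, sched_ms):
        self.db_txn_count += 1
        self.db_txn_duration_ms += duration_ms
        self.db_sched_duration_ms += sched_ms


timings = RequestTimings()
timings.add_database_transaction(40, 2)
timings.add_database_transaction(15, 1)

# at the end of the request these totals feed the access log (in ms) and the
# response_db_txn_duration / response_db_sched_duration metrics (in seconds)
print("%d txns, %.3fs in txns, %.3fs waiting for connections" % (
    timings.db_txn_count,
    timings.db_txn_duration_ms / 1000.,
    timings.db_sched_duration_ms / 1000.,
))
```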
25 changes: 22 additions & 3 deletions synapse/util/metrics.py
@@ -44,10 +44,21 @@
"block_db_txn_count", labels=["block_name"]
)

# seconds spent waiting for db txns, excluding scheduling time, in this block
block_db_txn_duration = metrics.register_distribution(
"block_db_txn_duration", labels=["block_name"]
)

# seconds spent waiting for a db connection, in this block
#
# it's a counter rather than a distribution, because the count would always
# be the same as that of all the other distributions.
#
# FIXME: use a floating-point rather than integer metric
block_db_sched_duration = metrics.register_counter(
"block_db_sched_duration", labels=["block_name"]
)


def measure_func(name):
def wrapper(func):
@@ -64,7 +75,9 @@ def measured_func(self, *args, **kwargs):
class Measure(object):
__slots__ = [
"clock", "name", "start_context", "start", "new_context", "ru_utime",
"ru_stime", "db_txn_count", "db_txn_duration", "created_context"
"ru_stime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"created_context",
]

def __init__(self, clock, name):
@@ -84,7 +97,8 @@ def __enter__(self):

self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
self.db_txn_count = self.start_context.db_txn_count
self.db_txn_duration = self.start_context.db_txn_duration
self.db_txn_duration_ms = self.start_context.db_txn_duration_ms
self.db_sched_duration_ms = self.start_context.db_sched_duration_ms

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
@@ -114,7 +128,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):
context.db_txn_count - self.db_txn_count, self.name
)
block_db_txn_duration.inc_by(
context.db_txn_duration - self.db_txn_duration, self.name
(context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.,
self.name
)
block_db_sched_duration.inc_by(
(context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.,
self.name
)

if self.created_context:
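The Measure changes use the same snapshot-and-diff pattern for the new counter as for the existing ones: remember the context's running totals on entry, and on exit report only what accrued inside the block. A stripped-down sketch with simplified stand-ins (not the real Measure/LoggingContext classes):

```python
# Snapshot-and-diff: __enter__ remembers the context's running totals,
# __exit__ reports only the amount that accrued while the block was active.
class ToyContext(object):
    def __init__(self):
        self.db_txn_duration_ms = 0
        self.db_sched_duration_ms = 0


class ToyMeasure(object):
    def __init__(self, context, name):
        self.context = context
        self.name = name

    def __enter__(self):
        self.txn_ms_at_entry = self.context.db_txn_duration_ms
        self.sched_ms_at_entry = self.context.db_sched_duration_ms
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # convert the ms deltas to seconds, since block_db_txn_duration and
        # block_db_sched_duration are registered as seconds
        txn_s = (self.context.db_txn_duration_ms - self.txn_ms_at_entry) / 1000.
        sched_s = (self.context.db_sched_duration_ms - self.sched_ms_at_entry) / 1000.
        print("%s: %.3fs in txns, %.3fs waiting for db connections"
              % (self.name, txn_s, sched_s))


ctx = ToyContext()
with ToyMeasure(ctx, "my_block"):
    ctx.db_sched_duration_ms += 3   # pretend a txn waited 3ms for a connection
    ctx.db_txn_duration_ms += 42    # ...and then ran for 42ms
```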
14 changes: 7 additions & 7 deletions tests/crypto/test_keyring.py
@@ -68,7 +68,7 @@ def setUp(self):

def check_context(self, _, expected):
self.assertEquals(
getattr(LoggingContext.current_context(), "test_key", None),
getattr(LoggingContext.current_context(), "request", None),
expected
)

@@ -82,7 +82,7 @@ def test_wait_for_previous_lookups(self):
lookup_2_deferred = defer.Deferred()

with LoggingContext("one") as context_one:
context_one.test_key = "one"
context_one.request = "one"

wait_1_deferred = kr.wait_for_previous_lookups(
["server1"],
@@ -96,7 +96,7 @@ def test_wait_for_previous_lookups(self):
wait_1_deferred.addBoth(self.check_context, "one")

with LoggingContext("two") as context_two:
context_two.test_key = "two"
context_two.request = "two"

# set off another wait. It should block because the first lookup
# hasn't yet completed.
@@ -137,15 +137,15 @@ def test_verify_json_objects_for_server_awaits_previous_requests(self):
@defer.inlineCallbacks
def get_perspectives(**kwargs):
self.assertEquals(
LoggingContext.current_context().test_key, "11",
LoggingContext.current_context().request, "11",
)
with logcontext.PreserveLoggingContext():
yield persp_deferred
defer.returnValue(persp_resp)
self.http_client.post_json.side_effect = get_perspectives

with LoggingContext("11") as context_11:
context_11.test_key = "11"
context_11.request = "11"

# start off a first set of lookups
res_deferreds = kr.verify_json_objects_for_server(
@@ -173,7 +173,7 @@ def get_perspectives(**kwargs):
self.assertIs(LoggingContext.current_context(), context_11)

context_12 = LoggingContext("12")
context_12.test_key = "12"
context_12.request = "12"
with logcontext.PreserveLoggingContext(context_12):
# a second request for a server with outstanding requests
# should block rather than start a second call
@@ -211,7 +211,7 @@ def test_verify_json_for_server(self):
sentinel_context = LoggingContext.current_context()

with LoggingContext("one") as context_one:
context_one.test_key = "one"
context_one.request = "one"

defer = kr.verify_json_for_server("server9", {})
try: