From d25dd7324e97316a9ebbae4bc24432c47eb8715c Mon Sep 17 00:00:00 2001 From: "m.kindritskiy" Date: Mon, 20 Feb 2023 18:43:54 +0200 Subject: [PATCH 1/2] track each subquery field proc separately --- hiku/telemetry/prometheus.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/hiku/telemetry/prometheus.py b/hiku/telemetry/prometheus.py index c080e5fe..9831e466 100644 --- a/hiku/telemetry/prometheus.py +++ b/hiku/telemetry/prometheus.py @@ -38,12 +38,6 @@ def wrapper(*args): return wrapper -def _subquery_field_names(func): - def wrapper(fields, *args): - return func([f.name for _, f in fields], fields, *args) - return wrapper - - class GraphMetricsBase(GraphTransformer): root_name = 'Root' @@ -102,7 +96,6 @@ def _wrap_link(self, node_name, link_name, func): def _wrap_subquery(self, node_name, subquery): observe = self._observe_fields(node_name) wrapper = self.subquery_wrapper(observe, subquery) - wrapper = _subquery_field_names(wrapper) wrapper.__subquery__ = lambda: wrapper return wrapper @@ -153,15 +146,23 @@ def visit_link(self, obj): class _SubqueryMixin: def subquery_wrapper(self, observe, subquery): - def wrapper(field_names, *args): + def wrap_proc(start_time, field_name, proc): + def _proc_wrapper(*args): + result = proc(*args) + observe(start_time, [field_name]) + return result + + return _proc_wrapper + + def wrapper(fields, *args): start_time = time.perf_counter() - result_proc = subquery(*args) + wrapped_fields = [] + for gf, qf in fields: + gf.func.proc = wrap_proc(start_time, gf.name, gf.func.proc) + wrapped_fields.append((gf, qf)) + + return subquery(wrapped_fields, *args) - def proc_wrapper(): - result = result_proc() - observe(start_time, field_names) - return result - return proc_wrapper return wrapper @@ -200,4 +201,4 @@ async def wrapper(link_name, *args): result = await func(*args) observe(start_time, [link_name]) return result - return wrapper + return wrapper \ No newline at end of file From cd7fddde69030ab31b2d1e81f622720ef65388e0 Mon Sep 17 00:00:00 2001 From: "m.kindritskiy" Date: Wed, 29 Mar 2023 20:17:25 +0300 Subject: [PATCH 2/2] experimental separate metrics for @define functions the idea is that we track field with low level graph time + separate metric for just @define func so we can see how much time spent both low_level + @define and just @define --- hiku/telemetry/prometheus.py | 66 +++++++++++- tests/test_telemetry_prometheus.py | 168 ++++++++++++++++++++++++++++- 2 files changed, 229 insertions(+), 5 deletions(-) diff --git a/hiku/telemetry/prometheus.py b/hiku/telemetry/prometheus.py index 9831e466..5b6001c5 100644 --- a/hiku/telemetry/prometheus.py +++ b/hiku/telemetry/prometheus.py @@ -96,6 +96,7 @@ def _wrap_link(self, node_name, link_name, func): def _wrap_subquery(self, node_name, subquery): observe = self._observe_fields(node_name) wrapper = self.subquery_wrapper(observe, subquery) + wrapper = _subquery_field_names(wrapper) wrapper.__subquery__ = lambda: wrapper return wrapper @@ -143,13 +144,37 @@ def visit_link(self, obj): return obj +def _subquery_field_names(func): + def wrapper(fields, *args): + return func([f.name for _, f in fields], fields, *args) + return wrapper + + class _SubqueryMixin: def subquery_wrapper(self, observe, subquery): - def wrap_proc(start_time, field_name, proc): + def wrapper(field_names, *args): + start_time = time.perf_counter() + result_proc = subquery(*args) + + def proc_wrapper(): + result = result_proc() + observe(start_time, field_names) + return result + + return proc_wrapper + + return wrapper + + +class _SubqueryMixinNew: + + def subquery_wrapper(self, observe, subquery): + def wrap_proc(field_name, proc): def _proc_wrapper(*args): + proc_start_time = time.perf_counter() result = proc(*args) - observe(start_time, [field_name]) + observe(proc_start_time, [f'{field_name}__define']) return result return _proc_wrapper @@ -157,11 +182,20 @@ def _proc_wrapper(*args): def wrapper(fields, *args): start_time = time.perf_counter() wrapped_fields = [] + field_names = [] for gf, qf in fields: - gf.func.proc = wrap_proc(start_time, gf.name, gf.func.proc) + gf.func.proc = wrap_proc(gf.name, gf.func.proc) wrapped_fields.append((gf, qf)) + field_names.append(gf.name) + + result_proc = subquery(wrapped_fields, *args) + + def result_proc_wrapper(): + result = result_proc() + return result - return subquery(wrapped_fields, *args) + observe(start_time, field_names) + return result_proc_wrapper return wrapper @@ -185,6 +219,30 @@ def wrapper(link_name, *args): return wrapper +class GraphMetricsNew(_SubqueryMixinNew, GraphMetricsBase): + def _wrap_subquery(self, node_name, subquery): + observe = self._observe_fields(node_name) + wrapper = self.subquery_wrapper(observe, subquery) + wrapper.__subquery__ = lambda: wrapper + return wrapper + + def field_wrapper(self, observe, func): + def wrapper(field_names, *args): + start_time = time.perf_counter() + result = func(*args) + observe(start_time, field_names) + return result + return wrapper + + def link_wrapper(self, observe, func): + def wrapper(link_name, *args): + start_time = time.perf_counter() + result = func(*args) + observe(start_time, [link_name]) + return result + return wrapper + + class AsyncGraphMetrics(_SubqueryMixin, GraphMetricsBase): def field_wrapper(self, observe, func): diff --git a/tests/test_telemetry_prometheus.py b/tests/test_telemetry_prometheus.py index f455f233..7199cc34 100644 --- a/tests/test_telemetry_prometheus.py +++ b/tests/test_telemetry_prometheus.py @@ -1,16 +1,23 @@ +import time + import faker import pytest from prometheus_client import REGISTRY from hiku import query as q +from hiku.expr.core import S +from hiku.expr.core import define from hiku.graph import Graph, Node, Field, Link, Root, apply +from hiku.telemetry.prometheus import GraphMetricsNew +from hiku.types import Any from hiku.types import TypeRef from hiku.engine import Engine, pass_context from hiku.sources.graph import SubGraph from hiku.executors.sync import SyncExecutor from hiku.executors.asyncio import AsyncIOExecutor from hiku.telemetry.prometheus import GraphMetrics, AsyncGraphMetrics +from hiku.utils import listify from tests.base import check_result @@ -26,13 +33,25 @@ def graph_name_fixture(): @pytest.fixture(name='sample_count') def sample_count_fixture(graph_name): def sample_count(node, field): - return REGISTRY.get_sample_value( + REGISTRY.get_sample_value( 'graph_field_time_count', dict(graph=graph_name, node=node, field=field), ) return sample_count +@pytest.fixture(name='sample_sum') +def sample_sum_fixture(graph_name): + def sample_count(node, field): + value = REGISTRY.get_sample_value( + 'graph_field_time_sum', + dict(graph=graph_name, node=node, field=field), + ) + print('{}.{}, value: {}'.format(node, field, value)) + return value + return sample_count + + def test_simple_sync(graph_name, sample_count): def x_fields(fields, ids): @@ -176,3 +195,150 @@ def root_fields2(ctx, fields): assert sample_count('Root', 'a') == 1.0 assert sample_count('Root', 'b') == 1.0 + + +@pytest.mark.parametrize('tracker', [ + # old correctly tracks time for y1 if only low level is slow + # but does not track time for x1, x2 separately + # GraphMetrics, + # new correctly tracks time for x1, x2 separately but does not see y1 low level slowness, because y1 proc is just a simple return + # of value from index + GraphMetricsNew +]) +def test_track_time(tracker, graph_name, sample_sum): + + x1 = 0.12 # 12 + 22 + 52 = 86, because all fields are from LL + # 12 + 32 + 52 = 96 + x2 = 0.22 # 34 + x3 = 0.32 # 66 # HL only field + y1 = 0.52 # 118, 1.18 + y2 = 0.62 # 180 + + @listify + def x_fields(fields, ids): + """LL""" + def get_field(f): + if f == 'x1': + return x1 + elif f == 'x2': + return x2 + + for id_ in ids: + yield [get_field(f.name) for f in fields] + + @listify + def y_fields(fields, ids): + """LL""" + def get_field(f): + if f == 'y1': + time.sleep(y1) + return y1 + elif f == 'y2': + return y2 + + for id_ in ids: + yield [get_field(f.name) for f in fields] + + def root_fields(fields): + return [1 for _ in fields] + + def x_link(): + return 2 + + ll_graph = Graph([ + Node('X', [ + Field('x1', None, x_fields), + Field('x2', None, x_fields), + ]), + Node('Y', [ + Field('y1', None, y_fields), + Field('y2', None, y_fields), + ]), + ]) + + x_sg = SubGraph(ll_graph, 'X') + y_sg = SubGraph(ll_graph, 'Y') + + @define(Any) + def x1_field(val): + """HL""" + time.sleep(x1) + return val + + @define(Any) + def x2_field(val): + """HL""" + # time.sleep(x2) + return val + + @listify + def x3_field(fields, ids): + """HL""" + def get_field(f): + if f == 'x3_3': + # time.sleep(x3) + return x3 + + for id_ in ids: + yield [get_field(f.name) for f in fields] + + @define(Any) + def y2_field(val): + """HL""" + # time.sleep(y2) + return val + + hl_graph = Graph([ + Node('X_h', [ + Field('x1_1', None, x_sg.c(x1_field(S.this.x1))), + Field('x2_2', None, x_sg.c(x2_field(S.this.x2))), + # in old tracker x3_3 is the only field that is tracked correctly + # because it not uses subgraph + Field('x3_3', None, x3_field), + Field('y1_1', None, y_sg.c(S.this.y1)), + Field('y2_2', None, y_sg.c(y2_field(S.this.y2))), + ]), + Root([ + Field('a', None, root_fields), + Link('x', TypeRef['X_h'], x_link, requires=None), + ]), + ]) + + hl_graph = apply(hl_graph, [tracker(graph_name)]) + + result = Engine(SyncExecutor()).execute(hl_graph, q.Node([ + # q.Field('a'), + q.Link('x', q.Node([ + q.Field('x1_1'), + q.Field('x2_2'), + q.Field('x3_3'), + q.Field('y1_1'), + q.Field('y2_2'), + ])), + ])) + # check_result(result, { + # 'a': 1, + # 'x': { + # 'x1_1': x1, + # 'x2_2': x2, + # 'x3_3': x3, + # 'y1_1': y1, + # }, + # }) + + print('') + print('Testing with', tracker.__name__) + + got_x = sum([ + sample_sum('X_h', 'x1_1'), + sample_sum('X_h', 'x2_2'), + sample_sum('X_h', 'x3_3'), + sample_sum('X_h', 'y1_1'), + sample_sum('X_h', 'y2_2') + ]) + sample_sum('X_h', 'x1_1__define'), + sample_sum('X_h', 'x2_2__define'), + sample_sum('X_h', 'y1_1__define'), + sample_sum('X_h', 'y2_2__define') + print('x total exp', x1 + x2 + x3 + y1 + y2) + print('x total got', got_x)