fix: use start_as_current_span to maintain tracing operation chain #5391

Merged: 8 commits, Nov 15, 2022
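
The core of this PR swaps `tracer.start_span` for `tracer.start_as_current_span` in `BaseExecutor.__acall_endpoint__`. In the OpenTelemetry Python API, `start_span` creates a span without activating it in the current context, so spans started inside the `with` block are not parented to it and the operation chain breaks; `start_as_current_span` both creates and activates the span. A minimal sketch of the difference (illustrative span names, not code from this PR):

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

# Broken chain: 'child' is NOT parented to 'parent', because start_span
# creates the span without making it current in the active context.
with tracer.start_span('parent') as _:
    child = tracer.start_span('child')  # parented to whatever was current before
    child.end()

# Intact chain: start_as_current_span creates AND activates 'parent',
# so 'child' automatically picks it up as its parent.
with tracer.start_as_current_span('parent'):
    with tracer.start_as_current_span('child'):
        pass
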
119 changes: 62 additions & 57 deletions jina/serve/executors/__init__.py
@@ -1,9 +1,9 @@
import contextlib
import copy
import inspect
import multiprocessing
import os
import threading
import copy
import warnings
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union
@@ -14,7 +14,10 @@
from jina.importer import ImportExtensions
from jina.jaml import JAML, JAMLCompatible, env_var_regex, internal_var_regex
from jina.logging.logger import JinaLogger
from jina.serve.executors.decorators import avoid_concurrent_lock_cls, _init_requests_by_class
from jina.serve.executors.decorators import (
_init_requests_by_class,
avoid_concurrent_lock_cls,
)
from jina.serve.executors.metas import get_executor_taboo
from jina.serve.helper import store_init_kwargs, wrap_func
from jina.serve.instrumentation import MetricsTimer
@@ -57,7 +60,7 @@ def register_class(cls):
arg_spec = inspect.getfullargspec(cls.__init__)

if not arg_spec.varkw and not __args_executor_init__.issubset(
arg_spec.args
arg_spec.args
):
raise TypeError(
f'{cls.__init__} does not follow the full signature of `Executor.__init__`, '
@@ -117,12 +120,12 @@ def __init__(awesomeness=5):
"""

def __init__(
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
):
"""`metas` and `requests` are always auto-filled with values from YAML config.

@@ -160,12 +163,12 @@ def _add_runtime_args(self, _runtime_args: Optional[Dict]):

def _init_monitoring(self):
if (
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
):
with ImportExtensions(
required=True,
        help_text='You need to install the `prometheus_client` to use the monitoring functionality of jina',
required=True,
    help_text='You need to install the `prometheus_client` to use the monitoring functionality of jina',
):
from prometheus_client import Summary

@@ -296,7 +299,7 @@ def _add_metas(self, _metas: Optional[Dict]):
if not hasattr(target, k):
if isinstance(v, str):
if not (
env_var_regex.findall(v) or internal_var_regex.findall(v)
env_var_regex.findall(v) or internal_var_regex.findall(v)
):
setattr(target, k, v)
else:
@@ -348,10 +351,10 @@ async def __acall__(self, req_endpoint: str, **kwargs):
return await self.__acall_endpoint__(__default_endpoint__, **kwargs)

async def __acall_endpoint__(
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
):
async def exec_func(
summary, histogram, histogram_metric_labels, tracing_context
summary, histogram, histogram_metric_labels, tracing_context
):
with MetricsTimer(summary, histogram, histogram_metric_labels):
if iscoroutinefunction(func):
@@ -379,7 +382,9 @@ async def exec_func(
}

if self.tracer:
with self.tracer.start_span(req_endpoint, context=tracing_context) as _:
with self.tracer.start_as_current_span(
req_endpoint, context=tracing_context
):
from opentelemetry.propagate import extract
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
@@ -409,10 +414,10 @@ def workspace(self) -> Optional[str]:
:return: returns the workspace of the current shard of this Executor.
"""
workspace = (
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
)
if workspace:
complete_workspace = os.path.join(workspace, self.metas.name)
@@ -435,13 +440,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

@classmethod
def from_hub(
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
) -> T:
"""Construct an Executor from Hub.

@@ -493,12 +498,12 @@ def from_hub(

@classmethod
def serve(
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
):
"""Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings.

Expand Down Expand Up @@ -531,16 +536,16 @@ class StandaloneExecutorType(BetterEnum):

@staticmethod
def to_kubernetes_yaml(
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a set of yaml deployments to deploy in Kubernetes.
@@ -568,23 +573,23 @@ def to_kubernetes_yaml(
output_base_path=output_base_path,
k8s_namespace=k8s_namespace,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

to_k8s_yaml = to_kubernetes_yaml

@staticmethod
def to_docker_compose_yaml(
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a yaml file to run with `docker-compose up`
@@ -609,11 +614,11 @@ def to_docker_compose_yaml(
output_path=output_path,
network_name=network_name,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

def monitor(
self, name: Optional[str] = None, documentation: Optional[str] = None
self, name: Optional[str] = None, documentation: Optional[str] = None
) -> Optional[MetricsTimer]:
"""
Get a given Prometheus metric; if it does not exist yet, it will be created and stored in a buffer.
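
Inside the new `start_as_current_span` block, the diff also imports `opentelemetry.propagate.extract` and `TraceContextTextMapPropagator`, which recover the upstream caller's span context from propagated metadata. A rough sketch of that pattern, with hypothetical handler and carrier names:

from opentelemetry import trace
from opentelemetry.propagate import extract

tracer = trace.get_tracer(__name__)

def handle(req_endpoint: str, carrier: dict):
    # Recover the caller's span context from propagated metadata
    # (e.g. gRPC metadata or HTTP headers) ...
    remote_context = extract(carrier)
    # ... and run the endpoint span as its child, keeping the
    # cross-process operation chain intact.
    with tracer.start_as_current_span(req_endpoint, context=remote_context):
        ...
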
58 changes: 40 additions & 18 deletions tests/integration/instrumentation/test_flow_instrumentation.py
@@ -192,10 +192,10 @@ def test_flow_metrics(
)
assert len(receiving_request_seconds_metrics) > 0
assert receiving_request_seconds_exported_jobs == {
'gateway/rep-0',
'executor0/head',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'gateway/rep-0',
'executor0/head',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
}

(
@@ -224,10 +224,10 @@ def test_flow_metrics(
)
assert len(sent_response_bytes_metrics) > 0
assert sent_response_bytes_exported_jobs == {
'gateway/rep-0',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'executor0/head'
'gateway/rep-0',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'executor0/head',
}

(
@@ -237,18 +237,21 @@ def test_flow_metrics(
prometheus_client, 'jina_number_of_pending_requests'
)
assert len(number_of_pending_requests_metrics) > 0
assert number_of_pending_requests_exported_jobs == {'gateway/rep-0', 'executor0/head'}
assert number_of_pending_requests_exported_jobs == {
'gateway/rep-0',
'executor0/head',
}

(
failed_requests_metrics,
failed_requests_exported_jobs,
) = get_metrics_and_exported_jobs_by_name(prometheus_client, 'jina_failed_requests')
assert len(failed_requests_metrics) > 0
assert failed_requests_exported_jobs == {
'gateway/rep-0',
'executor0/head',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'gateway/rep-0',
'executor0/head',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
}

(
@@ -258,7 +261,12 @@
prometheus_client, 'jina_successful_requests'
)
assert len(successful_requests_metrics) > 0
assert successful_requests_exported_jobs == {'gateway/rep-0', 'executor0/shard-0/rep-0', 'executor0/shard-1/rep-0', 'executor0/head'}
assert successful_requests_exported_jobs == {
'gateway/rep-0',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'executor0/head',
}

(
received_request_bytes_metrics,
@@ -267,7 +275,12 @@
prometheus_client, 'jina_received_request_bytes'
)
assert len(received_request_bytes_metrics) > 0
assert received_request_bytes_exported_jobs == {'gateway/rep-0', 'executor0/shard-0/rep-0', 'executor0/shard-1/rep-0', 'executor0/head'}
assert received_request_bytes_exported_jobs == {
'gateway/rep-0',
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
'executor0/head',
}

(
process_requests_seconds_metrics,
@@ -276,7 +289,10 @@
prometheus_client, 'jina_process_request_seconds'
)
assert len(process_requests_seconds_metrics) > 0
assert process_requests_seconds_exported_jobs == {'executor0/shard-0/rep-0', 'executor0/shard-1/rep-0'}
assert process_requests_seconds_exported_jobs == {
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
}

# filter by attributes/labels
(
@@ -288,7 +304,10 @@
{'executor_endpoint': '/search'},
)
assert len(process_requests_seconds_search_endpoint) > 0
assert process_requests_seconds_search_endpoint_exported_jobs == {'executor0/shard-0/rep-0', 'executor0/shard-1/rep-0'}
assert process_requests_seconds_search_endpoint_exported_jobs == {
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
}

(
process_requests_seconds_executor,
@@ -299,7 +318,10 @@
{'executor': 'ExecutorFailureWithTracing'},
)
assert len(process_requests_seconds_executor) > 0
assert process_requests_seconds_executor_exported_jobs == {'executor0/shard-0/rep-0', 'executor0/shard-1/rep-0'}
assert process_requests_seconds_executor_exported_jobs == {
'executor0/shard-0/rep-0',
'executor0/shard-1/rep-0',
}

(
process_requests_seconds_runtime,
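
The assertions above compare the set of `exported_job` labels on each metric, which identify the Jina pod (gateway, head, or replica) that originally exposed the sample. Assuming `get_metrics_and_exported_jobs_by_name` wraps Prometheus's HTTP query API (the real helper lives in the test package, and its exact signature may differ), it could look roughly like this:

import requests

def get_metrics_and_exported_jobs_by_name(base_url, name, labels=None):
    # Build an instant query, optionally narrowed by label matchers.
    query = name
    if labels:
        matchers = ','.join(f'{k}="{v}"' for k, v in labels.items())
        query = f'{name}{{{matchers}}}'
    resp = requests.get(f'{base_url}/api/v1/query', params={'query': query})
    result = resp.json()['data']['result']
    # Collect the pods that exported each matching sample.
    exported_jobs = {sample['metric'].get('exported_job') for sample in result}
    return result, exported_jobs
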
43 changes: 43 additions & 0 deletions tests/integration/instrumentation/test_worker_instrumentation.py
@@ -0,0 +1,43 @@
import time

from jina import Flow
from tests.integration.instrumentation import ExecutorTestWithTracing, get_traces


def test_span_order(jaeger_port, otlp_collector, otlp_receiver_port):
f = Flow(
tracing=True,
traces_exporter_host='localhost',
traces_exporter_port=otlp_receiver_port,
).add(uses=ExecutorTestWithTracing)

with f:
from jina import DocumentArray

f.post(f'/search', DocumentArray.empty(), continue_on_error=True)
# give some time for the tracing and metrics exporters to finish exporting.
# the client is slow to export the data
time.sleep(8)

traces = get_traces(jaeger_port, 'executor0/rep-0')
process_single_data_span_ids = set()
search_request_parent_span_ids = set()
for trace in traces:
for span in trace['spans']:
if (
span['operationName']
== '/jina.JinaSingleDataRequestRPC/process_single_data'
):
process_single_data_span_ids.add(span['spanID'])

if span['operationName'] == '/search':
references = span.get('references', [])
for ref in references:
search_request_parent_span_ids.add(ref.get('spanID', ''))

assert any(
[
search_span in process_single_data_span_ids
for search_span in search_request_parent_span_ids
]
)
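
The test passes when at least one `/search` span lists a `process_single_data` span among its parent references, which is exactly the parent-child link that `start_as_current_span` restores. For reference, a minimal sketch of a `get_traces` helper against Jaeger's HTTP query API (the real implementation lives in `tests/integration/instrumentation/__init__.py` and may differ):

import requests

def get_traces(jaeger_port: int, service: str) -> list:
    # Jaeger's query service returns finished traces per service name;
    # each trace carries its spans, including their 'references' to parents.
    resp = requests.get(
        f'http://localhost:{jaeger_port}/api/traces',
        params={'service': service},
    )
    resp.raise_for_status()
    return resp.json().get('data', [])
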