Skip to content

Commit

Permalink
Merge branch 'master' into fix-serve-5386
Browse files Browse the repository at this point in the history
  • Loading branch information
girishc13 authored Nov 15, 2022
2 parents 26823a3 + f738d34 commit 66fd738
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 32 deletions.
6 changes: 4 additions & 2 deletions jina/serve/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from timeit import default_timer
from typing import TYPE_CHECKING, Dict, Optional, Sequence

if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from grpc.aio._interceptor import ClientInterceptor, ServerInterceptor
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
Expand Down Expand Up @@ -147,7 +147,9 @@ def __init__(
self._histogram_metric_labels = histogram_metric_labels

def _new_timer(self):
return self.__class__(self._summary_metric, self._histogram, self._histogram_metric_labels)
return self.__class__(
self._summary_metric, self._histogram, self._histogram_metric_labels
)

def __enter__(self):
self._start = default_timer()
Expand Down
2 changes: 1 addition & 1 deletion jina/serve/runtimes/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from jina.serve.runtimes.monitoring import MonitoringMixin
from jina.types.request.data import DataRequest

if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
import multiprocessing
import threading

Expand Down
9 changes: 6 additions & 3 deletions jina/serve/runtimes/gateway/grpc/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(
self.grpc_server_options = grpc_server_options
self.ssl_keyfile = ssl_keyfile
self.ssl_certfile = ssl_certfile
self.health_servicer = health.HealthServicer(experimental_non_blocking=True)
self.health_servicer = health.aio.HealthServicer()

async def setup_server(self):
"""
Expand Down Expand Up @@ -63,7 +63,9 @@ async def setup_server(self):
health_pb2_grpc.add_HealthServicer_to_server(self.health_servicer, self.server)

for service in service_names:
self.health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
await self.health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
reflection.enable_server_reflection(service_names, self.server)

bind_addr = f'{__default_host__}:{self.port}'
Expand Down Expand Up @@ -97,7 +99,7 @@ async def setup_server(self):
async def teardown(self):
"""Free other resources allocated with the server, e.g, gateway object, ..."""
await super().teardown()
self.health_servicer.enter_graceful_shutdown()
await self.health_servicer.enter_graceful_shutdown()

async def stop_server(self):
"""
Expand All @@ -118,6 +120,7 @@ async def dry_run(self, empty, context) -> jina_pb2.StatusProto:
:returns: the response request
"""
from docarray import DocumentArray

from jina.clients.request import request_generator
from jina.enums import DataInputType
from jina.serve.executors import __dry_run_endpoint__
Expand Down
8 changes: 5 additions & 3 deletions jina/serve/runtimes/head/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(
:param args: args from CLI
:param kwargs: keyword args
"""
self._health_servicer = health.HealthServicer(experimental_non_blocking=True)
self._health_servicer = health.aio.HealthServicer()

super().__init__(args, **kwargs)
if args.name is None:
Expand Down Expand Up @@ -154,7 +154,9 @@ async def async_setup(self):
)

for service in service_names:
self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
await self._health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
reflection.enable_server_reflection(service_names, self._grpc_server)

bind_addr = f'0.0.0.0:{self.args.port}'
Expand All @@ -174,7 +176,7 @@ async def async_cancel(self):

async def async_teardown(self):
"""Close the connection pool"""
self._health_servicer.enter_graceful_shutdown()
await self._health_servicer.enter_graceful_shutdown()
await self.async_cancel()
await self.connection_pool.close()

Expand Down
8 changes: 5 additions & 3 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
:param args: args from CLI
:param kwargs: keyword args
"""
self._health_servicer = health.HealthServicer(experimental_non_blocking=True)
self._health_servicer = health.aio.HealthServicer()
super().__init__(args, **kwargs)

async def async_setup(self):
Expand Down Expand Up @@ -140,7 +140,9 @@ async def _async_setup_grpc_server(self):
)

for service in service_names:
self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
await self._health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
reflection.enable_server_reflection(service_names, self._grpc_server)
bind_addr = f'0.0.0.0:{self.args.port}'
self.logger.debug(f'start listening on {bind_addr}')
Expand All @@ -162,7 +164,7 @@ async def async_cancel(self):

async def async_teardown(self):
"""Close the data request handler"""
self._health_servicer.enter_graceful_shutdown()
await self._health_servicer.enter_graceful_shutdown()
await self.async_cancel()
self._request_handler.close()

Expand Down
25 changes: 25 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import time

import pytest


@pytest.fixture()
def docker_image_name():
return 'jina/replica-exec'


@pytest.fixture()
def docker_image_built(docker_image_name):
cur_dir = os.path.dirname(os.path.abspath(__file__))
import docker

client = docker.from_env()
client.images.build(
path=os.path.join(cur_dir, 'replica-exec'), tag=docker_image_name
)
client.close()
yield
time.sleep(2)
client = docker.from_env()
client.containers.prune()
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,16 @@

from jina import Client, Document, DocumentArray, Flow

cur_dir = os.path.dirname(os.path.abspath(__file__))

img_name = 'jina/replica-exec'


@pytest.fixture(scope='function')
def docker_image_built():
import docker

client = docker.from_env()
client.images.build(path=os.path.join(cur_dir, 'replica-exec'), tag=img_name)
client.close()
yield
time.sleep(2)
client = docker.from_env()
client.containers.prune()


@pytest.mark.parametrize('shards', [1, 2])
@pytest.mark.parametrize('replicas', [1, 3, 4])
def test_containerruntime_args(docker_image_built, shards, replicas, port_generator):
def test_containerruntime_args(
docker_image_name, docker_image_built, shards, replicas, port_generator
):
exposed_port = port_generator()
f = Flow(port=exposed_port).add(
name='executor_container',
uses=f'docker://{img_name}',
uses=f'docker://{docker_image_name}',
replicas=replicas,
shards=shards,
polling='ANY',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ async def setup_server(self):
)

for service in service_names:
self.health_servicer.set(
await self.health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
reflection.enable_server_reflection(service_names, self.server)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import time

import pytest

from jina import Flow
from tests.integration.instrumentation import (
get_exported_jobs,
get_flow_metric_labels,
get_services,
)


def test_docker_instrumentation(
jaeger_port,
otlp_collector,
otlp_receiver_port,
docker_image_name,
docker_image_built,
prometheus_client,
expected_flow_metric_labels,
):
f = Flow(
tracing=True,
traces_exporter_host='localhost',
traces_exporter_port=otlp_receiver_port,
metrics=True,
metrics_exporter_host='localhost',
metrics_exporter_port=otlp_receiver_port,
).add(uses=f'docker://{docker_image_name}')

with f:
from jina import DocumentArray

f.post(f'/search', DocumentArray.empty(), continue_on_error=True)
# give some time for the tracing and metrics exporters to finish exporting.
# the client is slow to export the data
time.sleep(3)

services = get_services(jaeger_port)
assert set(services) == {'executor0/rep-0', 'gateway/rep-0'}

exported_jobs = get_exported_jobs(prometheus_client)
assert exported_jobs == {
'gateway/rep-0',
'executor0/rep-0',
}

flow_metric_labels = get_flow_metric_labels(prometheus_client)
assert flow_metric_labels.issubset(expected_flow_metric_labels)
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 66fd738

Please sign in to comment.