From 99abcb40db8fd8af16949384d10b832d6d1613eb Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 20 Sep 2023 11:26:13 -0700 Subject: [PATCH] add accept grpc (#1841) * add accept grpc Signed-off-by: Yee Hing Tong Signed-off-by: Jeev B * unpin setup.py grpc Signed-off-by: Yee Hing Tong Signed-off-by: Jeev B * Revert "add accept grpc" This reverts commit 2294592f9ca30e7758d18c900fa058049d26ddda. Signed-off-by: Jeev B * default headers interceptor Signed-off-by: Jeev B * setup.py Signed-off-by: Jeev B * fixes Signed-off-by: Jeev B * fmt Signed-off-by: Jeev B * move prometheus-client import Signed-off-by: Jeev B --------- Signed-off-by: Yee Hing Tong Signed-off-by: Jeev B Co-authored-by: Jeev B Signed-off-by: Jeev B --- flytekit/clients/auth_helper.py | 16 ++++--- .../clients/grpc_utils/auth_interceptor.py | 2 +- .../default_metadata_interceptor.py | 43 +++++++++++++++++++ flytekit/clis/sdk_in_container/serve.py | 4 +- setup.py | 6 +-- 5 files changed, 58 insertions(+), 13 deletions(-) create mode 100644 flytekit/clients/grpc_utils/default_metadata_interceptor.py diff --git a/flytekit/clients/auth_helper.py b/flytekit/clients/auth_helper.py index 5c4fafe579..bdff000623 100644 --- a/flytekit/clients/auth_helper.py +++ b/flytekit/clients/auth_helper.py @@ -16,6 +16,7 @@ PKCEAuthenticator, ) from flytekit.clients.grpc_utils.auth_interceptor import AuthUnaryInterceptor +from flytekit.clients.grpc_utils.default_metadata_interceptor import DefaultMetadataInterceptor from flytekit.clients.grpc_utils.wrap_exception_interceptor import RetryExceptionWrapperInterceptor from flytekit.configuration import AuthType, PlatformConfig @@ -171,7 +172,7 @@ def get_channel(cfg: PlatformConfig, **kwargs) -> grpc.Channel: :return: grpc.Channel (secure / insecure) """ if cfg.insecure: - return grpc.insecure_channel(cfg.endpoint, **kwargs) + return grpc.intercept_channel(grpc.insecure_channel(cfg.endpoint, **kwargs), DefaultMetadataInterceptor()) credentials = None if "credentials" not in kwargs: @@ -189,11 +190,14 @@ def get_channel(cfg: PlatformConfig, **kwargs) -> grpc.Channel: ) else: credentials = kwargs["credentials"] - return grpc.secure_channel( - target=cfg.endpoint, - credentials=credentials, - options=kwargs.get("options", None), - compression=kwargs.get("compression", None), + return grpc.intercept_channel( + grpc.secure_channel( + target=cfg.endpoint, + credentials=credentials, + options=kwargs.get("options", None), + compression=kwargs.get("compression", None), + ), + DefaultMetadataInterceptor(), ) diff --git a/flytekit/clients/grpc_utils/auth_interceptor.py b/flytekit/clients/grpc_utils/auth_interceptor.py index 21bcc30136..53f178a9a9 100644 --- a/flytekit/clients/grpc_utils/auth_interceptor.py +++ b/flytekit/clients/grpc_utils/auth_interceptor.py @@ -32,7 +32,7 @@ def _call_details_with_auth_metadata(self, client_call_details: grpc.ClientCallD """ Returns new ClientCallDetails with metadata added. """ - metadata = None + metadata = client_call_details.metadata auth_metadata = self._authenticator.fetch_grpc_call_auth_metadata() if auth_metadata: metadata = [] diff --git a/flytekit/clients/grpc_utils/default_metadata_interceptor.py b/flytekit/clients/grpc_utils/default_metadata_interceptor.py new file mode 100644 index 0000000000..12b06cca03 --- /dev/null +++ b/flytekit/clients/grpc_utils/default_metadata_interceptor.py @@ -0,0 +1,43 @@ +import typing + +import grpc + +from flytekit.clients.grpc_utils.auth_interceptor import _ClientCallDetails + + +class DefaultMetadataInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): + def _inject_default_metadata(self, call_details: grpc.ClientCallDetails): + metadata = [("accept", "application/grpc")] + if call_details.metadata: + metadata.extend(list(call_details.metadata)) + new_details = _ClientCallDetails( + call_details.method, + call_details.timeout, + metadata, + call_details.credentials, + ) + return new_details + + def intercept_unary_unary( + self, + continuation: typing.Callable, + client_call_details: grpc.ClientCallDetails, + request: typing.Any, + ): + """ + Intercepts unary calls and inject default metadata + """ + updated_call_details = self._inject_default_metadata(client_call_details) + return continuation(updated_call_details, request) + + def intercept_unary_stream( + self, + continuation: typing.Callable, + client_call_details: grpc.ClientCallDetails, + request: typing.Any, + ): + """ + Handles a stream call and inject default metadata + """ + updated_call_details = self._inject_default_metadata(client_call_details) + return continuation(updated_call_details, request) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 2aed58312e..53f02b6481 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -4,8 +4,6 @@ from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server from grpc import aio -from flytekit.extend.backend.agent_service import AsyncAgentService - _serve_help = """Start a grpc server for the agent service.""" @@ -47,6 +45,8 @@ async def _start_grpc_server(port: int, worker: int, timeout: int): try: from prometheus_client import start_http_server + from flytekit.extend.backend.agent_service import AsyncAgentService + start_http_server(9090) except ImportError as e: click.secho(f"Failed to start the prometheus server with error {e}", fg="red") diff --git a/setup.py b/setup.py index 7642625fe3..6828cb8661 100644 --- a/setup.py +++ b/setup.py @@ -38,10 +38,8 @@ "deprecated>=1.0,<2.0", "docker>=4.0.0,<7.0.0", "python-dateutil>=2.1", - # Restrict grpcio and grpcio-status. Version 1.50.0 pulls in a version of protobuf that is not compatible - # with the old protobuf library (as described in https://developers.google.com/protocol-buffers/docs/news/2022-05-06) - "grpcio>=1.50.0,!=1.55.0,<1.53.1,<2.0", - "grpcio-status>=1.50.0,!=1.55.0,<1.53.1,<2.0", + "grpcio", + "grpcio-status", "importlib-metadata", "fsspec>=2023.3.0", "adlfs",