diff --git a/docs/fundamentals/executor/executor-methods.md b/docs/fundamentals/executor/executor-methods.md index b321b697915a2..58f380bdae32b 100644 --- a/docs/fundamentals/executor/executor-methods.md +++ b/docs/fundamentals/executor/executor-methods.md @@ -50,11 +50,14 @@ with f: ``` ```shell - Flow@18048[I]:🎉 Flow is ready to use! - 🔗 Protocol: GRPC - 🏠 Local access: 0.0.0.0:52255 - 🔒 Private network: 192.168.1.187:52255 - 🌐 Public address: 212.231.186.65:52255 +────────────────────────── 🎉 Flow is ready to serve! ────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:55925 │ +│ 🔒 Private 192.168.1.187:55925 │ +│ 🌍 Public 212.231.186.65:55925 │ +╰──────────────────────────────────────────╯ + Calling foo Calling bar Calling foo @@ -93,20 +96,20 @@ All Executor methods decorated by `@requests` need to follow the signature below The `async` definition is optional. ```python -from typing import Dict, Union, List +from typing import Dict, Union, List, Optional from jina import Executor, requests, DocumentArray class MyExecutor(Executor): @requests async def foo( - self, docs: DocumentArray, parameters: Dict, docs_matrix: List[DocumentArray] + self, docs: DocumentArray, parameters: Dict, docs_matrix: Optional[List[DocumentArray]], docs_map: Optional[Dict[str, DocumentArray]] ) -> Union[DocumentArray, Dict, None]: pass @requests def bar( - self, docs: DocumentArray, parameters: Dict, docs_matrix: List[DocumentArray] + self, docs: DocumentArray, parameters: Dict, docs_matrix: Optional[List[DocumentArray]], docs_map: Optional[Dict[str, DocumentArray]] ) -> Union[DocumentArray, Dict, None]: pass ``` @@ -118,10 +121,11 @@ any other `list`-like object in a Python function. - `parameters`: A Dict object that passes extra parameters to Executor functions. -- `docs_matrix`: This is the least common parameter to be used for an Executor. 
This is needed when an Executor is used inside a Flow to merge or reduce the output of more than one other Executor. - +- `docs_matrix`: This is one of the least common parameters to be used for an Executor. It is passed when multiple parallel branches lead into the Executor, and `no_reduce=True` is set. Each DocumentArray in the matrix is the output of one previous Executor. +- `docs_map`: This is also one of the least common parameter to be used for an Executor. It has the same utility as `docs_matrix` but the information comes as a dict with previous Executor names as keys, and DocumentArrays as values. +- `tracing_context`: Context needed if you want to add custom traces. Check {ref}`how to add custom traces in your Executor `. ````{admonition} Hint @@ -156,11 +160,14 @@ class MyExecutor(Executor): ### Multiple DocumentArrays as input argument -You have seen that `Executor` methods can receive three types of parameters: `docs`, `parameters` and `docs_matrix`. +You have seen that {class}`~jina.Executor` methods can receive multiple parameters. + +`docs_matrix` and `docs_map` are only used in some special cases. -`docs_matrix` is only used in some special cases. +One case is when an Executor receives messages from more than one incoming {class}`~jina.Executor` in the {class}`~jina.Flow`: -One case is when an Executor receives messages from more than one upstream Executor in the Flow: +If you set `no_reduce` to True and the Executor has more than one incoming Executor, the Executor will receive all the DocumentArrays coming from previous Executors independently under `docs_matrix` and `docs_map`. +If `no_reduce` is not set or set to False, `docs_map` and `docs_matrix` will be None and the Executor will receive a single DocumentArray resulting from the reducing of all the incoming ones. 
```python from jina import Flow, Executor, requests, Document, DocumentArray @@ -198,26 +205,79 @@ f = ( Flow() .add(uses=Exec1, name='exec1') .add(uses=Exec2, name='exec2') - .add(uses=MergeExec, needs=['exec1', 'exec2'], disable_reduce=True) + .add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True) ) with f: - returned_docs = f.post(on='/', Document()) + returned_docs = f.post(on='/', inputs=Document()) print(f'Resulting documents {returned_docs[0].text}') ``` ```shell - Flow@1244[I]:🎉 Flow is ready to use! - 🔗 Protocol: GRPC - 🏠 Local access: 0.0.0.0:54550 - 🔒 Private network: 192.168.1.187:54550 - 🌐 Public address: 212.231.186.65:54550 +────────────────────────── 🎉 Flow is ready to serve! ────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:55761 │ +│ 🔒 Private 192.168.1.187:55761 │ +│ 🌍 Public 212.231.186.65:55761 │ +╰──────────────────────────────────────────╯ + MergeExec processing pairs of Documents "Exec1" and "Exec2" Resulting documents Document merging from "Exec1" and "Exec2" ``` +When merging Documents from more than one upstream {class}`~jina.Executor`, sometimes you want to control which Documents come from which Executor. +Executor will receive the `docs_map` as a dictionary where the key will be the last Executor processing that previous request and the DocumentArray of the request as the values. 
+ +```python +from jina import Flow, Executor, requests, Document + + +class Exec1(Executor): + @requests + def foo(self, docs, **kwargs): + for doc in docs: + doc.text = 'Exec1' + + +class Exec2(Executor): + @requests + def foo(self, docs, **kwargs): + for doc in docs: + doc.text = 'Exec2' + + +class MergeExec(Executor): + @requests + def foo(self, docs_map, **kwargs): + print(docs_map) + +f = ( + Flow() + .add(uses=Exec1, name='exec1') + .add(uses=Exec2, name='exec2') + .add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True) +) + +with f: + f.post(on='/', Document()) +``` + + +```shell +────────────────────────── 🎉 Flow is ready to serve! ────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:56286 │ +│ 🔒 Private 192.168.1.187:56286 │ +│ 🌍 Public 212.231.186.65:56286 │ +╰──────────────────────────────────────────╯ + +{'exec1': , 'exec2': } +``` + (async-executors)= ## Async coroutines @@ -227,7 +287,7 @@ Python to write concurrent code. ```python -from jina import Executor, requests, Flow +from jina import Executor, requests class MyExecutor(Executor): @@ -242,7 +302,6 @@ This example has a heavy lifting API which we call several times, and we leverag async Python features to speed up the {class}`~jina.Executor`'s call by calling the API multiple times concurrently. As a counterpart, in an example without `coroutines`, all 50 API calls are queued and nothing is done concurrently. - ````{tab} Async coroutines ```python import asyncio @@ -269,12 +328,16 @@ with f: ``` ```shell - Flow@20588[I]:🎉 Flow is ready to use! - 🔗 Protocol: GRPC - 🏠 Local access: 0.0.0.0:62598 - 🔒 Private network: 192.168.1.187:62598 - 🌐 Public address: 212.231.186.65:62598 -⠙ DONE ━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:01 100% ETA: 0 seconds 41 steps done in 1 second +────────────────────────── 🎉 Flow is ready to serve! 
────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:54153 │ +│ 🔒 Private 192.168.1.187:54153 │ +│ 🌍 Public 212.231.186.65:54153 │ +╰──────────────────────────────────────────╯ + + DONE ━━━━━━━━━━━━━━━━━━━━━━ 0:00:01 100% ETA: 0:00:00 50 steps done in 1 + second ``` ```` @@ -305,12 +368,16 @@ with f: ``` ```shell - Flow@20394[I]:🎉 Flow is ready to use! - 🔗 Protocol: GRPC - 🏠 Local access: 0.0.0.0:52592 - 🔒 Private network: 192.168.1.187:52592 - 🌐 Public address: 212.231.186.65:52592 -⠏ DONE ━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:50 100% ETA: 0 seconds 41 steps done in 50 seconds +────────────────────────── 🎉 Flow is ready to serve! ────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:52340 │ +│ 🔒 Private 192.168.1.187:52340 │ +│ 🌍 Public 212.231.186.65:52340 │ +╰──────────────────────────────────────────╯ + + DONE ━━━━━━━━━━━━━━━━━━━━━━ 0:00:50 100% ETA: 0:00:00 50 steps done in 50 + seconds ``` ```` @@ -335,9 +402,6 @@ class DummyExecutor(Executor): ``` - - - ## Returns Every Executor method can `return` in three ways: @@ -457,11 +521,14 @@ with f: ```shell - Flow@23300[I]:🎉 Flow is ready to use! - 🔗 Protocol: GRPC - 🏠 Local access: 0.0.0.0:61855 - 🔒 Private network: 192.168.1.187:61855 - 🌐 Public address: 212.231.186.65:61855 +────────────────────────── 🎉 Flow is ready to serve! 
────────────────────────── +╭────────────── 🔗 Endpoint ───────────────╮ +│ ⛓ Protocol GRPC │ +│ 🏠 Local 0.0.0.0:58746 │ +│ 🔒 Private 192.168.1.187:58746 │ +│ 🌍 Public 212.231.186.65:58746 │ +╰──────────────────────────────────────────╯ + ProcessDocuments: received document with text "request1" PrintExecutor: received document with text: "I changed the executor in place" ProcessDocuments: received document with text: "request2" diff --git a/docs/fundamentals/flow/executor-args.md b/docs/fundamentals/flow/executor-args.md index 38bb50da17cc2..0dcc559c72959 100644 --- a/docs/fundamentals/flow/executor-args.md +++ b/docs/fundamentals/flow/executor-args.md @@ -17,8 +17,7 @@ | `py_modules` | The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files,
which should be structured as a python package. For more details, please see the
`Executor cookbook `__ | `array` | `None` | | `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.

Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found
`here `.
Defaults to retaining whatever type is returned by the Executor. | `string` | `None` | | `exit_on_exceptions` | List of exceptions that will cause the Executor to shut down. | `array` | `[]` | -| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` | -| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` | +| `no_reduce` | Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map` | `boolean` | `False` | | `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` | | `entrypoint` | The entrypoint command overrides the ENTRYPOINT in Docker image. when not set then the Docker image ENTRYPOINT takes effective. | `string` | `None` | | `docker_kwargs` | Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.

More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/ | `object` | `None` | @@ -31,6 +30,7 @@ | `runtime_cls` | The runtime class to run inside the Pod | `string` | `WorkerRuntime` | | `timeout_ready` | The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever | `number` | `600000` | | `env` | The map of environment variables that are available inside runtime | `object` | `None` | +| `port` | The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` | | `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` | | `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `string` | `random in [49152, 65535]` | | `retries` | Number of retries per gRPC call. 
If <0 it defaults to max(3, num_replicas) | `number` | `-1` | @@ -47,5 +47,4 @@ | `uses_before_address` | The address of the uses-before runtime | `string` | `None` | | `uses_after_address` | The address of the uses-before runtime | `string` | `None` | | `connection_list` | dictionary JSON with a list of connections to configure | `string` | `None` | -| `disable_reduce` | Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head | `boolean` | `False` | | `timeout_send` | The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default | `number` | `None` | \ No newline at end of file diff --git a/docs/fundamentals/flow/gateway-args.md b/docs/fundamentals/flow/gateway-args.md index c5f2749e369c7..a09126c1be5a9 100644 --- a/docs/fundamentals/flow/gateway-args.md +++ b/docs/fundamentals/flow/gateway-args.md @@ -25,14 +25,13 @@ | `uses` | The config of the gateway, it could be one of the followings:
* the string literal of a Gateway class name
* a Gateway YAML file (.yml, .yaml, .jaml)
* a docker image (must start with `docker://`)
* the string literal of a YAML config (must start with `!` or `jtype: `)
* the string literal of a JSON config

When using it under Python, one can use the following values additionally:
- a Python dict that represents the config
- a text file stream that has a `.read()` interface | `string` | `None` | | `uses_with` | Dictionary of keyword arguments that will override the `with` configuration in `uses` | `object` | `None` | | `py_modules` | The customized python modules need to be imported before loading the gateway

Note that the recommended way is to only import a single module - a simple python file, if your
gateway can be defined in a single file, or an ``__init__.py`` file if you have multiple files,
which should be structured as a python package. | `array` | `None` | -| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` | -| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` | | `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` | +| `port` | The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` | | `graph_description` | Routing graph for the gateway | `string` | `{}` | | `graph_conditions` | Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. | `string` | `{}` | | `deployments_addresses` | JSON dictionary with the input addresses of each Deployment | `string` | `{}` | | `deployments_metadata` | JSON dictionary with the request metadata for each Deployment | `string` | `{}` | -| `deployments_disable_reduce` | list JSON disabling the built-in merging mechanism for each Deployment listed | `string` | `[]` | +| `deployments_no_reduce` | list JSON disabling the built-in merging mechanism for each Deployment listed | `string` | `[]` | | `compression` | The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. 
| `string` | `None` | | `timeout_send` | The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default | `number` | `None` | | `runtime_cls` | The runtime class to run inside the Pod | `string` | `GatewayRuntime` | diff --git a/docs/fundamentals/flow/topologies.md b/docs/fundamentals/flow/topologies.md index fcead6121beea..e6658499bb406 100644 --- a/docs/fundamentals/flow/topologies.md +++ b/docs/fundamentals/flow/topologies.md @@ -55,7 +55,7 @@ This gives the output: ``` Both `BarExecutor` and `BazExecutor` only received a single `Document` from `FooExecutor` because they are run in parallel. The last Executor `executor3` receives both DocumentArrays and merges them automatically. -This automated merging can be disabled with `disable_reduce=True`. This is useful for providing custom merge logic in a separate Executor. In this case the last `.add()` call would look like `.add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, disable_reduce=True)`. This feature requires Jina >= 3.0.2. +This automated merging can be disabled with `no_reduce=True`. This is useful for providing custom merge logic in a separate Executor. In this case the last `.add()` call would look like `.add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, no_reduce=True)`. This feature requires Jina >= 3.0.2. (replicate-executors)= ## Replicate Executors diff --git a/docs/how-to/external-executor.md b/docs/how-to/external-executor.md index ce83bf2f01d7a..4bc7ba42579e1 100644 --- a/docs/how-to/external-executor.md +++ b/docs/how-to/external-executor.md @@ -63,7 +63,7 @@ This connects to `grpc://localhost:12345` and `grpc://91.198.174.192:12346` as t ````{admonition} Reducing :class: hint -If an external Executor needs multiple predecessors, reducing needs to be enabled. So setting disable_reduce=True is not allowed for these cases. +If an external Executor needs multiple predecessors, reducing needs to be enabled. 
So setting no_reduce=True is not allowed for these cases. ```` diff --git a/jina/orchestrate/flow/base.py b/jina/orchestrate/flow/base.py index b672d9f1407fe..ed2014fa668a1 100644 --- a/jina/orchestrate/flow/base.py +++ b/jina/orchestrate/flow/base.py @@ -159,8 +159,8 @@ def __init__( compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', - deployments_disable_reduce: Optional[str] = '[]', deployments_metadata: Optional[str] = '{}', + deployments_no_reduce: Optional[str] = '[]', description: Optional[str] = None, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, @@ -172,7 +172,6 @@ def __init__( graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', - host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, @@ -211,8 +210,8 @@ def __init__( :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param deployments_addresses: JSON dictionary with the input addresses of each Deployment - :param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param deployments_metadata: JSON dictionary with the request metadata for each Deployment + :param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI. :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. 
@@ -227,7 +226,6 @@ def __init__( :param graph_description: Routing graph for the gateway :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. @@ -246,7 +244,7 @@ def __init__( Any executor that has `@requests(on=...)` bound with those values will receive data requests. :param no_debug_endpoints: If set, `/status` `/post` endpoints are removed from HTTP interface. - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. 
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param prefetch: Number of requests fetched from the client before feeding into the first Executor. @@ -383,8 +381,8 @@ def __init__( :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param deployments_addresses: JSON dictionary with the input addresses of each Deployment - :param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param deployments_metadata: JSON dictionary with the request metadata for each Deployment + :param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI. :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. @@ -399,7 +397,6 @@ def __init__( :param graph_description: Routing graph for the gateway :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. 
:param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. @@ -418,7 +415,7 @@ def __init__( Any executor that has `@requests(on=...)` bound with those values will receive data requests. :param no_debug_endpoints: If set, `/status` `/post` endpoints are removed from HTTP interface. - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param prefetch: Number of requests fetched from the client before feeding into the first Executor. 
@@ -600,7 +597,7 @@ def _add_gateway( deployments_addresses: Dict[str, List[str]], deployments_metadata: Dict[str, Dict[str, str]], graph_conditions: Dict[str, Dict], - deployments_disable_reduce: List[str], + deployments_no_reduce: List[str], **kwargs, ): kwargs.update( @@ -628,7 +625,7 @@ def _add_gateway( args.graph_conditions = json.dumps(graph_conditions) args.deployments_addresses = json.dumps(deployments_addresses) args.deployments_metadata = json.dumps(deployments_metadata) - args.deployments_disable_reduce = json.dumps(deployments_disable_reduce) + args.deployments_no_reduce = json.dumps(deployments_no_reduce) self._deployment_nodes[GATEWAY_NAME] = Deployment(args, needs) def _get_deployments_metadata(self) -> Dict[str, Dict[str, str]]: @@ -752,7 +749,7 @@ def _get_graph_conditions(self) -> Dict[str, Dict]: def _get_disabled_reduce_deployments(self) -> List[str]: disabled_deployments = [] for node, v in self._deployment_nodes.items(): - if v.args.disable_reduce: + if v.args.no_reduce: disabled_deployments.append(node) return disabled_deployments @@ -833,7 +830,6 @@ def add( compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, - disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, @@ -845,7 +841,6 @@ def add( grpc_metadata: Optional[dict] = None, grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', - host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, metrics: Optional[bool] = False, @@ -854,6 +849,7 @@ def add( monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, + no_reduce: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[str] = None, @@ -892,7 +888,6 @@ def add( :param compression: The compression mechanism used 
when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param connection_list: dictionary JSON with a list of connections to configure :param disable_auto_volume: Do not automatically mount a volume for dockerized Executors. - :param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. @@ -914,7 +909,6 @@ def add( :param grpc_metadata: The metadata to be passed to the gRPC request. :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local :param log_config: The YAML config of the logger used in this object. :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. @@ -931,6 +925,7 @@ def add( When not given, then the default naming strategy will apply. :param native: If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. + :param no_reduce: Disable the built-in reduction mechanism. 
Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map` :param output_array_type: The type of array `tensor` and `embedding` will be serialized to. Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found @@ -944,7 +939,7 @@ def add( Define per Endpoint: JSON dict, {endpoint: PollingType} {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'} - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param py_modules: The customized python modules need to be imported before loading the executor @@ -1048,7 +1043,6 @@ def add( :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param connection_list: dictionary JSON with a list of connections to configure :param disable_auto_volume: Do not automatically mount a volume for dockerized Executors. - :param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. 
@@ -1070,7 +1064,6 @@ def add( :param grpc_metadata: The metadata to be passed to the gRPC request. :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local :param log_config: The YAML config of the logger used in this object. :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. @@ -1087,6 +1080,7 @@ def add( When not given, then the default naming strategy will apply. :param native: If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. + :param no_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map` :param output_array_type: The type of array `tensor` and `embedding` will be serialized to. Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found @@ -1100,7 +1094,7 @@ def add( Define per Endpoint: JSON dict, {endpoint: PollingType} {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'} - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. 
+ :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param py_modules: The customized python modules need to be imported before loading the executor @@ -1236,7 +1230,7 @@ def add( port = helper.random_port() args.port = port - if len(needs) > 1 and args.external and args.disable_reduce: + if len(needs) > 1 and args.external and args.no_reduce: raise ValueError( 'External Executors with multiple needs have to do auto reduce.' ) @@ -1256,8 +1250,8 @@ def config_gateway( compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', - deployments_disable_reduce: Optional[str] = '[]', deployments_metadata: Optional[str] = '{}', + deployments_no_reduce: Optional[str] = '[]', description: Optional[str] = None, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, @@ -1269,7 +1263,6 @@ def config_gateway( graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', - host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, @@ -1308,8 +1301,8 @@ def config_gateway( :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. 
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment - :param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param deployments_metadata: JSON dictionary with the request metadata for each Deployment + :param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI. :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. @@ -1324,7 +1317,6 @@ def config_gateway( :param graph_description: Routing graph for the gateway :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. @@ -1343,7 +1335,7 @@ def config_gateway( Any executor that has `@requests(on=...)` bound with those values will receive data requests. :param no_debug_endpoints: If set, `/status` `/post` endpoints are removed from HTTP interface. - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. 
In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param prefetch: Number of requests fetched from the client before feeding into the first Executor. @@ -1404,8 +1396,8 @@ def config_gateway( :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param deployments_addresses: JSON dictionary with the input addresses of each Deployment - :param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param deployments_metadata: JSON dictionary with the request metadata for each Deployment + :param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed :param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI. :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. 
@@ -1420,7 +1412,6 @@ def config_gateway( :param graph_description: Routing graph for the gateway :param grpc_server_options: Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. - :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. @@ -1439,7 +1430,7 @@ def config_gateway( Any executor that has `@requests(on=...)` bound with those values will receive data requests. :param no_debug_endpoints: If set, `/status` `/post` endpoints are removed from HTTP interface. - :param port: The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param port: The port for input data to bind to, default is a random port between [49152, 65535].In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. 
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535] :param prefetch: Number of requests fetched from the client before feeding into the first Executor. @@ -1661,7 +1652,7 @@ def build( deployments_addresses=op_flow._get_deployments_addresses(), deployments_metadata=op_flow._get_deployments_metadata(), graph_conditions=op_flow._get_graph_conditions(), - deployments_disable_reduce=op_flow._get_disabled_reduce_deployments(), + deployments_no_reduce=op_flow._get_disabled_reduce_deployments(), uses=op_flow.gateway_args.uses, ) diff --git a/jina/parsers/deprecated.py b/jina/parsers/deprecated.py index 2f0e9341ac1c4..9a97ae1b722eb 100644 --- a/jina/parsers/deprecated.py +++ b/jina/parsers/deprecated.py @@ -5,7 +5,10 @@ 'port_expose': 'port', 'parallel': 'One of "shards" (when dividing data in indexers) or "replicas" (replicating Executors for performance and reliability)', 'port_in': 'port', + 'host_in': 'host', 'https': 'tls', + 'disable_reduce': 'no_reduce', + 'deployments_disable_reduce': 'deployments_no_reduce' } diff --git a/jina/parsers/orchestrate/pod.py b/jina/parsers/orchestrate/pod.py index 9cbe5248f1d44..b4c1f7107cd45 100644 --- a/jina/parsers/orchestrate/pod.py +++ b/jina/parsers/orchestrate/pod.py @@ -90,10 +90,12 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'): gp.add_argument( '--port', + '--port-in', type=str, default=str(helper.random_port()), help='The port for input data to bind to, default is a random port between [49152, 65535].' - ' In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas.' + ' In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, ' + 'separated by commas.
' ' Then, every resulting address will be considered as one replica of the Executor.', ) diff --git a/jina/parsers/orchestrate/runtimes/head.py b/jina/parsers/orchestrate/runtimes/head.py index 76878e538c728..352f654ec0e7b 100644 --- a/jina/parsers/orchestrate/runtimes/head.py +++ b/jina/parsers/orchestrate/runtimes/head.py @@ -33,13 +33,6 @@ def mixin_head_parser(parser): help='dictionary JSON with a list of connections to configure', ) - gp.add_argument( - '--disable-reduce', - action='store_true', - default=False, - help='Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head', - ) - gp.add_argument( '--timeout-send', type=int, diff --git a/jina/parsers/orchestrate/runtimes/remote.py b/jina/parsers/orchestrate/runtimes/remote.py index adbda1cbc7b34..693c9acfe573d 100644 --- a/jina/parsers/orchestrate/runtimes/remote.py +++ b/jina/parsers/orchestrate/runtimes/remote.py @@ -126,6 +126,7 @@ def mixin_gateway_parser(parser): ) parser.add_argument( + '--deployments-no-reduce', '--deployments-disable-reduce', type=str, help='list JSON disabling the built-in merging mechanism for each Deployment listed', @@ -150,6 +151,7 @@ def _add_host(arg_group): arg_group.add_argument( '--host', + '--host-in', type=str, default=__default_host__, help=f'The host address of the runtime, by default it is {__default_host__}.' 
diff --git a/jina/parsers/orchestrate/runtimes/runtime.py b/jina/parsers/orchestrate/runtimes/runtime.py index 136ddb8fd6329..4922a7cfb0295 100644 --- a/jina/parsers/orchestrate/runtimes/runtime.py +++ b/jina/parsers/orchestrate/runtimes/runtime.py @@ -1,6 +1,5 @@ """Argparser module for WorkerRuntime""" -from jina import __default_host__, helper from jina.parsers.helper import KVAppendAction @@ -9,20 +8,6 @@ def mixin_base_runtime_parser(arg_group): :param arg_group: the parser instance to which we add arguments """ - arg_group.add_argument( - '--port-in', - type=int, - default=helper.random_port(), - dest='port', - help='The port for input data to bind to, default a random port between [49152, 65535]', - ) - arg_group.add_argument( - '--host-in', - type=str, - default=__default_host__, - help=f'The host address for binding to, by default it is {__default_host__}', - ) - arg_group.add_argument( '--grpc-server-options', action=KVAppendAction, diff --git a/jina/parsers/orchestrate/runtimes/worker.py b/jina/parsers/orchestrate/runtimes/worker.py index 328fff4df4465..872afa47e0364 100644 --- a/jina/parsers/orchestrate/runtimes/worker.py +++ b/jina/parsers/orchestrate/runtimes/worker.py @@ -92,4 +92,13 @@ def mixin_worker_runtime_parser(parser): nargs='*', help='List of exceptions that will cause the Executor to shut down.', ) + + gp.add_argument( + '--no-reduce', + '--disable-reduce', + action='store_true', + default=False, + help='Disable the built-in reduction mechanism. 
Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map`', + ) + mixin_base_runtime_parser(gp) diff --git a/jina/serve/gateway.py b/jina/serve/gateway.py index 66c9d543824a6..fd95cc6828491 100644 --- a/jina/serve/gateway.py +++ b/jina/serve/gateway.py @@ -113,14 +113,14 @@ def inject_dependencies( graph_conditions = json.loads(args.graph_conditions) deployments_addresses = json.loads(args.deployments_addresses) deployments_metadata = json.loads(args.deployments_metadata) - deployments_disable_reduce = json.loads(args.deployments_disable_reduce) + deployments_no_reduce = json.loads(args.deployments_no_reduce) self.streamer = GatewayStreamer( graph_representation=graph_description, executor_addresses=deployments_addresses, graph_conditions=graph_conditions, deployments_metadata=deployments_metadata, - deployments_disable_reduce=deployments_disable_reduce, + deployments_no_reduce=deployments_no_reduce, timeout_send=timeout_send, retries=args.retries, compression=args.compression, diff --git a/jina/serve/runtimes/gateway/graph/topology_graph.py b/jina/serve/runtimes/gateway/graph/topology_graph.py index bcb92553a046d..d284f427e1270 100644 --- a/jina/serve/runtimes/gateway/graph/topology_graph.py +++ b/jina/serve/runtimes/gateway/graph/topology_graph.py @@ -127,10 +127,8 @@ async def _wait_previous_and_send( request.parameters = _parse_specific_params( request.parameters, self.name ) - if copy_request_at_send: - self.parts_to_send.append(copy.deepcopy(request)) - else: - self.parts_to_send.append(request) + req_to_send = copy.deepcopy(request) if copy_request_at_send else request + self.parts_to_send.append(req_to_send) # this is a specific needs if len(self.parts_to_send) == self.number_of_parts: self.start_time = datetime.utcnow() @@ -328,7 +326,7 @@ def __init__( graph_representation: Dict, graph_conditions: Dict = {}, deployments_metadata: Dict = {}, - deployments_disable_reduce: List[str] = [], + 
deployments_no_reduce: List[str] = [], timeout_send: Optional[float] = 1.0, retries: Optional[int] = -1, *args, @@ -363,7 +361,7 @@ def __init__( floating=node_name in floating_deployment_set, filter_condition=condition, metadata=metadata, - reduce=node_name not in deployments_disable_reduce, + reduce=node_name not in deployments_no_reduce, timeout_send=timeout_send, retries=retries, ) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 506fbb32ad763..0c342397016a9 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -8,6 +8,7 @@ from jina.excepts import InternalNetworkError from jina.importer import ImportExtensions +from jina.helper import GATEWAY_NAME from jina.proto import jina_pb2 from jina.serve.networking import GrpcConnectionPool from jina.serve.runtimes.gateway.graph.topology_graph import TopologyGraph @@ -346,7 +347,7 @@ async def _process_results_at_end_gateway( asyncio.create_task(gather_endpoints(request_graph)) partial_responses = await asyncio.gather(*tasks) - except Exception as e: + except Exception: # update here failed request self._update_end_failed_requests_metrics() raise @@ -356,6 +357,12 @@ async def _process_results_at_end_gateway( ) response = filtered_partial_responses[0] + # JoanFM: to keep the docs_map feature, need to add the routes in the WorkerRuntime but clear it here + # so that routes are properly done. 
not very clean but refactoring would be costly for such a small + # thing, `docs_map` reuses routes potentially not in the best way but works for now + for i in reversed(range(len(response.routes))): + if response.routes[i].executor != GATEWAY_NAME: + del response.routes[i] request_graph.add_routes(response) if graph.has_filter_conditions: diff --git a/jina/serve/runtimes/head/__init__.py b/jina/serve/runtimes/head/__init__.py index 8607adef74361..3aa81675c8597 100644 --- a/jina/serve/runtimes/head/__init__.py +++ b/jina/serve/runtimes/head/__init__.py @@ -143,7 +143,7 @@ def __init__( self.connection_pool.add_connection( deployment='uses_after', address=self.uses_after_address ) - self._reduce = not args.disable_reduce + self._reduce = not args.no_reduce def _default_polling_dict(self, default_polling): return defaultdict( diff --git a/jina/serve/runtimes/request_handlers/worker_request_handler.py b/jina/serve/runtimes/request_handlers/worker_request_handler.py index b37deb3656a9f..7dd153bc89d6b 100644 --- a/jina/serve/runtimes/request_handlers/worker_request_handler.py +++ b/jina/serve/runtimes/request_handlers/worker_request_handler.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from docarray import DocumentArray @@ -30,6 +30,7 @@ def __init__( metrics_registry: Optional['CollectorRegistry'] = None, tracer_provider: Optional['trace.TracerProvider'] = None, meter_provider: Optional['metrics.MeterProvider'] = None, + deployment_name: str = '', **kwargs, ): """Initialize private parameters and execute private loading functions. 
@@ -39,11 +40,11 @@ def __init__( :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor of from the data request handler :param tracer_provider: Optional tracer_provider that will be provided to the executor for tracing :param meter_provider: Optional meter_provider that will be provided to the executor for metrics + :param deployment_name: name of the deployment to use as Executor name to set in requests :param kwargs: extra keyword arguments """ super().__init__() self.args = args - self.args.parallel = self.args.shards self.logger = logger self._is_closed = False self._load_executor( @@ -57,6 +58,7 @@ def __init__( else None ) self._init_monitoring(metrics_registry, meter) + self.deployment_name = deployment_name def _init_monitoring( self, @@ -285,19 +287,21 @@ async def handle( ) # executor logic + docs_matrix, docs_map = WorkerRequestHandler._get_docs_matrix_from_request(requests) return_data = await self._executor.__acall__( req_endpoint=requests[0].header.exec_endpoint, docs=docs, parameters=params, - docs_matrix=WorkerRequestHandler.get_docs_matrix_from_request( - requests, - field='docs', - ), + docs_matrix=docs_matrix, + docs_map=docs_map, tracing_context=tracing_context, ) docs = self._set_result(requests, return_data, docs) + for req in requests: + req.add_executor(self.deployment_name) + self._record_docs_processed_monitoring(requests, docs) self._record_response_size_monitoring(requests) @@ -346,26 +350,28 @@ def close(self): self._is_closed = True @staticmethod - def get_docs_matrix_from_request( + def _get_docs_matrix_from_request( requests: List['DataRequest'], - field: str, - ) -> List['DocumentArray']: + ) -> Tuple[Optional[List['DocumentArray']], Optional[Dict[str, 'DocumentArray']]]: """ Returns a docs matrix from a list of DataRequest objects. 
+ :param requests: List of DataRequest objects - :param field: field to be retrieved - :return: docs matrix: list of DocumentArray objects + :return: docs matrix and docs map: a list of DocumentArray objects, and a dict of DocumentArray objects keyed by the name of the last Executor """ - if len(requests) > 1: - result = [getattr(request, field) for request in requests] - else: - result = [getattr(requests[0], field)] + docs_map = {} + docs_matrix = [] + for req in requests: + docs_matrix.append(req.docs) + docs_map[req.last_executor] = req.docs # to unify all length=0 DocumentArray (or any other results) will simply considered as None # otherwise, the executor has to handle [None, None, None] or [DocArray(0), DocArray(0), DocArray(0)] - len_r = sum(len(r) for r in result) - if len_r: - return result + len_r = sum(len(r) for r in docs_matrix) + if len_r == 0: + docs_matrix = None + + return docs_matrix, docs_map @staticmethod def get_parameters_dict_from_request( @@ -454,8 +460,8 @@ def reduce_requests(requests: List['DataRequest']) -> 'DataRequest': :param requests: List of DataRequest objects :return: the resulting DataRequest """ - docs_matrix = WorkerRequestHandler.get_docs_matrix_from_request( - requests, field='docs' + docs_matrix, _ = WorkerRequestHandler._get_docs_matrix_from_request( + requests ) # Reduction is applied in-place to the first DocumentArray in the matrix diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py index d50e975218e75..fb3bd51346a93 100644 --- a/jina/serve/runtimes/worker/__init__.py +++ b/jina/serve/runtimes/worker/__init__.py @@ -101,11 +101,12 @@ async def async_setup(self): # otherwise readiness check is not valid # The WorkerRequestHandler needs to be started BEFORE the grpc server self._worker_request_handler = WorkerRequestHandler( - self.args, - self.logger, - self.metrics_registry, - self.tracer_provider, - self.meter_provider, + args=self.args, + logger=self.logger, + metrics_registry=self.metrics_registry, + tracer_provider=self.tracer_provider, + 
meter_provider=self.meter_provider, + deployment_name=self.name.split('/')[0] ) await self._async_setup_grpc_server() diff --git a/jina/serve/streamer.py b/jina/serve/streamer.py index 01ab948f21464..859cf0ae56da9 100644 --- a/jina/serve/streamer.py +++ b/jina/serve/streamer.py @@ -29,7 +29,7 @@ def __init__( executor_addresses: Dict[str, Union[str, List[str]]], graph_conditions: Dict = {}, deployments_metadata: Dict[str, Dict[str, str]] = {}, - deployments_disable_reduce: List[str] = [], + deployments_no_reduce: List[str] = [], timeout_send: Optional[float] = None, retries: int = 0, compression: Optional[str] = None, @@ -49,7 +49,7 @@ def __init__( :param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. :param deployments_metadata: Dictionary with the metadata of each Deployment. Each executor deployment can have a list of key-value pairs to provide information associated with the request to the deployment. - :param deployments_disable_reduce: list of Executor disabling the built-in merging mechanism. + :param deployments_no_reduce: list of Executor disabling the built-in merging mechanism. :param timeout_send: Timeout to be considered when sending requests to Executors :param retries: Number of retries to try to make successfull sendings to Executors :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. 
@@ -65,7 +65,7 @@ def __init__( graph_representation, graph_conditions, deployments_metadata, - deployments_disable_reduce, + deployments_no_reduce, timeout_send, retries, ) @@ -99,7 +99,7 @@ def _create_topology_graph( graph_description, graph_conditions, deployments_metadata, - deployments_disable_reduce, + deployments_no_reduce, timeout_send, retries, ): @@ -108,7 +108,7 @@ def _create_topology_graph( graph_representation=graph_description, graph_conditions=graph_conditions, deployments_metadata=deployments_metadata, - deployments_disable_reduce=deployments_disable_reduce, + deployments_no_reduce=deployments_no_reduce, timeout_send=timeout_send, retries=retries, ) diff --git a/jina/types/request/data.py b/jina/types/request/data.py index a21d6314d7f78..8a538b9aa19eb 100644 --- a/jina/types/request/data.py +++ b/jina/types/request/data.py @@ -54,7 +54,7 @@ def docs(self, value: DocumentArray): self.set_docs_convert_arrays(value, None) def set_docs_convert_arrays( - self, value: DocumentArray, ndarray_type: Optional[str] = None + self, value: DocumentArray, ndarray_type: Optional[str] = None ): """ " Convert embedding and tensor to given type, then set DocumentArray @@ -100,8 +100,8 @@ def docs_bytes(self, value: bytes): """ def __init__( - self, - request: Optional[RequestSourceType] = None, + self, + request: Optional[RequestSourceType] = None, ): self.buffer = None self._pb_body = None @@ -159,7 +159,7 @@ def is_decompressed_wo_data(self) -> bool: @property def proto_wo_data( - self, + self, ) -> Union['jina_pb2.DataRequestProtoWoData', 'jina_pb2.DataRequestProto']: """ Transform the current buffer to a :class:`jina_pb2.DataRequestProtoWoData` unless the full proto has already @@ -173,7 +173,7 @@ def proto_wo_data( @property def proto( - self, + self, ) -> Union['jina_pb2.DataRequestProto', 'jina_pb2.DataRequestProtoWoData']: """ Cast ``self`` to a :class:`jina_pb2.DataRequestProto` or a :class:`jina_pb2.DataRequestProto`. 
Laziness will be broken and serialization will be recomputed when calling. @@ -187,7 +187,7 @@ def proto( @property def proto_with_data( - self, + self, ) -> 'jina_pb2.DataRequestProto': """ Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling. @@ -259,6 +259,7 @@ def data(self) -> 'DataRequest._DataContent': @property def parameters(self) -> Dict: """Return the `parameters` field of this DataRequest as a Python dict + :return: a Python dict view of the parameters. """ # if u get this u need to have it decompressed @@ -267,6 +268,7 @@ def parameters(self) -> Dict: @parameters.setter def parameters(self, value: Dict): """Set the `parameters` field of this Request to a Python dict + :param value: a Python dict """ self.proto_wo_data.parameters.Clear() @@ -290,6 +292,36 @@ def status(self): """ return self.proto_wo_data.header.status + @property + def last_executor(self): + """ + Returns the name of the last Executor that has processed this Request + + :return: the name of the last Executor that processed this Request + """ + if len(self.proto_wo_data.routes) > 0: + return self.proto_wo_data.routes[-1].executor + + def add_executor(self, executor_name: str): + """ + Adds Executor the the request routes + + :param executor_name: name of the Executor processing the Request to be added to the routes + """ + route_proto = jina_pb2.RouteProto() + route_proto.executor = executor_name + print(f' type {type(self.proto_wo_data.routes)}') + self.proto_wo_data.routes.append(route_proto) + + @property + def routes(self): + """ + Returns the routes from the request + + :return: the routes object of this request + """ + return self.proto_wo_data.routes + @property def request_id(self): """ diff --git a/jina_cli/autocomplete.py b/jina_cli/autocomplete.py index f80a7d35c7371..3cfc083f768d9 100644 --- a/jina_cli/autocomplete.py +++ b/jina_cli/autocomplete.py @@ -40,8 +40,8 @@ '--py-modules', 
'--output-array-type', '--exit-on-exceptions', - '--port-in', - '--host-in', + '--no-reduce', + '--disable-reduce', '--grpc-server-options', '--entrypoint', '--docker-kwargs', @@ -49,6 +49,7 @@ '--gpus', '--disable-auto-volume', '--host', + '--host-in', '--quiet-remote-logs', '--upload-files', '--runtime-cls', @@ -58,6 +59,7 @@ '--pod-role', '--noblock-on-start', '--port', + '--port-in', '--monitoring', '--port-monitoring', '--retries', @@ -75,7 +77,6 @@ '--uses-before-address', '--uses-after-address', '--connection-list', - '--disable-reduce', '--timeout-send', ], 'flow': [ @@ -131,18 +132,18 @@ '--expose-graphql-endpoint', '--protocol', '--host', + '--host-in', '--proxy', '--uses', '--uses-with', '--py-modules', - '--port-in', - '--host-in', '--grpc-server-options', '--port-expose', '--graph-description', '--graph-conditions', '--deployments-addresses', '--deployments-metadata', + '--deployments-no-reduce', '--deployments-disable-reduce', '--compression', '--timeout-send', @@ -153,6 +154,7 @@ '--pod-role', '--noblock-on-start', '--port', + '--port-in', '--monitoring', '--port-monitoring', '--retries', @@ -252,8 +254,8 @@ '--py-modules', '--output-array-type', '--exit-on-exceptions', - '--port-in', - '--host-in', + '--no-reduce', + '--disable-reduce', '--grpc-server-options', '--entrypoint', '--docker-kwargs', @@ -261,6 +263,7 @@ '--gpus', '--disable-auto-volume', '--host', + '--host-in', '--quiet-remote-logs', '--upload-files', '--runtime-cls', @@ -270,6 +273,7 @@ '--pod-role', '--noblock-on-start', '--port', + '--port-in', '--monitoring', '--port-monitoring', '--retries', @@ -287,7 +291,6 @@ '--uses-before-address', '--uses-after-address', '--connection-list', - '--disable-reduce', '--timeout-send', ], 'deployment': [ @@ -312,8 +315,8 @@ '--py-modules', '--output-array-type', '--exit-on-exceptions', - '--port-in', - '--host-in', + '--no-reduce', + '--disable-reduce', '--grpc-server-options', '--entrypoint', '--docker-kwargs', @@ -321,6 +324,7 @@ '--gpus', 
'--disable-auto-volume', '--host', + '--host-in', '--quiet-remote-logs', '--upload-files', '--runtime-cls', @@ -330,6 +334,7 @@ '--pod-role', '--noblock-on-start', '--port', + '--port-in', '--monitoring', '--port-monitoring', '--retries', @@ -347,7 +352,6 @@ '--uses-before-address', '--uses-after-address', '--connection-list', - '--disable-reduce', '--timeout-send', '--uses-before', '--uses-after', @@ -360,6 +364,7 @@ 'client': [ '--help', '--host', + '--host-in', '--proxy', '--port', '--tls', diff --git a/tests/integration/needs-merge/__init__.py b/tests/integration/needs-merge/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/integration/needs-merge/test_needs_merge.py b/tests/integration/needs-merge/test_needs_merge.py new file mode 100644 index 0000000000000..d8c00e26532b4 --- /dev/null +++ b/tests/integration/needs-merge/test_needs_merge.py @@ -0,0 +1,19 @@ +from jina import Flow, Executor, requests, DocumentArray + + +def test_needs_docs_map(): + class TestMergeDictDocMatrixExecutor(Executor): + + @requests() + def foo(self, docs_map, **kwargs): + assert {'exec0', 'exec1'} == set(docs_map.keys()) + + f = Flow().add(name='exec0'). \ + add(name='exec1', replicas=2, shards=2, needs=['gateway']). 
\ + add(name='exec2', + needs=['exec0', 'exec1'], + uses=TestMergeDictDocMatrixExecutor, + disable_reduce=True) + + with f: + f.post(on='/', inputs=DocumentArray.empty(2)) diff --git a/tests/unit/orchestrate/deployments/test_deployments.py b/tests/unit/orchestrate/deployments/test_deployments.py index 050bcf0261df8..c795482bd9dc4 100644 --- a/tests/unit/orchestrate/deployments/test_deployments.py +++ b/tests/unit/orchestrate/deployments/test_deployments.py @@ -171,7 +171,7 @@ def foo(self, docs: DocumentArray, **kwargs): @pytest.mark.slow def test_pod_activates_replicas(): - args_list = ['--replicas', '3', '--shards', '2', '--disable-reduce'] + args_list = ['--replicas', '3', '--shards', '2', '--no-reduce'] args = set_deployment_parser().parse_args(args_list) args.uses = 'AppendNameExecutor' with Deployment(args) as pod: diff --git a/tests/unit/serve/runtimes/head/test_head_runtime.py b/tests/unit/serve/runtimes/head/test_head_runtime.py index 4aafe24ccca2b..5a836aa7412f5 100644 --- a/tests/unit/serve/runtimes/head/test_head_runtime.py +++ b/tests/unit/serve/runtimes/head/test_head_runtime.py @@ -47,7 +47,7 @@ def test_message_merging(disable_reduce): if not disable_reduce: args = set_pod_parser().parse_args([]) else: - args = set_pod_parser().parse_args(['--disable-reduce']) + args = set_pod_parser().parse_args(['--no-reduce']) args.polling = PollingType.ALL connection_list_dict = {0: [f'ip1:8080'], 1: [f'ip2:8080'], 2: [f'ip3:8080']} args.connection_list = json.dumps(connection_list_dict) diff --git a/tests/unit/types/request/test_request.py b/tests/unit/types/request/test_request.py index cf1eaa4c59707..0162e117b065b 100644 --- a/tests/unit/types/request/test_request.py +++ b/tests/unit/types/request/test_request.py @@ -295,3 +295,14 @@ def test_proto_wo_data_docs(): # check if we can access the docs after deserial new_data_request = DataRequest(bytes_) assert new_data_request.docs == r.docs + + +def test_req_add_get_executors(): + r = DataRequest() + 
r.add_executor('one') + assert r.last_executor == 'one' + r.add_executor('two') + assert r.last_executor == 'two' + + r2 = DataRequest.from_proto(r.proto) + assert r2.last_executor == 'two'