Skip to content

Commit

Permalink
feat: fix aliases of arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Nov 10, 2022
1 parent 6426d19 commit 1451769
Show file tree
Hide file tree
Showing 16 changed files with 41 additions and 51 deletions.
10 changes: 5 additions & 5 deletions docs/fundamentals/executor/executor-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ any other `list`-like object in a Python function.

- `parameters`: A Dict object that passes extra parameters to Executor functions.

- `docs_matrix`: This is one of the least common parameters to be used for an Executor. It is passed when multiple parallel branches lead into the Executor, and `disable_reduce=True` is set. Each DocumentArray in the matrix is the output of one previous Executor.
- `docs_matrix`: This is one of the least common parameters to be used for an Executor. It is passed when multiple parallel branches lead into the Executor, and `no_reduce=True` is set. Each DocumentArray in the matrix is the output of one previous Executor.

- `docs_map`: This is also one of the least common parameter to be used for an Executor. It has the same utility as `docs_matrix` but the information comes as a dict with previous Executor names as keys, and DocumentArrays as values.

Expand Down Expand Up @@ -166,8 +166,8 @@ You have seen that {class}`~jina.Executor` methods can receive multiple paramete

One case is when an Executor receives messages from more than one incoming {class}`~jina.Executor` in the {class}`~jina.Flow`:

If you set `disable_reduce` to True and the Executor has more than one incoming Executor, the Executor will receive all the DocumentArrays coming from previous Executors independently under `docs_matrix` and `docs_map`.
If `disable_reduce` is not set or set to False, `docs_map` and `docs_matrix` will be None and the Executor will receive a single DocumentArray resulting from the reducing of all the incoming ones.
If you set `no_reduce` to True and the Executor has more than one incoming Executor, the Executor will receive all the DocumentArrays coming from previous Executors independently under `docs_matrix` and `docs_map`.
If `no_reduce` is not set or set to False, `docs_map` and `docs_matrix` will be None and the Executor will receive a single DocumentArray resulting from the reducing of all the incoming ones.

```python
from jina import Flow, Executor, requests, Document, DocumentArray
Expand Down Expand Up @@ -205,7 +205,7 @@ f = (
Flow()
.add(uses=Exec1, name='exec1')
.add(uses=Exec2, name='exec2')
.add(uses=MergeExec, needs=['exec1', 'exec2'], disable_reduce=True)
.add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True)
)

with f:
Expand Down Expand Up @@ -258,7 +258,7 @@ f = (
Flow()
.add(uses=Exec1, name='exec1')
.add(uses=Exec2, name='exec2')
.add(uses=MergeExec, needs=['exec1', 'exec2'], disable_reduce=True)
.add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True)
)

with f:
Expand Down
2 changes: 1 addition & 1 deletion docs/fundamentals/flow/executor-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
| `py_modules` | The customized python modules need to be imported before loading the executor<br><br>Note that the recommended way is to only import a single module - a simple python file, if your<br>executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files,<br>which should be structured as a python package. For more details, please see the<br>`Executor cookbook <https://docs.jina.ai/fundamentals/executor/executor-files/>`__ | `array` | `None` |
| `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.<br><br>Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found <br>`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.<br>Defaults to retaining whatever type is returned by the Executor. | `string` | `None` |
| `exit_on_exceptions` | List of exceptions that will cause the Executor to shut down. | `array` | `[]` |
| `disable_reduce` | Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map` | `boolean` | `False` |
| `no_reduce` | Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map` | `boolean` | `False` |
| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` |
| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` |
| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` |
Expand Down
2 changes: 1 addition & 1 deletion docs/fundamentals/flow/gateway-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
| `graph_conditions` | Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. | `string` | `{}` |
| `deployments_addresses` | JSON dictionary with the input addresses of each Deployment | `string` | `{}` |
| `deployments_metadata` | JSON dictionary with the request metadata for each Deployment | `string` | `{}` |
| `deployments_disable_reduce` | list JSON disabling the built-in merging mechanism for each Deployment listed | `string` | `[]` |
| `deployments_no_reduce` | list JSON disabling the built-in merging mechanism for each Deployment listed | `string` | `[]` |
| `compression` | The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. | `string` | `None` |
| `timeout_send` | The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default | `number` | `None` |
| `runtime_cls` | The runtime class to run inside the Pod | `string` | `GatewayRuntime` |
Expand Down
2 changes: 1 addition & 1 deletion docs/fundamentals/flow/topologies.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ This gives the output:
```

Both `BarExecutor` and `BazExecutor` only received a single `Document` from `FooExecutor` because they are run in parallel. The last Executor `executor3` receives both DocumentArrays and merges them automatically.
This automated merging can be disabled with `disable_reduce=True`. This is useful for providing custom merge logic in a separate Executor. In this case the last `.add()` call would look like `.add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, disable_reduce=True)`. This feature requires Jina >= 3.0.2.
This automated merging can be disabled with `no_reduce=True`. This is useful for providing custom merge logic in a separate Executor. In this case the last `.add()` call would look like `.add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, no_reduce=True)`. This feature requires Jina >= 3.0.2.

(replicate-executors)=
## Replicate Executors
Expand Down
2 changes: 1 addition & 1 deletion docs/how-to/external-executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ This connects to `grpc://localhost:12345` and `grpc://91.198.174.192:12346` as t

````{admonition} Reducing
:class: hint
If an external Executor needs multiple predecessors, reducing needs to be enabled. So setting disable_reduce=True is not allowed for these cases.
If an external Executor needs multiple predecessors, reducing needs to be enabled. So setting no_reduce=True is not allowed for these cases.
````


Expand Down
28 changes: 14 additions & 14 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def __init__(
compression: Optional[str] = None,
cors: Optional[bool] = False,
deployments_addresses: Optional[str] = '{}',
deployments_disable_reduce: Optional[str] = '[]',
deployments_no_reduce: Optional[str] = '[]',
deployments_metadata: Optional[str] = '{}',
description: Optional[str] = None,
docker_kwargs: Optional[dict] = None,
Expand Down Expand Up @@ -211,7 +211,7 @@ def __init__(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
:param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -383,7 +383,7 @@ def __init__(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
:param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -600,7 +600,7 @@ def _add_gateway(
deployments_addresses: Dict[str, List[str]],
deployments_metadata: Dict[str, Dict[str, str]],
graph_conditions: Dict[str, Dict],
deployments_disable_reduce: List[str],
deployments_no_reduce: List[str],
**kwargs,
):
kwargs.update(
Expand Down Expand Up @@ -628,7 +628,7 @@ def _add_gateway(
args.graph_conditions = json.dumps(graph_conditions)
args.deployments_addresses = json.dumps(deployments_addresses)
args.deployments_metadata = json.dumps(deployments_metadata)
args.deployments_disable_reduce = json.dumps(deployments_disable_reduce)
args.deployments_no_reduce = json.dumps(deployments_no_reduce)
self._deployment_nodes[GATEWAY_NAME] = Deployment(args, needs)

def _get_deployments_metadata(self) -> Dict[str, Dict[str, str]]:
Expand Down Expand Up @@ -752,7 +752,7 @@ def _get_graph_conditions(self) -> Dict[str, Dict]:
def _get_disabled_reduce_deployments(self) -> List[str]:
disabled_deployments = []
for node, v in self._deployment_nodes.items():
if v.args.disable_reduce:
if v.args.no_reduce:
disabled_deployments.append(node)

return disabled_deployments
Expand Down Expand Up @@ -833,7 +833,7 @@ def add(
compression: Optional[str] = None,
connection_list: Optional[str] = None,
disable_auto_volume: Optional[bool] = False,
disable_reduce: Optional[bool] = False,
no_reduce: Optional[bool] = False,
docker_kwargs: Optional[dict] = None,
entrypoint: Optional[str] = None,
env: Optional[dict] = None,
Expand Down Expand Up @@ -892,7 +892,7 @@ def add(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param connection_list: dictionary JSON with a list of connections to configure
:param disable_auto_volume: Do not automatically mount a volume for dockerized Executors.
:param disable_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map`
:param no_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map`
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.
Expand Down Expand Up @@ -1048,7 +1048,7 @@ def add(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param connection_list: dictionary JSON with a list of connections to configure
:param disable_auto_volume: Do not automatically mount a volume for dockerized Executors.
:param disable_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map`
:param no_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` or `docs_map`
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.
Expand Down Expand Up @@ -1236,7 +1236,7 @@ def add(
port = helper.random_port()
args.port = port

if len(needs) > 1 and args.external and args.disable_reduce:
if len(needs) > 1 and args.external and args.no_reduce:
raise ValueError(
'External Executors with multiple needs have to do auto reduce.'
)
Expand All @@ -1256,7 +1256,7 @@ def config_gateway(
compression: Optional[str] = None,
cors: Optional[bool] = False,
deployments_addresses: Optional[str] = '{}',
deployments_disable_reduce: Optional[str] = '[]',
deployments_no_reduce: Optional[str] = '[]',
deployments_metadata: Optional[str] = '{}',
description: Optional[str] = None,
docker_kwargs: Optional[dict] = None,
Expand Down Expand Up @@ -1308,7 +1308,7 @@ def config_gateway(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
:param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -1404,7 +1404,7 @@ def config_gateway(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
:param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -1661,7 +1661,7 @@ def build(
deployments_addresses=op_flow._get_deployments_addresses(),
deployments_metadata=op_flow._get_deployments_metadata(),
graph_conditions=op_flow._get_graph_conditions(),
deployments_disable_reduce=op_flow._get_disabled_reduce_deployments(),
deployments_no_reduce=op_flow._get_disabled_reduce_deployments(),
uses=op_flow.gateway_args.uses,
)

Expand Down
4 changes: 3 additions & 1 deletion jina/parsers/orchestrate/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):

gp.add_argument(
'--port',
'--port-in',
type=str,
default=str(helper.random_port()),
help='The port for input data to bind to, default is a random port between [49152, 65535].'
' In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas.'
'In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, '
'separated by commas. '
' Then, every resulting address will be considered as one replica of the Executor.',
)

Expand Down
2 changes: 2 additions & 0 deletions jina/parsers/orchestrate/runtimes/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def mixin_gateway_parser(parser):
)

parser.add_argument(
'--deployments-no-reduce',
'--deployments-disable-reduce',
type=str,
help='list JSON disabling the built-in merging mechanism for each Deployment listed',
Expand All @@ -150,6 +151,7 @@ def mixin_gateway_parser(parser):
def _add_host(arg_group):
arg_group.add_argument(
'--host',
'--host-in',
type=str,
default=__default_host__,
help=f'The host address of the runtime, by default it is {__default_host__}.'
Expand Down
15 changes: 0 additions & 15 deletions jina/parsers/orchestrate/runtimes/runtime.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Argparser module for WorkerRuntime"""

from jina import __default_host__, helper
from jina.parsers.helper import KVAppendAction


Expand All @@ -9,20 +8,6 @@ def mixin_base_runtime_parser(arg_group):
:param arg_group: the parser instance to which we add arguments
"""

arg_group.add_argument(
'--port-in',
type=int,
default=helper.random_port(),
dest='port',
help='The port for input data to bind to, default a random port between [49152, 65535]',
)
arg_group.add_argument(
'--host-in',
type=str,
default=__default_host__,
help=f'The host address for binding to, by default it is {__default_host__}',
)

arg_group.add_argument(
'--grpc-server-options',
action=KVAppendAction,
Expand Down
1 change: 1 addition & 0 deletions jina/parsers/orchestrate/runtimes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def mixin_worker_runtime_parser(parser):
)

gp.add_argument(
'--no-reduce',
'--disable-reduce',
action='store_true',
default=False,
Expand Down
4 changes: 2 additions & 2 deletions jina/serve/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ def inject_dependencies(
graph_conditions = json.loads(args.graph_conditions)
deployments_addresses = json.loads(args.deployments_addresses)
deployments_metadata = json.loads(args.deployments_metadata)
deployments_disable_reduce = json.loads(args.deployments_disable_reduce)
deployments_no_reduce = json.loads(args.deployments_no_reduce)

self.streamer = GatewayStreamer(
graph_representation=graph_description,
executor_addresses=deployments_addresses,
graph_conditions=graph_conditions,
deployments_metadata=deployments_metadata,
deployments_disable_reduce=deployments_disable_reduce,
deployments_no_reduce=deployments_no_reduce,
timeout_send=timeout_send,
retries=args.retries,
compression=args.compression,
Expand Down
4 changes: 2 additions & 2 deletions jina/serve/runtimes/gateway/graph/topology_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def __init__(
graph_representation: Dict,
graph_conditions: Dict = {},
deployments_metadata: Dict = {},
deployments_disable_reduce: List[str] = [],
deployments_no_reduce: List[str] = [],
timeout_send: Optional[float] = 1.0,
retries: Optional[int] = -1,
*args,
Expand Down Expand Up @@ -363,7 +363,7 @@ def __init__(
floating=node_name in floating_deployment_set,
filter_condition=condition,
metadata=metadata,
reduce=node_name not in deployments_disable_reduce,
reduce=node_name not in deployments_no_reduce,
timeout_send=timeout_send,
retries=retries,
)
Expand Down
Loading

0 comments on commit 1451769

Please sign in to comment.