Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pass docs_map to Executor #5366

Merged
merged 21 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a56c040
feat: docs_matrix passed as dictionary
JoanFM Nov 8, 2022
e8b2500
Merge branch 'master' into force-deterministic-order
JoanFM Nov 8, 2022
71d6228
feat: add option to have docs as dictionary
JoanFM Nov 8, 2022
a7db021
style: fix overload and cli autocomplete
jina-bot Nov 8, 2022
97771a7
docs: add documentation about docs_matrix as dictionary
JoanFM Nov 8, 2022
a4ea275
Merge branch 'force-deterministic-order' of https://github.com/jina-a…
JoanFM Nov 8, 2022
b09b554
feat: add both docs_map and docs_matrix
JoanFM Nov 9, 2022
930881d
style: fix overload and cli autocomplete
jina-bot Nov 9, 2022
f55d87f
docs: apply suggestions from code review
JoanFM Nov 9, 2022
af98d92
style: fix overload and cli autocomplete
jina-bot Nov 9, 2022
7c18ccc
Merge branch 'master' into force-deterministic-order
JoanFM Nov 10, 2022
7e3bff3
fix: add executor before measuring sent measure
JoanFM Nov 10, 2022
50b3777
Merge branch 'master' into force-deterministic-order
JoanFM Nov 10, 2022
b2754d8
docs: apply suggestions from code review
JoanFM Nov 10, 2022
6426d19
style: fix overload and cli autocomplete
jina-bot Nov 10, 2022
1451769
feat: fix aliases of arguments
JoanFM Nov 10, 2022
4c3215f
style: fix overload and cli autocomplete
jina-bot Nov 10, 2022
d28e4a0
Merge branch 'master' into force-deterministic-order
JoanFM Nov 10, 2022
dc09dbc
refactor: refactor docs_map and docs_matrix creation
JoanFM Nov 10, 2022
87c36ff
feat: update deprecated map
JoanFM Nov 10, 2022
e1eec52
fix: fix test with routes
JoanFM Nov 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 108 additions & 41 deletions docs/fundamentals/executor/executor-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ with f:
```

```shell
Flow@18048[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:52255
🔒 Private network: 192.168.1.187:52255
🌐 Public address: 212.231.186.65:52255
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:55925 │
│ 🔒 Private 192.168.1.187:55925 │
│ 🌍 Public 212.231.186.65:55925 │
╰──────────────────────────────────────────╯

Calling foo
Calling bar
Calling foo
Expand Down Expand Up @@ -93,20 +96,20 @@ All Executor methods decorated by `@requests` need to follow the signature below
The `async` definition is optional.

```python
from typing import Dict, Union, List
from typing import Dict, Union, List, Optional
from jina import Executor, requests, DocumentArray


class MyExecutor(Executor):
@requests
async def foo(
self, docs: DocumentArray, parameters: Dict, docs_matrix: List[DocumentArray]
self, docs: DocumentArray, parameters: Dict, docs_matrix: Optional[List[DocumentArray]], docs_map: Optional[Dict[str, DocumentArray]]
) -> Union[DocumentArray, Dict, None]:
pass

@requests
def bar(
self, docs: DocumentArray, parameters: Dict, docs_matrix: List[DocumentArray]
self, docs: DocumentArray, parameters: Dict, docs_matrix: Optional[List[DocumentArray]], docs_map: Optional[Dict[str, DocumentArray]]
) -> Union[DocumentArray, Dict, None]:
pass
```
Expand All @@ -118,10 +121,11 @@ any other `list`-like object in a Python function.

- `parameters`: A Dict object that passes extra parameters to Executor functions.

- `docs_matrix`: This is the least common parameter to be used for an Executor. This is needed when an Executor is used inside a Flow to merge or reduce the output of more than one other Executor.

- `docs_matrix`: This is one of the least common parameter to be used for an Executor. This is needed when an Executor is used inside a Flow to merge or reduce the output of more than one other Executor.
JoanFM marked this conversation as resolved.
Show resolved Hide resolved

- `docs_map`: This is one of the least common parameter to be used for an Executor. It has the same utility as `docs_matrix` but the information comes as a dict with previous Executor names as keys. If onl
JoanFM marked this conversation as resolved.
Show resolved Hide resolved
JoanFM marked this conversation as resolved.
Show resolved Hide resolved

- `tracing_context`: Context needed if you want to add custom traces. Check {ref}`how to add custom traces in your Executor <instrumenting-executor>`
JoanFM marked this conversation as resolved.
Show resolved Hide resolved


````{admonition} Hint
Expand Down Expand Up @@ -156,11 +160,14 @@ class MyExecutor(Executor):

### Multiple DocumentArrays as input argument

You have seen that `Executor` methods can receive three types of parameters: `docs`, `parameters` and `docs_matrix`.
You have seen that {class}`~jina.Executor` methods can receive multiple parameters.

`docs_matrix` and `docs_map` are only used in some special cases.

`docs_matrix` is only used in some special cases.
One case is when an Executor receives messages from more than one upstream {class}`~jina.Executor` in the {class}`~jina.Flow`:
JoanFM marked this conversation as resolved.
Show resolved Hide resolved

One case is when an Executor receives messages from more than one upstream Executor in the Flow:
If you set `disable_reduce` to True and the Executor has more than one incoming Executor, the Executor will receive all the DocumentArrays coming from previous Executors independently under `docs_matrix` and `docs_map`.
If `disable_reduce` is not set or set to False, `docs_map` and `docs_matrix` will be None and the Executor will receive a single DocumentArray resulting from the reducing of all the incoming ones.

```python
from jina import Flow, Executor, requests, Document, DocumentArray
Expand Down Expand Up @@ -202,22 +209,75 @@ f = (
)

with f:
returned_docs = f.post(on='/', Document())
returned_docs = f.post(on='/', inputs=Document())

print(f'Resulting documents {returned_docs[0].text}')
```


```shell
Flow@1244[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:54550
🔒 Private network: 192.168.1.187:54550
🌐 Public address: 212.231.186.65:54550
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:55761 │
│ 🔒 Private 192.168.1.187:55761 │
│ 🌍 Public 212.231.186.65:55761 │
╰──────────────────────────────────────────╯

MergeExec processing pairs of Documents "Exec1" and "Exec2"
Resulting documents Document merging from "Exec1" and "Exec2"
```

When merging Documents from more than one upstream {class}`~jina.Executor`, sometimes you want to control which Documents come from which Executor.
Executor will receive the `docs_map` as a dictionary where the key will be the last Executor processing that previous request and the DocumentArray of the request as the values.

```python
from jina import Flow, Executor, requests, Document


class Exec1(Executor):
@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'Exec1'


class Exec2(Executor):
@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'Exec2'


class MergeExec(Executor):
@requests
def foo(self, docs_map, **kwargs):
print(docs_map)

f = (
Flow()
.add(uses=Exec1, name='exec1')
.add(uses=Exec2, name='exec2')
.add(uses=MergeExec, needs=['exec1', 'exec2'], disable_reduce=True)
)

with f:
f.post(on='/', Document())
```


```shell
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:56286 │
│ 🔒 Private 192.168.1.187:56286 │
│ 🌍 Public 212.231.186.65:56286 │
╰──────────────────────────────────────────╯

{'exec1': <DocumentArray (length=1) at 140270975034640>, 'exec2': <DocumentArray (length=1) at 140270975034448>}
```

(async-executors)=
## Async coroutines

Expand All @@ -227,7 +287,7 @@ Python to write concurrent code.


```python
from jina import Executor, requests, Flow
from jina import Executor, requests


class MyExecutor(Executor):
Expand All @@ -242,7 +302,6 @@ This example has a heavy lifting API which we call several times, and we leverag
async Python features to speed up the {class}`~jina.Executor`'s call by calling the API multiple times concurrently. As a counterpart, in an example without `coroutines`, all 50 API calls are queued and nothing is done concurrently.



````{tab} Async coroutines
```python
import asyncio
Expand All @@ -269,12 +328,16 @@ with f:
```

```shell
Flow@20588[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:62598
🔒 Private network: 192.168.1.187:62598
🌐 Public address: 212.231.186.65:62598
⠙ DONE ━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:01 100% ETA: 0 seconds 41 steps done in 1 second
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:54153 │
│ 🔒 Private 192.168.1.187:54153 │
│ 🌍 Public 212.231.186.65:54153 │
╰──────────────────────────────────────────╯

DONE ━━━━━━━━━━━━━━━━━━━━━━ 0:00:01 100% ETA: 0:00:00 50 steps done in 1
second
```

````
Expand Down Expand Up @@ -305,12 +368,16 @@ with f:
```

```shell
Flow@20394[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:52592
🔒 Private network: 192.168.1.187:52592
🌐 Public address: 212.231.186.65:52592
⠏ DONE ━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:50 100% ETA: 0 seconds 41 steps done in 50 seconds
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:52340 │
│ 🔒 Private 192.168.1.187:52340 │
│ 🌍 Public 212.231.186.65:52340 │
╰──────────────────────────────────────────╯

DONE ━━━━━━━━━━━━━━━━━━━━━━ 0:00:50 100% ETA: 0:00:00 50 steps done in 50
seconds
```
````

Expand All @@ -335,9 +402,6 @@ class DummyExecutor(Executor):
```





## Returns

Every Executor method can `return` in three ways:
Expand Down Expand Up @@ -457,11 +521,14 @@ with f:


```shell
Flow@23300[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:61855
🔒 Private network: 192.168.1.187:61855
🌐 Public address: 212.231.186.65:61855
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:58746 │
│ 🔒 Private 192.168.1.187:58746 │
│ 🌍 Public 212.231.186.65:58746 │
╰──────────────────────────────────────────╯

ProcessDocuments: received document with text "request1"
PrintExecutor: received document with text: "I changed the executor in place"
ProcessDocuments: received document with text: "request2"
Expand Down
2 changes: 1 addition & 1 deletion docs/fundamentals/flow/executor-args.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
| `py_modules` | The customized python modules need to be imported before loading the executor<br><br>Note that the recommended way is to only import a single module - a simple python file, if your<br>executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files,<br>which should be structured as a python package. For more details, please see the<br>`Executor cookbook <https://docs.jina.ai/fundamentals/executor/executor-files/>`__ | `array` | `None` |
| `output_array_type` | The type of array `tensor` and `embedding` will be serialized to.<br><br>Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found <br>`here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`.<br>Defaults to retaining whatever type is returned by the Executor. | `string` | `None` |
| `exit_on_exceptions` | List of exceptions that will cause the Executor to shut down. | `array` | `[]` |
| `disable_reduce` | Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix` | `boolean` | `False` |
| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports, separated by commas. Then, every resulting address will be considered as one replica of the Executor. | `string` | `random in [49152, 65535]` |
| `host_in` | The host address for binding to, by default it is 0.0.0.0 | `string` | `0.0.0.0` |
| `grpc_server_options` | Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {'grpc.max_send_message_length': -1} | `object` | `None` |
Expand Down Expand Up @@ -47,5 +48,4 @@
| `uses_before_address` | The address of the uses-before runtime | `string` | `None` |
| `uses_after_address` | The address of the uses-before runtime | `string` | `None` |
| `connection_list` | dictionary JSON with a list of connections to configure | `string` | `None` |
| `disable_reduce` | Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head | `boolean` | `False` |
| `timeout_send` | The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default | `number` | `None` |
4 changes: 2 additions & 2 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ def add(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param connection_list: dictionary JSON with a list of connections to configure
:param disable_auto_volume: Do not automatically mount a volume for dockerized Executors.
:param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head
:param disable_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix`
JoanFM marked this conversation as resolved.
Show resolved Hide resolved
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.

Expand Down Expand Up @@ -1048,7 +1048,7 @@ def add(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param connection_list: dictionary JSON with a list of connections to configure
:param disable_auto_volume: Do not automatically mount a volume for dockerized Executors.
:param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head
:param disable_reduce: Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix`
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
container.

Expand Down
7 changes: 0 additions & 7 deletions jina/parsers/orchestrate/runtimes/head.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ def mixin_head_parser(parser):
help='dictionary JSON with a list of connections to configure',
)

gp.add_argument(
'--disable-reduce',
action='store_true',
default=False,
help='Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head',
)

gp.add_argument(
'--timeout-send',
type=int,
Expand Down
8 changes: 8 additions & 0 deletions jina/parsers/orchestrate/runtimes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,12 @@ def mixin_worker_runtime_parser(parser):
nargs='*',
help='List of exceptions that will cause the Executor to shut down.',
)

gp.add_argument(
'--disable-reduce',
action='store_true',
default=False,
help='Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a `docs_matrix`',
JoanFM marked this conversation as resolved.
Show resolved Hide resolved
)

JoanFM marked this conversation as resolved.
Show resolved Hide resolved
mixin_base_runtime_parser(gp)
Loading