Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search attributes #43

Merged
merged 11 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
# Needed for tests since they use external server
- uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poe lint
Expand Down Expand Up @@ -97,7 +97,7 @@ jobs:
# Needed for tests since they use external server
- uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poetry build
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ These steps can be followed to use with a virtual environment and `pip`:

The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.

**NOTE: This README is for the current branch and not necessarily what's released on `PyPI`.**

### Implementing a Workflow

Create the following script at `run_worker.py`:
Expand Down
4 changes: 3 additions & 1 deletion scripts/gen_protos.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def fix_generated_output(base_path: Path):
# MyPy protobuf does not document this experimental class, see
# https://github.com/nipunn1313/mypy-protobuf/issues/212#issuecomment-885300106
import_suffix = ""
if stem == "service_pb2_grpc" and message == "WorkflowService":
if stem == "service_pb2_grpc" and (
message == "WorkflowService" or message == "OperatorService"
):
import_suffix = " # type: ignore"
f.write(f"from .{stem} import {message}{import_suffix}\n")
message_names.append(message)
Expand Down
2 changes: 1 addition & 1 deletion temporalio/api/operatorservice/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
RemoveSearchAttributesRequest,
RemoveSearchAttributesResponse,
)
from .service_pb2_grpc import OperatorService # type: ignore
from .service_pb2_grpc import (
OperatorService,
OperatorServiceServicer,
OperatorServiceStub,
add_OperatorServiceServicer_to_server,
Expand Down
13 changes: 12 additions & 1 deletion temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Worker using SDK Core."""

from dataclasses import dataclass
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, List
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, List, Mapping

import google.protobuf.internal.containers

Expand Down Expand Up @@ -366,3 +366,14 @@ async def encode_completion(
)
for val in command.start_child_workflow_execution.memo.values():
await _encode_bridge_payload(val, codec)


def encode_search_attributes(
attrs: temporalio.common.SearchAttributes,
payloads: Mapping[str, temporalio.bridge.proto.common.Payload],
) -> None:
"""Encode search attributes as bridge payloads."""
for k, vals in attrs.items():
payloads[k].CopyFrom(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this create the key k if it doesn't exist?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you always make sure it exists before sending to this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a Protobuf oddity. They lazily create everything on first scalar (i.e. non-message) update. See https://developers.google.com/protocol-buffers/docs/reference/python-generated#map-fields. If I tried to do payloads[k] = some_message, I'd get an error because no Python Protobuf code allows assignment to a message.

to_bridge_payload(temporalio.converter.encode_search_attribute_values(vals))
)
29 changes: 14 additions & 15 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async def start_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -234,7 +234,7 @@ async def start_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -259,7 +259,7 @@ async def start_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -283,7 +283,7 @@ async def start_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -305,7 +305,7 @@ async def start_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand Down Expand Up @@ -388,7 +388,7 @@ async def execute_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -413,7 +413,7 @@ async def execute_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -438,7 +438,7 @@ async def execute_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -462,7 +462,7 @@ async def execute_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand All @@ -484,7 +484,7 @@ async def execute_workflow(
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
Expand Down Expand Up @@ -1297,7 +1297,7 @@ class StartWorkflowInput:
retry_policy: Optional[temporalio.common.RetryPolicy]
cron_schedule: str
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
header: Optional[Mapping[str, Any]]
start_signal: Optional[str]
start_signal_args: Iterable[Any]
Expand Down Expand Up @@ -1527,10 +1527,9 @@ async def start_workflow(
for k, v in input.memo.items():
req.memo.fields[k] = (await self._client.data_converter.encode([v]))[0]
if input.search_attributes is not None:
for k, v in input.search_attributes.items():
req.search_attributes.indexed_fields[k] = (
await self._client.data_converter.encode([v])
)[0]
temporalio.converter.encode_search_attributes(
input.search_attributes, req.search_attributes
)
if input.header is not None:
for k, v in input.header.items():
req.header.fields[k] = (await self._client.data_converter.encode([v]))[
Expand Down
14 changes: 12 additions & 2 deletions temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from datetime import datetime, timedelta
from enum import IntEnum
from typing import Any, Iterable, Optional
from typing import Any, Iterable, List, Mapping, Optional, Union

from typing_extensions import TypeAlias

import temporalio.api.common.v1
import temporalio.api.enums.v1
Expand Down Expand Up @@ -128,6 +130,14 @@ class QueryRejectCondition(IntEnum):
"""See :py:attr:`temporalio.api.enums.v1.QueryRejectCondition.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY`."""


SearchAttributeValue: TypeAlias = Union[str, int, float, bool, datetime]

# We choose to make this a list instead of an iterable so we can catch if people
# are not sending lists each time but maybe accidentally sending a string (which
# is iterable)
SearchAttributes: TypeAlias = Mapping[str, List[SearchAttributeValue]]


# Should be set as the "arg" argument for _arg_or_args checks where the argument
# is unset. This is different than None which is a legitimate argument.
_arg_unset = object()
Expand Down
76 changes: 76 additions & 0 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Type

import dacite
Expand All @@ -16,6 +17,7 @@
import google.protobuf.symbol_database

import temporalio.api.common.v1
import temporalio.common


class PayloadConverter(ABC):
Expand Down Expand Up @@ -594,6 +596,80 @@ def default() -> DataConverter:
return _default


def encode_search_attributes(
attrs: temporalio.common.SearchAttributes,
api: temporalio.api.common.v1.SearchAttributes,
) -> None:
"""Convert search attributes into an API message.

Args:
attrs: Search attributes to convert.
api: API message to set converted attributes on.
"""
for k, v in attrs.items():
api.indexed_fields[k].CopyFrom(encode_search_attribute_values(v))


def encode_search_attribute_values(
vals: List[temporalio.common.SearchAttributeValue],
) -> temporalio.api.common.v1.Payload:
"""Convert search attribute values into a payload.

Args:
vals: List of values to convert.
"""
if not isinstance(vals, list):
raise TypeError("Search attribute values must be lists")
# Confirm all types are the same
val_type: Optional[Type] = None
# Convert dates to strings
safe_vals = []
for v in vals:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should assert that vals is a homogenous list

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also limit this at the type level.
We should have done the same in TS @lorensr (still not too late).

type SearchAttributeValueArray = string[] | number[] | boolean[] | Date[];
type SearchAttributes = Record<string, SearchAttributeVAlueArray>;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, will do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that Union[List[str], List[int], List[float], List[bool], List[datetime]] doesn't work as we'd like. You can't assign an empty list to it for example (this is a known MyPy and Python typing issue, see issue 3283 in https://github.com/python/mypy). We're gonna have to keep the types as is and only do runtime checks.

if isinstance(v, datetime):
if v.tzinfo is None:
raise ValueError(
"Timezone must be present on all search attribute dates"
)
v = v.isoformat()
elif not isinstance(v, (str, int, float, bool)):
raise TypeError(
f"Search attribute value of type {type(v).__name__} not one of str, int, float, bool, or datetime"
)
elif val_type and type(v) is not val_type:
raise TypeError(
f"Search attribute values must have the same type for the same key"
)
elif not val_type:
val_type = type(v)
safe_vals.append(v)
return default().payload_converter.to_payloads([safe_vals])[0]


def decode_search_attributes(
api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.SearchAttributes:
"""Decode API search attributes to values.

Args:
api: API message with search attribute values to convert.

Returns:
Converted search attribute values.
"""
conv = default().payload_converter
ret = {}
for k, v in api.indexed_fields.items():
val = conv.from_payloads([v])[0]
# If a value did not come back as a list, make it a single-item list
if not isinstance(val, list):
val = [val]
# Convert each item to datetime if necessary
if v.metadata.get("type") == b"Datetime":
val = [datetime.fromisoformat(v) for v in val]
ret[k] = val
return ret


class _FunctionTypeLookup:
def __init__(self, type_hint_eval_str: bool) -> None:
# Keyed by callable __qualname__, value is optional arg types and
Expand Down
4 changes: 2 additions & 2 deletions temporalio/worker/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class ContinueAsNewInput:
run_timeout: Optional[timedelta]
task_timeout: Optional[timedelta]
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
# The types may be absent
arg_types: Optional[List[Type]]

Expand Down Expand Up @@ -203,7 +203,7 @@ class StartChildWorkflowInput:
retry_policy: Optional[temporalio.common.RetryPolicy]
cron_schedule: str
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
# The types may be absent
arg_types: Optional[List[Type]]
ret_type: Optional[Type]
Expand Down
3 changes: 3 additions & 0 deletions temporalio/worker/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ async def _create_workflow_instance(
run_timeout=start.workflow_run_timeout.ToTimedelta()
if start.HasField("workflow_run_timeout")
else None,
search_attributes=temporalio.converter.decode_search_attributes(
start.search_attributes
),
start_time=act.timestamp.ToDatetime().replace(tzinfo=timezone.utc),
task_queue=self._task_queue,
task_timeout=start.workflow_task_timeout.ToTimedelta(),
Expand Down
Loading