diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c61aff4..1a677f4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ exclude: ^iwf/iwf_api/ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.4.0 + rev: v5.0.0 hooks: - id: check-ast - id: trailing-whitespace @@ -12,7 +12,7 @@ repos: - id: end-of-file-fixer - repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks - rev: v2.1.0 + rev: v2.14.0 hooks: - id: pretty-format-yaml args: @@ -21,19 +21,19 @@ repos: - --indent=2 - repo: https://github.com/hadialqattan/pycln - rev: v1.1.0 + rev: v2.4.0 hooks: - id: pycln args: ["-a"] - repo: https://github.com/ambv/black - rev: 23.7.0 + rev: 24.10.0 hooks: - id: black language_version: python3 - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: "v0.0.267" + rev: "v0.8.1" hooks: - id: ruff diff --git a/iwf-idl b/iwf-idl index d1da138..a6fbd8f 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit d1da1384bca0c9317b916df3d6bbd3b848dc5150 +Subproject commit a6fbd8feb427fdf77b7ddc0f7c9d7eb154c385e7 diff --git a/iwf/client.py b/iwf/client.py index 3011e0e..f1ede30 100644 --- a/iwf/client.py +++ b/iwf/client.py @@ -70,6 +70,12 @@ def start_workflow( unreg_opts.workflow_start_delay_seconds = ( options.workflow_start_delay_seconds ) + unreg_opts.workflow_already_started_options = ( + options.workflow_already_started_options + ) + unreg_opts.initial_data_attributes = options.initial_data_attributes + + unreg_opts.workflow_config_override = options.workflow_config_override # TODO: set initial search attributes here diff --git a/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_get.py b/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_get.py index 9e7e9df..e83ca26 100644 --- a/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_get.py +++ b/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_get.py @@ -67,7 +67,7 @@ def sync_detailed( client: Client, json_body: WorkflowGetDataObjectsRequest, ) -> Response[Union[ErrorResponse, WorkflowGetDataObjectsResponse]]: - """get workflow data objects + """get workflow data objects aka data attributes Args: json_body (WorkflowGetDataObjectsRequest): @@ -98,7 +98,7 @@ def sync( client: Client, json_body: WorkflowGetDataObjectsRequest, ) -> Optional[Union[ErrorResponse, WorkflowGetDataObjectsResponse]]: - """get workflow data objects + """get workflow data objects aka data attributes Args: json_body (WorkflowGetDataObjectsRequest): @@ -122,7 +122,7 @@ async def asyncio_detailed( client: Client, json_body: WorkflowGetDataObjectsRequest, ) -> Response[Union[ErrorResponse, WorkflowGetDataObjectsResponse]]: - """get workflow data objects + """get workflow data objects aka data attributes Args: json_body (WorkflowGetDataObjectsRequest): @@ -151,7 +151,7 @@ async def asyncio( client: Client, json_body: WorkflowGetDataObjectsRequest, ) -> Optional[Union[ErrorResponse, WorkflowGetDataObjectsResponse]]: - """get workflow data objects + """get workflow data objects aka data attributes Args: json_body (WorkflowGetDataObjectsRequest): diff --git a/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_set.py b/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_set.py new file mode 100644 index 0000000..51040de --- /dev/null +++ b/iwf/iwf_api/api/default/post_api_v1_workflow_dataobjects_set.py @@ -0,0 +1,170 @@ +from http import HTTPStatus +from typing import Any, Dict, Optional, Union, cast + +import httpx + +from ... import errors +from ...client import Client +from ...models.error_response import ErrorResponse +from ...models.workflow_set_data_objects_request import WorkflowSetDataObjectsRequest +from ...types import Response + + +def _get_kwargs( + *, + client: Client, + json_body: WorkflowSetDataObjectsRequest, +) -> Dict[str, Any]: + url = "{}/api/v1/workflow/dataobjects/set".format(client.base_url) + + headers: Dict[str, str] = client.get_headers() + cookies: Dict[str, Any] = client.get_cookies() + + json_json_body = json_body.to_dict() + + return { + "method": "post", + "url": url, + "headers": headers, + "cookies": cookies, + "timeout": client.get_timeout(), + "follow_redirects": client.follow_redirects, + "json": json_json_body, + } + + +def _parse_response( + *, client: Client, response: httpx.Response +) -> Optional[Union[Any, ErrorResponse]]: + if response.status_code == HTTPStatus.OK: + response_200 = cast(Any, None) + return response_200 + if response.status_code == HTTPStatus.BAD_REQUEST: + response_400 = ErrorResponse.from_dict(response.json()) + + return response_400 + if client.raise_on_unexpected_status: + raise errors.UnexpectedStatus(response.status_code, response.content) + else: + return None + + +def _build_response( + *, client: Client, response: httpx.Response +) -> Response[Union[Any, ErrorResponse]]: + return Response( + status_code=HTTPStatus(response.status_code), + content=response.content, + headers=response.headers, + parsed=_parse_response(client=client, response=response), + ) + + +def sync_detailed( + *, + client: Client, + json_body: WorkflowSetDataObjectsRequest, +) -> Response[Union[Any, ErrorResponse]]: + """set workflow data objects aka data attributes + + Args: + json_body (WorkflowSetDataObjectsRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[Union[Any, ErrorResponse]] + """ + + kwargs = _get_kwargs( + client=client, + json_body=json_body, + ) + + response = httpx.request( + verify=client.verify_ssl, + **kwargs, + ) + + return _build_response(client=client, response=response) + + +def sync( + *, + client: Client, + json_body: WorkflowSetDataObjectsRequest, +) -> Optional[Union[Any, ErrorResponse]]: + """set workflow data objects aka data attributes + + Args: + json_body (WorkflowSetDataObjectsRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Union[Any, ErrorResponse] + """ + + return sync_detailed( + client=client, + json_body=json_body, + ).parsed + + +async def asyncio_detailed( + *, + client: Client, + json_body: WorkflowSetDataObjectsRequest, +) -> Response[Union[Any, ErrorResponse]]: + """set workflow data objects aka data attributes + + Args: + json_body (WorkflowSetDataObjectsRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[Union[Any, ErrorResponse]] + """ + + kwargs = _get_kwargs( + client=client, + json_body=json_body, + ) + + async with httpx.AsyncClient(verify=client.verify_ssl) as _client: + response = await _client.request(**kwargs) + + return _build_response(client=client, response=response) + + +async def asyncio( + *, + client: Client, + json_body: WorkflowSetDataObjectsRequest, +) -> Optional[Union[Any, ErrorResponse]]: + """set workflow data objects aka data attributes + + Args: + json_body (WorkflowSetDataObjectsRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Union[Any, ErrorResponse] + """ + + return ( + await asyncio_detailed( + client=client, + json_body=json_body, + ) + ).parsed diff --git a/iwf/iwf_api/api/default/post_api_v1_workflow_searchattributes_set.py b/iwf/iwf_api/api/default/post_api_v1_workflow_searchattributes_set.py new file mode 100644 index 0000000..1e2a0d2 --- /dev/null +++ b/iwf/iwf_api/api/default/post_api_v1_workflow_searchattributes_set.py @@ -0,0 +1,172 @@ +from http import HTTPStatus +from typing import Any, Dict, Optional, Union, cast + +import httpx + +from ... import errors +from ...client import Client +from ...models.error_response import ErrorResponse +from ...models.workflow_set_search_attributes_request import ( + WorkflowSetSearchAttributesRequest, +) +from ...types import Response + + +def _get_kwargs( + *, + client: Client, + json_body: WorkflowSetSearchAttributesRequest, +) -> Dict[str, Any]: + url = "{}/api/v1/workflow/searchattributes/set".format(client.base_url) + + headers: Dict[str, str] = client.get_headers() + cookies: Dict[str, Any] = client.get_cookies() + + json_json_body = json_body.to_dict() + + return { + "method": "post", + "url": url, + "headers": headers, + "cookies": cookies, + "timeout": client.get_timeout(), + "follow_redirects": client.follow_redirects, + "json": json_json_body, + } + + +def _parse_response( + *, client: Client, response: httpx.Response +) -> Optional[Union[Any, ErrorResponse]]: + if response.status_code == HTTPStatus.OK: + response_200 = cast(Any, None) + return response_200 + if response.status_code == HTTPStatus.BAD_REQUEST: + response_400 = ErrorResponse.from_dict(response.json()) + + return response_400 + if client.raise_on_unexpected_status: + raise errors.UnexpectedStatus(response.status_code, response.content) + else: + return None + + +def _build_response( + *, client: Client, response: httpx.Response +) -> Response[Union[Any, ErrorResponse]]: + return Response( + status_code=HTTPStatus(response.status_code), + content=response.content, + headers=response.headers, + parsed=_parse_response(client=client, response=response), + ) + + +def sync_detailed( + *, + client: Client, + json_body: WorkflowSetSearchAttributesRequest, +) -> Response[Union[Any, ErrorResponse]]: + """set workflow search attributes + + Args: + json_body (WorkflowSetSearchAttributesRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[Union[Any, ErrorResponse]] + """ + + kwargs = _get_kwargs( + client=client, + json_body=json_body, + ) + + response = httpx.request( + verify=client.verify_ssl, + **kwargs, + ) + + return _build_response(client=client, response=response) + + +def sync( + *, + client: Client, + json_body: WorkflowSetSearchAttributesRequest, +) -> Optional[Union[Any, ErrorResponse]]: + """set workflow search attributes + + Args: + json_body (WorkflowSetSearchAttributesRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Union[Any, ErrorResponse] + """ + + return sync_detailed( + client=client, + json_body=json_body, + ).parsed + + +async def asyncio_detailed( + *, + client: Client, + json_body: WorkflowSetSearchAttributesRequest, +) -> Response[Union[Any, ErrorResponse]]: + """set workflow search attributes + + Args: + json_body (WorkflowSetSearchAttributesRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[Union[Any, ErrorResponse]] + """ + + kwargs = _get_kwargs( + client=client, + json_body=json_body, + ) + + async with httpx.AsyncClient(verify=client.verify_ssl) as _client: + response = await _client.request(**kwargs) + + return _build_response(client=client, response=response) + + +async def asyncio( + *, + client: Client, + json_body: WorkflowSetSearchAttributesRequest, +) -> Optional[Union[Any, ErrorResponse]]: + """set workflow search attributes + + Args: + json_body (WorkflowSetSearchAttributesRequest): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Union[Any, ErrorResponse] + """ + + return ( + await asyncio_detailed( + client=client, + json_body=json_body, + ) + ).parsed diff --git a/iwf/iwf_api/models/__init__.py b/iwf/iwf_api/models/__init__.py index 8ef35cf..1e72650 100644 --- a/iwf/iwf_api/models/__init__.py +++ b/iwf/iwf_api/models/__init__.py @@ -10,6 +10,7 @@ from .error_response import ErrorResponse from .error_sub_status import ErrorSubStatus from .execute_api_failure_policy import ExecuteApiFailurePolicy +from .executing_state_id_mode import ExecutingStateIdMode from .health_info import HealthInfo from .id_reuse_policy import IDReusePolicy from .inter_state_channel_command import InterStateChannelCommand @@ -33,10 +34,10 @@ from .trigger_continue_as_new_request import TriggerContinueAsNewRequest from .wait_until_api_failure_policy import WaitUntilApiFailurePolicy from .worker_error_response import WorkerErrorResponse +from .workflow_already_started_options import WorkflowAlreadyStartedOptions from .workflow_conditional_close import WorkflowConditionalClose from .workflow_conditional_close_type import WorkflowConditionalCloseType from .workflow_config import WorkflowConfig -from .workflow_config_executing_state_id_mode import WorkflowConfigExecutingStateIdMode from .workflow_config_update_request import WorkflowConfigUpdateRequest from .workflow_dump_request import WorkflowDumpRequest from .workflow_dump_response import WorkflowDumpResponse @@ -56,6 +57,8 @@ from .workflow_search_request import WorkflowSearchRequest from .workflow_search_response import WorkflowSearchResponse from .workflow_search_response_entry import WorkflowSearchResponseEntry +from .workflow_set_data_objects_request import WorkflowSetDataObjectsRequest +from .workflow_set_search_attributes_request import WorkflowSetSearchAttributesRequest from .workflow_signal_request import WorkflowSignalRequest from .workflow_skip_timer_request import WorkflowSkipTimerRequest from .workflow_start_options import WorkflowStartOptions @@ -89,6 +92,7 @@ "ErrorResponse", "ErrorSubStatus", "ExecuteApiFailurePolicy", + "ExecutingStateIdMode", "HealthInfo", "IDReusePolicy", "InterStateChannelCommand", @@ -112,10 +116,10 @@ "TriggerContinueAsNewRequest", "WaitUntilApiFailurePolicy", "WorkerErrorResponse", + "WorkflowAlreadyStartedOptions", "WorkflowConditionalClose", "WorkflowConditionalCloseType", "WorkflowConfig", - "WorkflowConfigExecutingStateIdMode", "WorkflowConfigUpdateRequest", "WorkflowDumpRequest", "WorkflowDumpResponse", @@ -135,6 +139,8 @@ "WorkflowSearchRequest", "WorkflowSearchResponse", "WorkflowSearchResponseEntry", + "WorkflowSetDataObjectsRequest", + "WorkflowSetSearchAttributesRequest", "WorkflowSignalRequest", "WorkflowSkipTimerRequest", "WorkflowStartOptions", diff --git a/iwf/iwf_api/models/executing_state_id_mode.py b/iwf/iwf_api/models/executing_state_id_mode.py new file mode 100644 index 0000000..d9b28ea --- /dev/null +++ b/iwf/iwf_api/models/executing_state_id_mode.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class ExecutingStateIdMode(str, Enum): + DISABLED = "DISABLED" + ENABLED_FOR_ALL = "ENABLED_FOR_ALL" + ENABLED_FOR_STATES_WITH_WAIT_UNTIL = "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" + + def __str__(self) -> str: + return str(self.value) diff --git a/iwf/iwf_api/models/workflow_already_started_options.py b/iwf/iwf_api/models/workflow_already_started_options.py new file mode 100644 index 0000000..6f2e7c6 --- /dev/null +++ b/iwf/iwf_api/models/workflow_already_started_options.py @@ -0,0 +1,67 @@ +from typing import Any, Dict, List, Type, TypeVar, Union + +import attr + +from ..types import UNSET, Unset + +T = TypeVar("T", bound="WorkflowAlreadyStartedOptions") + + +@attr.s(auto_attribs=True) +class WorkflowAlreadyStartedOptions: + """ + Attributes: + ignore_already_started_error (bool): + request_id (Union[Unset, str]): + """ + + ignore_already_started_error: bool + request_id: Union[Unset, str] = UNSET + additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) + + def to_dict(self) -> Dict[str, Any]: + ignore_already_started_error = self.ignore_already_started_error + request_id = self.request_id + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "ignoreAlreadyStartedError": ignore_already_started_error, + } + ) + if request_id is not UNSET: + field_dict["requestId"] = request_id + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + d = src_dict.copy() + ignore_already_started_error = d.pop("ignoreAlreadyStartedError") + + request_id = d.pop("requestId", UNSET) + + workflow_already_started_options = cls( + ignore_already_started_error=ignore_already_started_error, + request_id=request_id, + ) + + workflow_already_started_options.additional_properties = d + return workflow_already_started_options + + @property + def additional_keys(self) -> List[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/iwf/iwf_api/models/workflow_config.py b/iwf/iwf_api/models/workflow_config.py index cbc891c..819f7a8 100644 --- a/iwf/iwf_api/models/workflow_config.py +++ b/iwf/iwf_api/models/workflow_config.py @@ -2,9 +2,7 @@ import attr -from ..models.workflow_config_executing_state_id_mode import ( - WorkflowConfigExecutingStateIdMode, -) +from ..models.executing_state_id_mode import ExecutingStateIdMode from ..types import UNSET, Unset T = TypeVar("T", bound="WorkflowConfig") @@ -14,22 +12,19 @@ class WorkflowConfig: """ Attributes: - disable_system_search_attribute (Union[Unset, bool]): - executing_state_id_mode (Union[Unset, WorkflowConfigExecutingStateIdMode]): + executing_state_id_mode (Union[Unset, ExecutingStateIdMode]): continue_as_new_threshold (Union[Unset, int]): continue_as_new_page_size_in_bytes (Union[Unset, int]): optimize_activity (Union[Unset, bool]): """ - disable_system_search_attribute: Union[Unset, bool] = UNSET - executing_state_id_mode: Union[Unset, WorkflowConfigExecutingStateIdMode] = UNSET + executing_state_id_mode: Union[Unset, ExecutingStateIdMode] = UNSET continue_as_new_threshold: Union[Unset, int] = UNSET continue_as_new_page_size_in_bytes: Union[Unset, int] = UNSET optimize_activity: Union[Unset, bool] = UNSET additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) def to_dict(self) -> Dict[str, Any]: - disable_system_search_attribute = self.disable_system_search_attribute executing_state_id_mode: Union[Unset, str] = UNSET if not isinstance(self.executing_state_id_mode, Unset): executing_state_id_mode = self.executing_state_id_mode.value @@ -41,8 +36,6 @@ def to_dict(self) -> Dict[str, Any]: field_dict: Dict[str, Any] = {} field_dict.update(self.additional_properties) field_dict.update({}) - if disable_system_search_attribute is not UNSET: - field_dict["disableSystemSearchAttribute"] = disable_system_search_attribute if executing_state_id_mode is not UNSET: field_dict["executingStateIdMode"] = executing_state_id_mode if continue_as_new_threshold is not UNSET: @@ -59,16 +52,12 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: d = src_dict.copy() - disable_system_search_attribute = d.pop("disableSystemSearchAttribute", UNSET) - _executing_state_id_mode = d.pop("executingStateIdMode", UNSET) - executing_state_id_mode: Union[Unset, WorkflowConfigExecutingStateIdMode] + executing_state_id_mode: Union[Unset, ExecutingStateIdMode] if isinstance(_executing_state_id_mode, Unset): executing_state_id_mode = UNSET else: - executing_state_id_mode = WorkflowConfigExecutingStateIdMode( - _executing_state_id_mode - ) + executing_state_id_mode = ExecutingStateIdMode(_executing_state_id_mode) continue_as_new_threshold = d.pop("continueAsNewThreshold", UNSET) @@ -79,7 +68,6 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: optimize_activity = d.pop("optimizeActivity", UNSET) workflow_config = cls( - disable_system_search_attribute=disable_system_search_attribute, executing_state_id_mode=executing_state_id_mode, continue_as_new_threshold=continue_as_new_threshold, continue_as_new_page_size_in_bytes=continue_as_new_page_size_in_bytes, diff --git a/iwf/iwf_api/models/workflow_reset_request.py b/iwf/iwf_api/models/workflow_reset_request.py index e0466c5..ab8c9eb 100644 --- a/iwf/iwf_api/models/workflow_reset_request.py +++ b/iwf/iwf_api/models/workflow_reset_request.py @@ -21,6 +21,7 @@ class WorkflowResetRequest: state_id (Union[Unset, str]): state_execution_id (Union[Unset, str]): skip_signal_reapply (Union[Unset, bool]): + skip_update_reapply (Union[Unset, bool]): """ workflow_id: str @@ -32,6 +33,7 @@ class WorkflowResetRequest: state_id: Union[Unset, str] = UNSET state_execution_id: Union[Unset, str] = UNSET skip_signal_reapply: Union[Unset, bool] = UNSET + skip_update_reapply: Union[Unset, bool] = UNSET additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) def to_dict(self) -> Dict[str, Any]: @@ -45,6 +47,7 @@ def to_dict(self) -> Dict[str, Any]: state_id = self.state_id state_execution_id = self.state_execution_id skip_signal_reapply = self.skip_signal_reapply + skip_update_reapply = self.skip_update_reapply field_dict: Dict[str, Any] = {} field_dict.update(self.additional_properties) @@ -68,6 +71,8 @@ def to_dict(self) -> Dict[str, Any]: field_dict["stateExecutionId"] = state_execution_id if skip_signal_reapply is not UNSET: field_dict["skipSignalReapply"] = skip_signal_reapply + if skip_update_reapply is not UNSET: + field_dict["skipUpdateReapply"] = skip_update_reapply return field_dict @@ -92,6 +97,8 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: skip_signal_reapply = d.pop("skipSignalReapply", UNSET) + skip_update_reapply = d.pop("skipUpdateReapply", UNSET) + workflow_reset_request = cls( workflow_id=workflow_id, reset_type=reset_type, @@ -102,6 +109,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: state_id=state_id, state_execution_id=state_execution_id, skip_signal_reapply=skip_signal_reapply, + skip_update_reapply=skip_update_reapply, ) workflow_reset_request.additional_properties = d diff --git a/iwf/iwf_api/models/workflow_set_data_objects_request.py b/iwf/iwf_api/models/workflow_set_data_objects_request.py new file mode 100644 index 0000000..ffad5de --- /dev/null +++ b/iwf/iwf_api/models/workflow_set_data_objects_request.py @@ -0,0 +1,92 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union + +import attr + +from ..types import UNSET, Unset + +if TYPE_CHECKING: + from ..models.key_value import KeyValue + + +T = TypeVar("T", bound="WorkflowSetDataObjectsRequest") + + +@attr.s(auto_attribs=True) +class WorkflowSetDataObjectsRequest: + """ + Attributes: + workflow_id (str): + workflow_run_id (Union[Unset, str]): + objects (Union[Unset, List['KeyValue']]): + """ + + workflow_id: str + workflow_run_id: Union[Unset, str] = UNSET + objects: Union[Unset, List["KeyValue"]] = UNSET + additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) + + def to_dict(self) -> Dict[str, Any]: + workflow_id = self.workflow_id + workflow_run_id = self.workflow_run_id + objects: Union[Unset, List[Dict[str, Any]]] = UNSET + if not isinstance(self.objects, Unset): + objects = [] + for objects_item_data in self.objects: + objects_item = objects_item_data.to_dict() + + objects.append(objects_item) + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "workflowId": workflow_id, + } + ) + if workflow_run_id is not UNSET: + field_dict["workflowRunId"] = workflow_run_id + if objects is not UNSET: + field_dict["objects"] = objects + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + from ..models.key_value import KeyValue + + d = src_dict.copy() + workflow_id = d.pop("workflowId") + + workflow_run_id = d.pop("workflowRunId", UNSET) + + objects = [] + _objects = d.pop("objects", UNSET) + for objects_item_data in _objects or []: + objects_item = KeyValue.from_dict(objects_item_data) + + objects.append(objects_item) + + workflow_set_data_objects_request = cls( + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + objects=objects, + ) + + workflow_set_data_objects_request.additional_properties = d + return workflow_set_data_objects_request + + @property + def additional_keys(self) -> List[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/iwf/iwf_api/models/workflow_set_search_attributes_request.py b/iwf/iwf_api/models/workflow_set_search_attributes_request.py new file mode 100644 index 0000000..610cf05 --- /dev/null +++ b/iwf/iwf_api/models/workflow_set_search_attributes_request.py @@ -0,0 +1,94 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union + +import attr + +from ..types import UNSET, Unset + +if TYPE_CHECKING: + from ..models.search_attribute import SearchAttribute + + +T = TypeVar("T", bound="WorkflowSetSearchAttributesRequest") + + +@attr.s(auto_attribs=True) +class WorkflowSetSearchAttributesRequest: + """ + Attributes: + workflow_id (str): + workflow_run_id (Union[Unset, str]): + search_attributes (Union[Unset, List['SearchAttribute']]): + """ + + workflow_id: str + workflow_run_id: Union[Unset, str] = UNSET + search_attributes: Union[Unset, List["SearchAttribute"]] = UNSET + additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) + + def to_dict(self) -> Dict[str, Any]: + workflow_id = self.workflow_id + workflow_run_id = self.workflow_run_id + search_attributes: Union[Unset, List[Dict[str, Any]]] = UNSET + if not isinstance(self.search_attributes, Unset): + search_attributes = [] + for search_attributes_item_data in self.search_attributes: + search_attributes_item = search_attributes_item_data.to_dict() + + search_attributes.append(search_attributes_item) + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "workflowId": workflow_id, + } + ) + if workflow_run_id is not UNSET: + field_dict["workflowRunId"] = workflow_run_id + if search_attributes is not UNSET: + field_dict["searchAttributes"] = search_attributes + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + from ..models.search_attribute import SearchAttribute + + d = src_dict.copy() + workflow_id = d.pop("workflowId") + + workflow_run_id = d.pop("workflowRunId", UNSET) + + search_attributes = [] + _search_attributes = d.pop("searchAttributes", UNSET) + for search_attributes_item_data in _search_attributes or []: + search_attributes_item = SearchAttribute.from_dict( + search_attributes_item_data + ) + + search_attributes.append(search_attributes_item) + + workflow_set_search_attributes_request = cls( + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + search_attributes=search_attributes, + ) + + workflow_set_search_attributes_request.additional_properties = d + return workflow_set_search_attributes_request + + @property + def additional_keys(self) -> List[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/iwf/iwf_api/models/workflow_start_options.py b/iwf/iwf_api/models/workflow_start_options.py index cd66dc2..38e238d 100644 --- a/iwf/iwf_api/models/workflow_start_options.py +++ b/iwf/iwf_api/models/workflow_start_options.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from ..models.key_value import KeyValue from ..models.search_attribute import SearchAttribute + from ..models.workflow_already_started_options import WorkflowAlreadyStartedOptions from ..models.workflow_config import WorkflowConfig from ..models.workflow_retry_policy import WorkflowRetryPolicy @@ -27,6 +28,7 @@ class WorkflowStartOptions: data_attributes (Union[Unset, List['KeyValue']]): workflow_config_override (Union[Unset, WorkflowConfig]): use_memo_for_data_attributes (Union[Unset, bool]): + workflow_already_started_options (Union[Unset, WorkflowAlreadyStartedOptions]): """ id_reuse_policy: Union[Unset, IDReusePolicy] = UNSET @@ -37,6 +39,9 @@ class WorkflowStartOptions: data_attributes: Union[Unset, List["KeyValue"]] = UNSET workflow_config_override: Union[Unset, "WorkflowConfig"] = UNSET use_memo_for_data_attributes: Union[Unset, bool] = UNSET + workflow_already_started_options: Union[Unset, "WorkflowAlreadyStartedOptions"] = ( + UNSET + ) additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict) def to_dict(self) -> Dict[str, Any]: @@ -71,6 +76,11 @@ def to_dict(self) -> Dict[str, Any]: workflow_config_override = self.workflow_config_override.to_dict() use_memo_for_data_attributes = self.use_memo_for_data_attributes + workflow_already_started_options: Union[Unset, Dict[str, Any]] = UNSET + if not isinstance(self.workflow_already_started_options, Unset): + workflow_already_started_options = ( + self.workflow_already_started_options.to_dict() + ) field_dict: Dict[str, Any] = {} field_dict.update(self.additional_properties) @@ -91,6 +101,10 @@ def to_dict(self) -> Dict[str, Any]: field_dict["workflowConfigOverride"] = workflow_config_override if use_memo_for_data_attributes is not UNSET: field_dict["useMemoForDataAttributes"] = use_memo_for_data_attributes + if workflow_already_started_options is not UNSET: + field_dict["workflowAlreadyStartedOptions"] = ( + workflow_already_started_options + ) return field_dict @@ -98,6 +112,9 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: from ..models.key_value import KeyValue from ..models.search_attribute import SearchAttribute + from ..models.workflow_already_started_options import ( + WorkflowAlreadyStartedOptions, + ) from ..models.workflow_config import WorkflowConfig from ..models.workflow_retry_policy import WorkflowRetryPolicy @@ -147,6 +164,17 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: use_memo_for_data_attributes = d.pop("useMemoForDataAttributes", UNSET) + _workflow_already_started_options = d.pop( + "workflowAlreadyStartedOptions", UNSET + ) + workflow_already_started_options: Union[Unset, WorkflowAlreadyStartedOptions] + if isinstance(_workflow_already_started_options, Unset): + workflow_already_started_options = UNSET + else: + workflow_already_started_options = WorkflowAlreadyStartedOptions.from_dict( + _workflow_already_started_options + ) + workflow_start_options = cls( id_reuse_policy=id_reuse_policy, cron_schedule=cron_schedule, @@ -156,6 +184,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: data_attributes=data_attributes, workflow_config_override=workflow_config_override, use_memo_for_data_attributes=use_memo_for_data_attributes, + workflow_already_started_options=workflow_already_started_options, ) workflow_start_options.additional_properties = d diff --git a/iwf/rpc.py b/iwf/rpc.py index 3112fb1..7d9c91a 100644 --- a/iwf/rpc.py +++ b/iwf/rpc.py @@ -13,9 +13,9 @@ class RPCInfo: timeout_seconds: int input_type: Optional[type] = None data_attribute_loading_policy: Optional[PersistenceLoadingPolicy] = None - params_order: Optional[ - list - ] = None # store this so that the rpc can be invoked with correct parameters + params_order: Optional[list] = ( + None # store this so that the rpc can be invoked with correct parameters + ) rpc_definition_err = WorkflowDefinitionError( diff --git a/iwf/tests/test_basic_workflow.py b/iwf/tests/test_basic_workflow.py index f96b040..f15f51d 100644 --- a/iwf/tests/test_basic_workflow.py +++ b/iwf/tests/test_basic_workflow.py @@ -1,17 +1,21 @@ import inspect import time +import unittest from typing import Union from iwf.client import Client -from iwf.command_request import CommandRequest +from iwf.command_request import CommandRequest, TimerCommand from iwf.command_results import CommandResults from iwf.communication import Communication +from iwf.errors import WorkflowAlreadyStartedError +from iwf.iwf_api.models import WorkflowAlreadyStartedOptions from iwf.persistence import Persistence from iwf.state_decision import StateDecision from iwf.state_schema import StateSchema from iwf.tests.worker_server import registry from iwf.workflow import ObjectWorkflow from iwf.workflow_context import WorkflowContext +from iwf.workflow_options import WorkflowOptions from iwf.workflow_state import T, WorkflowState @@ -25,7 +29,9 @@ def wait_until( ) -> CommandRequest: if input != "input": raise RuntimeError("input is incorrect") - return CommandRequest.empty() + return CommandRequest.for_all_command_completed( + TimerCommand.by_seconds(1), + ) def execute( self, @@ -62,9 +68,39 @@ def get_workflow_states(self) -> StateSchema: client = Client(registry) -def test_basic_workflow(): - wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" +class TestWorkflowErrors(unittest.TestCase): + def test_basic_workflow(self): + original_request_id = "1" + later_request_id = "2" - client.start_workflow(BasicWorkflow, wf_id, 100, "input") - res = client.get_simple_workflow_result_with_wait(wf_id, str) - assert res == "done" + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + + workflow_already_started_options_1 = WorkflowAlreadyStartedOptions( + ignore_already_started_error=True + ) + workflow_already_started_options_1.request_id = original_request_id + + start_options_1 = WorkflowOptions() + start_options_1.workflow_already_started_options = ( + workflow_already_started_options_1 + ) + + client.start_workflow(BasicWorkflow, wf_id, 100, "input", start_options_1) + + client.start_workflow(BasicWorkflow, wf_id, 100, "input", start_options_1) + + workflow_already_started_options_2 = WorkflowAlreadyStartedOptions( + ignore_already_started_error=True + ) + workflow_already_started_options_2.request_id = later_request_id + + start_options_2 = WorkflowOptions() + start_options_2.workflow_already_started_option = ( + workflow_already_started_options_2 + ) + + with self.assertRaises(WorkflowAlreadyStartedError): + client.start_workflow(BasicWorkflow, wf_id, 100, "input", start_options_2) + + res = client.get_simple_workflow_result_with_wait(wf_id, str) + assert res == "done" diff --git a/iwf/tests/test_persistence.py b/iwf/tests/test_persistence.py index f19f512..9d73936 100644 --- a/iwf/tests/test_persistence.py +++ b/iwf/tests/test_persistence.py @@ -1,5 +1,6 @@ import inspect import time +import unittest from iwf.client import Client from iwf.command_request import CommandRequest @@ -12,11 +13,23 @@ from iwf.tests.worker_server import registry from iwf.workflow import ObjectWorkflow from iwf.workflow_context import WorkflowContext +from iwf.workflow_options import WorkflowOptions from iwf.workflow_state import T, WorkflowState +from iwf.rpc import rpc + +initial_da_1 = "initial_da_1" +initial_da_value_1 = "value_1" +initial_da_2 = "initial_da_2" +initial_da_value_2 = "value_2" test_da_1 = "test_da_1" test_da_2 = "test_da_2" +final_test_da_value_1 = "1234" +final_test_da_value_2 = 1234 +final_initial_da_value_1 = initial_da_value_1 +final_initial_da_value_2 = "no-more-init" + class DataAttributeRWState(WorkflowState[None]): def wait_until( @@ -44,8 +57,9 @@ def execute( assert da1 == "123" assert da2 == 123 - persistence.set_data_attribute(test_da_1, "1234") - persistence.set_data_attribute(test_da_2, 1234) + persistence.set_data_attribute(test_da_1, final_test_da_value_1) + persistence.set_data_attribute(test_da_2, final_test_da_value_2) + persistence.set_data_attribute(initial_da_2, final_initial_da_value_2) return StateDecision.graceful_complete_workflow() @@ -55,19 +69,46 @@ def get_workflow_states(self) -> StateSchema: def get_persistence_schema(self) -> PersistenceSchema: return PersistenceSchema.create( + PersistenceField.data_attribute_def(initial_da_1, str), + PersistenceField.data_attribute_def(initial_da_2, str), PersistenceField.data_attribute_def(test_da_1, str), PersistenceField.data_attribute_def(test_da_2, int), ) + @rpc() + def test_persistence_read(self, pers: Persistence): + return ( + pers.get_data_attribute(initial_da_1), + pers.get_data_attribute(initial_da_2), + pers.get_data_attribute(test_da_1), + pers.get_data_attribute(test_da_2), + ) + + +class TestPersistence(unittest.TestCase): + @classmethod + def setUpClass(cls): + wf = PersistenceWorkflow() + registry.add_workflow(wf) + cls.client = Client(registry) -wf = PersistenceWorkflow() -registry.add_workflow(wf) -client = Client(registry) + def test_persistence_workflow(self): + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + start_options = WorkflowOptions( + initial_data_attributes={ + initial_da_1: initial_da_value_1, + initial_da_2: initial_da_value_2, + }, + ) -def test_persistence_workflow(): - wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + self.client.start_workflow(PersistenceWorkflow, wf_id, 100, None, start_options) + self.client.get_simple_workflow_result_with_wait(wf_id, None) - client.start_workflow(PersistenceWorkflow, wf_id, 100, None) - client.get_simple_workflow_result_with_wait(wf_id, None) - # TODO use RPC to get the final result of the persistence updated in execute API + res = self.client.invoke_rpc(wf_id, PersistenceWorkflow.test_persistence_read) + assert res == [ + final_initial_da_value_1, + final_initial_da_value_2, + final_test_da_value_1, + final_test_da_value_2, + ] diff --git a/iwf/unregistered_client.py b/iwf/unregistered_client.py index 61b43a8..f8bf7c0 100644 --- a/iwf/unregistered_client.py +++ b/iwf/unregistered_client.py @@ -31,7 +31,6 @@ ErrorResponse, IDReusePolicy, PersistenceLoadingPolicy, - SearchAttribute, SearchAttributeKeyAndType, WorkflowConfig, WorkflowGetDataObjectsRequest, @@ -52,6 +51,8 @@ WorkflowStateOptions, WorkflowStatus, WorkflowStopRequest, + WorkflowAlreadyStartedOptions, + KeyValue, ) from iwf.iwf_api.types import Response from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions @@ -65,8 +66,9 @@ class UnregisteredWorkflowOptions: workflow_start_delay_seconds: Optional[int] = None workflow_retry_policy: Optional[WorkflowRetryPolicy] = None start_state_options: Optional[WorkflowStateOptions] = None - initial_search_attribute: Optional[List[SearchAttribute]] = None workflow_config_override: Optional[WorkflowConfig] = None + workflow_already_started_options: Optional[WorkflowAlreadyStartedOptions] = None + initial_data_attributes: Optional[dict[str, Any]] = None T = TypeVar("T") @@ -142,6 +144,20 @@ def start_workflow( start_options.workflow_config_override = ( options.workflow_config_override ) + + if options.workflow_already_started_options: + start_options.workflow_already_started_options = ( + options.workflow_already_started_options + ) + + if options.initial_data_attributes: + das = [] + for key, value in options.initial_data_attributes.items(): + das.append( + KeyValue(key, self.client_options.object_encoder.encode(value)) + ) + start_options.data_attributes = das + request.workflow_start_options = start_options response = post_api_v1_workflow_start.sync_detailed( diff --git a/iwf/worker_service.py b/iwf/worker_service.py index 3880c1c..3b4a834 100644 --- a/iwf/worker_service.py +++ b/iwf/worker_service.py @@ -34,12 +34,12 @@ class WorkerOptions: class WorkerService: - api_path_workflow_state_wait_until: typing.ClassVar[ - str - ] = "/api/v1/workflowState/start" - api_path_workflow_state_execute: typing.ClassVar[ - str - ] = "/api/v1/workflowState/decide" + api_path_workflow_state_wait_until: typing.ClassVar[str] = ( + "/api/v1/workflowState/start" + ) + api_path_workflow_state_execute: typing.ClassVar[str] = ( + "/api/v1/workflowState/decide" + ) api_path_workflow_worker_rpc: typing.ClassVar[str] = "/api/v1/workflowWorker/rpc" def __init__( diff --git a/iwf/workflow_context.py b/iwf/workflow_context.py index 24feb9f..2c8bb33 100644 --- a/iwf/workflow_context.py +++ b/iwf/workflow_context.py @@ -13,16 +13,24 @@ class WorkflowContext: state_execution_id: Optional[str] = None first_attempt_timestamp_seconds: Optional[int] = None attempt: Optional[int] = None + child_workflow_request_id: Optional[str] = None def _from_idl_context(idl_context: Context) -> WorkflowContext: + state_execution_id = unset_to_none(idl_context.state_execution_id) + return WorkflowContext( workflow_id=idl_context.workflow_id, workflow_run_id=idl_context.workflow_run_id, workflow_start_timestamp_seconds=idl_context.workflow_started_timestamp, - state_execution_id=unset_to_none(idl_context.state_execution_id), + state_execution_id=state_execution_id, first_attempt_timestamp_seconds=unset_to_none( idl_context.first_attempt_timestamp, ), attempt=unset_to_none(idl_context.attempt), + child_workflow_request_id=( + idl_context.workflow_run_id + "-" + state_execution_id + if state_execution_id is not None + else None + ), ) diff --git a/iwf/workflow_options.py b/iwf/workflow_options.py index eb2517b..fe5bf55 100644 --- a/iwf/workflow_options.py +++ b/iwf/workflow_options.py @@ -1,7 +1,12 @@ from dataclasses import dataclass -from typing import Optional +from typing import Any, Optional -from iwf.iwf_api.models import IDReusePolicy, WorkflowRetryPolicy +from iwf.iwf_api.models import ( + IDReusePolicy, + WorkflowRetryPolicy, + WorkflowAlreadyStartedOptions, + WorkflowConfig, +) @dataclass @@ -10,4 +15,6 @@ class WorkflowOptions: workflow_cron_schedule: Optional[str] = None workflow_start_delay_seconds: Optional[int] = None workflow_retry_policy: Optional[WorkflowRetryPolicy] = None - # initial_search_attributes: Optional[dict[str, Any]] = None + workflow_already_started_options: Optional[WorkflowAlreadyStartedOptions] = None + workflow_config_override: Optional[WorkflowConfig] = None + initial_data_attributes: Optional[dict[str, Any]] = None diff --git a/iwf/workflow_state_options.py b/iwf/workflow_state_options.py index 1225517..8ea88d2 100644 --- a/iwf/workflow_state_options.py +++ b/iwf/workflow_state_options.py @@ -35,12 +35,12 @@ class WorkflowStateOptions: TODO fix this type hint """ execute_failure_handling_state: Optional[type] = None - execute_api_data_attributes_loading_policy: Optional[ - PersistenceLoadingPolicy - ] = None - execute_api_search_attributes_loading_policy: Optional[ - PersistenceLoadingPolicy - ] = None + execute_api_data_attributes_loading_policy: Optional[PersistenceLoadingPolicy] = ( + None + ) + execute_api_search_attributes_loading_policy: Optional[PersistenceLoadingPolicy] = ( + None + ) def _to_idl_state_options(