Skip to content

Commit

Permalink
IWF-405: Rename failure handling API (#67)
Browse files Browse the repository at this point in the history
* Rename failure handling API

* DONE

* fix lint

* fix lint

* fix lint

* rename
  • Loading branch information
longquanzheng authored Dec 11, 2024
1 parent c3bd12a commit 2a09401
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
12 changes: 12 additions & 0 deletions iwf/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect
from typing import Any, Callable, Optional, Type, TypeVar, Union

from typing_extensions import deprecated

from iwf.client_options import ClientOptions
from iwf.errors import InvalidArgumentError
from iwf.registry import Registry
Expand Down Expand Up @@ -94,10 +96,20 @@ def start_workflow(
wf_type, wf_id, starting_state_id, timeout_seconds, input, unreg_opts
)

@deprecated("use wait_for_workflow_completion instead")
def get_simple_workflow_result_with_wait(
self,
workflow_id: str,
type_hint: Optional[Type[T]] = None,
) -> Optional[T]:
return self._unregistered_client.get_simple_workflow_result_with_wait(
workflow_id, "", type_hint
)

def wait_for_workflow_completion(
self,
workflow_id: str,
type_hint: Optional[Type[T]] = None,
) -> Optional[T]:
"""
This will be waiting up to 5~60 seconds (configurable in HTTP client and capped by server) for workflow to
Expand Down
2 changes: 1 addition & 1 deletion iwf/tests/test_state_failure_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def execute(
def get_state_options(self) -> WorkflowStateOptions:
return WorkflowStateOptions(
execute_api_retry_policy=RetryPolicy(maximum_attempts=1),
execute_failure_handling_state=RecoveryState,
proceed_to_state_when_execute_retry_exhausted=RecoveryState,
)


Expand Down
56 changes: 47 additions & 9 deletions iwf/workflow_state_options.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from dataclasses import dataclass
from typing import Any, Optional

from iwf.errors import WorkflowDefinitionError
from iwf.iwf_api.models import (
ExecuteApiFailurePolicy,
PersistenceLoadingPolicy,
RetryPolicy,
WaitUntilApiFailurePolicy,
WorkflowStateOptions as IdlWorkflowStateOptions,
)
from iwf.iwf_api.types import Unset


@dataclass
Expand All @@ -19,7 +21,17 @@ class WorkflowStateOptions:
# below are wait_until API specific options:
wait_until_api_timeout_seconds: Optional[int] = None
wait_until_api_retry_policy: Optional[RetryPolicy] = None
wait_until_api_failure_policy: Optional[WaitUntilApiFailurePolicy] = None
"""
By default, workflow would fail after waitUntil API retry exhausted.
This policy to allow proceeding to the execute API after waitUntil API exhausted all retries.
This is useful for some advanced use cases like SAGA pattern.
RetryPolicy is required to be set with maximumAttempts or maximumAttemptsDurationSeconds for waitUntil API.
NOTE: execute API will use commandResults to check whether the waitUntil has succeeded or not.
See more in <a href="https://github.com/indeedeng/iwf/wiki/WorkflowStateOptions">wiki</a>
"""
proceed_to_execute_when_wait_until_retry_exhausted: Optional[
WaitUntilApiFailurePolicy
] = None
wait_until_api_data_attributes_loading_policy: Optional[
PersistenceLoadingPolicy
] = None
Expand All @@ -30,11 +42,14 @@ class WorkflowStateOptions:
execute_api_timeout_seconds: Optional[int] = None
execute_api_retry_policy: Optional[RetryPolicy] = None
"""
note that the failing handling state will take the same input as the failed state
the type is Optional[type[WorkflowState]] but there is an issue with type hint...
TODO fix this type hint
By default, workflow would fail after execute API retry exhausted.
Set the state to proceed to the specified state after the execute API exhausted all retries
This is useful for some advanced use cases like SAGA pattern.
RetryPolicy is required to be set with maximumAttempts or maximumAttemptsDurationSeconds for execute API.
Note that the failure handling state will take the same input as the failed from state.
TODO the type should be the type is Optional[type[WorkflowState]] but -- there is an issue with circular import...
"""
execute_failure_handling_state: Optional[type] = None
proceed_to_state_when_execute_retry_exhausted: Optional[type] = None
execute_api_data_attributes_loading_policy: Optional[PersistenceLoadingPolicy] = (
None
)
Expand Down Expand Up @@ -77,8 +92,20 @@ def _to_idl_state_options(
)
if options.data_attributes_loading_policy is not None:
res.data_attributes_loading_policy = options.data_attributes_loading_policy
if options.wait_until_api_failure_policy is not None:
res.wait_until_api_failure_policy = options.wait_until_api_failure_policy
if options.proceed_to_execute_when_wait_until_retry_exhausted is not None:
res.wait_until_api_failure_policy = (
options.proceed_to_execute_when_wait_until_retry_exhausted
)
if options.wait_until_api_retry_policy is None:
raise WorkflowDefinitionError("wait_until API retry policy must be set")
if isinstance(
options.wait_until_api_retry_policy.maximum_attempts, Unset
) and isinstance(
options.wait_until_api_retry_policy.maximum_attempts_duration_seconds, Unset
):
raise WorkflowDefinitionError(
"wait_until API retry policy must be set with maximum_attempts or maximum_attempts_duration_seconds"
)
if options.wait_until_api_retry_policy is not None:
res.wait_until_api_retry_policy = options.wait_until_api_retry_policy
if options.wait_until_api_timeout_seconds is not None:
Expand All @@ -87,14 +114,25 @@ def _to_idl_state_options(
res.execute_api_retry_policy = options.execute_api_retry_policy
if options.execute_api_timeout_seconds is not None:
res.execute_api_timeout_seconds = options.execute_api_timeout_seconds
if options.execute_failure_handling_state is not None:
if options.proceed_to_state_when_execute_retry_exhausted is not None:
res.execute_api_failure_policy = (
ExecuteApiFailurePolicy.PROCEED_TO_CONFIGURED_STATE
)
if options.execute_api_retry_policy is None:
raise WorkflowDefinitionError("execute API retry policy must be set")
if isinstance(
options.execute_api_retry_policy.maximum_attempts, Unset
) and isinstance(
options.execute_api_retry_policy.maximum_attempts_duration_seconds, Unset
):
raise WorkflowDefinitionError(
"execute API retry policy must be set with maximum_attempts or maximum_attempts_duration_seconds"
)

from iwf.workflow_state import get_state_id_by_class

res.execute_api_failure_proceed_state_id = get_state_id_by_class(
options.execute_failure_handling_state
options.proceed_to_state_when_execute_retry_exhausted
)
state = state_store[res.execute_api_failure_proceed_state_id]
proceed_state_options = state.get_state_options()
Expand Down

0 comments on commit 2a09401

Please sign in to comment.