diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 030aa4e2..1b02263f 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -7,7 +7,7 @@ from ..interfaces import IAction from ..models.Task import Task from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \ - wait_for_external_event_task + wait_for_external_event_task,create_timer_task class DurableOrchestrationContext: @@ -30,6 +30,7 @@ def __init__(self, state=self.histories, name=n, input_=i) + self.create_timer = lambda d: create_timer_task(state=self.histories,fire_at=d) self.call_activity_with_retry = \ lambda n, o, i=None: call_activity_with_retry_task( state=self.histories, @@ -67,6 +68,15 @@ def from_json(cls, json_string): """ json_dict = json.loads(json_string) return cls(**json_dict) + + def get_input(input: Any) -> Any: + """ + Returns + ------- + str + Returns the input parameters obtained in the context of a Azure Function call + """ + return input def call_activity(self, name: str, input_=None) -> Task: """Schedule an activity for execution. @@ -195,6 +205,7 @@ def current_utc_datetime(self) -> datetime: This date/time value is derived from the orchestration history. It always returns the same value at specific points in the orchestrator function code, making it deterministic and safe for replay. + :return: The current date/time in a way that is safe for use by orchestrator functions """ diff --git a/azure/durable_functions/models/OrchestratorState.py b/azure/durable_functions/models/OrchestratorState.py index 81a5a62e..e5643e7a 100644 --- a/azure/durable_functions/models/OrchestratorState.py +++ b/azure/durable_functions/models/OrchestratorState.py @@ -87,7 +87,10 @@ def _add_actions(self, json_dict): for action_list in self._actions: action_result_list = [] for action_obj in action_list: - action_result_list.append(action_obj.to_json()) + if type(action_obj) is tuple: + action_result_list.append(action_obj[0].to_json()) + else: + action_result_list.append(action_obj.to_json()) json_dict['actions'].append(action_result_list) def to_json_string(self) -> str: diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index cb86a619..96f0a2bc 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -6,8 +6,7 @@ class Task: """Represents some pending action. - Similar to a native JavaScript promise in - that it acts as a placeholder for outstanding asynchronous work, but has + Acts as a placeholder for outstanding asynchronous work, but has a synchronous implementation and is specific to Durable Functions. Tasks are only returned to an orchestration function when a diff --git a/azure/durable_functions/models/actions/CreateTimerAction.py b/azure/durable_functions/models/actions/CreateTimerAction.py new file mode 100644 index 00000000..14158362 --- /dev/null +++ b/azure/durable_functions/models/actions/CreateTimerAction.py @@ -0,0 +1,42 @@ +from typing import Any, Dict + +from .ActionType import ActionType +from ..utils.json_utils import add_attrib,add_datetime_attrib +import datetime + +class CreateTimerAction: + + """ + Defines the structure of the Create Timer object. + + Returns + ------- + Provides the information needed by the durable extension to be able to schedule the activity. + + Raises + ------ + ValueError + if the event fired is not of valid datetime object + """ + def __init__(self,fire_at: datetime,is_cancelled:bool=False): + self.action_type : ActionType = ActionType.CREATE_TIMER + self.fire_at:datetime = fire_at + self.is_cancelled: bool = is_cancelled + + if not isinstance(self.fire_at,datetime.date): + raise ValueError("fireAt: Expected valid datetime object but got ", self.fire_at) + + def to_json(self) -> Dict[str, Any]: + """ + Convert object into a json dictionary. + + Returns + ------- + Dict[str, Any] + The instance of the class converted into a json dictionary + """ + json_dict = {} + add_attrib(json_dict, self, 'action_type', 'actionType') + add_datetime_attrib(json_dict, self, 'fire_at', 'fireAt') + add_attrib(json_dict, self, 'is_cancelled', 'isCanceled') + return json_dict \ No newline at end of file diff --git a/azure/durable_functions/models/actions/__init__.py b/azure/durable_functions/models/actions/__init__.py index 2441e630..58183f60 100644 --- a/azure/durable_functions/models/actions/__init__.py +++ b/azure/durable_functions/models/actions/__init__.py @@ -2,10 +2,13 @@ from .ActionType import ActionType from .CallActivityAction import CallActivityAction from .CallActivityWithRetryAction import CallActivityWithRetryAction +from .CreateTimerAction import CreateTimerAction from .WaitForExternalEventAction import WaitForExternalEventAction + __all__ = [ 'ActionType', 'CallActivityAction', 'CallActivityWithRetryAction', + 'CreateTimerAction', 'WaitForExternalEventAction' ] diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index 1edc2e23..5597b247 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -1,4 +1,7 @@ from typing import Dict, Any +from tests.test_utils.constants import DATETIME_STRING_FORMAT + +from ...constants import DATETIME_STRING_FORMAT from ...constants import DATETIME_STRING_FORMAT @@ -52,3 +55,18 @@ def add_json_attrib(json_dict: Dict[str, Any], object_, """ if hasattr(object_, attribute_name): json_dict[alt_name or attribute_name] = getattr(object_, attribute_name).to_json() + +def add_datetime_attrib(json_dict: Dict[str, Any], object_, + attribute_name: str, alt_name: str = None): + """Add the value of the attribute from the object to the dictionary converted into a string. + + Parameters + ---------- + json_dict: The dictionary to add the attribute to + object_: The object to look for the attribute on + attribute_name: The name of the attribute to look for + alt_name: An alternate name to provide to the attribute in the in the dictionary + """ + if hasattr(object_, attribute_name): + json_dict[alt_name or attribute_name] = \ + getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT) diff --git a/azure/durable_functions/tasks/__init__.py b/azure/durable_functions/tasks/__init__.py index af182221..0509ea78 100644 --- a/azure/durable_functions/tasks/__init__.py +++ b/azure/durable_functions/tasks/__init__.py @@ -4,6 +4,7 @@ from .task_all import task_all from .task_any import task_any from .task_utilities import should_suspend +from .create_timer import create_timer_task from .wait_for_external_event import wait_for_external_event_task __all__ = [ @@ -12,5 +13,6 @@ 'task_all', 'task_any', 'should_suspend', + 'create_timer_task' 'wait_for_external_event_task' ] diff --git a/azure/durable_functions/tasks/create_timer.py b/azure/durable_functions/tasks/create_timer.py new file mode 100644 index 00000000..27bafaca --- /dev/null +++ b/azure/durable_functions/tasks/create_timer.py @@ -0,0 +1,48 @@ +from typing import List, Any +from ..models.actions.CreateTimerAction import CreateTimerAction +from ..models.history import HistoryEvent +from .task_utilities import \ + find_task_retry_timer_created, find_task_retry_timer_fired, set_processed +import datetime +from .timer_task import TimerTask + +def create_timer_task(state: List[HistoryEvent], + fire_at: datetime) -> TimerTask: + """ + Durable Timers are used in orchestrator functions to implement delays or to + setup timeouts on async actions. + + Parameters + ---------- + state : List[HistoryEvent] + The list of history events to search to determine the current state of the activity + fire_at : datetime + The time interval to fire the timer trigger + + Returns + ------- + TimerTask + A Durable Timer Task that schedules the timer to wake up the activity + """ + + new_action = CreateTimerAction(fire_at) + + timer_created = find_task_retry_timer_created(state,fire_at) + timer_fired = find_task_retry_timer_fired(state,timer_created) + + set_processed([timer_created,timer_fired]) + + if timer_fired: + return TimerTask( + is_completed=True, + action=new_action, + timestamp=timer_fired["Timestamp"], + id_=timer_fired["TaskScheduledId"]) + else: + return TimerTask( + is_completed=False, + action=new_action, + timestamp=None, + id_=None) + + \ No newline at end of file diff --git a/azure/durable_functions/tasks/task_all.py b/azure/durable_functions/tasks/task_all.py index bbb09bf4..76f2595a 100644 --- a/azure/durable_functions/tasks/task_all.py +++ b/azure/durable_functions/tasks/task_all.py @@ -10,8 +10,9 @@ def task_all(tasks): all_actions = [] results = [] is_completed = True - complete_time = None faulted = [] + complete_time = None + for task in tasks: if isinstance(task, TaskSet): for action in task.actions: diff --git a/azure/durable_functions/tasks/task_utilities.py b/azure/durable_functions/tasks/task_utilities.py index 21a65038..c22695b7 100644 --- a/azure/durable_functions/tasks/task_utilities.py +++ b/azure/durable_functions/tasks/task_utilities.py @@ -1,6 +1,7 @@ -import json -from ..models.history import HistoryEventType - +import json, datetime +from ..models.history import HistoryEventType, HistoryEvent +from ..models.Task import Task +from typing import List def should_suspend(partial_result) -> bool: """Check the state of the result to determine if the orchestration should suspend.""" @@ -163,4 +164,4 @@ def set_processed(tasks): """ for task in tasks: if task is not None: - task.is_processed = True + task.is_processed = True \ No newline at end of file diff --git a/azure/durable_functions/tasks/timer_task.py b/azure/durable_functions/tasks/timer_task.py new file mode 100644 index 00000000..f70cc582 --- /dev/null +++ b/azure/durable_functions/tasks/timer_task.py @@ -0,0 +1,58 @@ +from ..models.Task import Task +import datetime + +class TimerTask(Task): + """ + Returned from DurableOrchestrationContext.createTimer if the call + is not 'yield-ed". Represents a pending timer. + All pending timers must be completed or canceled for an orchestration + to complete. + + Example: Cancel a timer + ``` + timeout_task = context.df.create_timer(expiration_date) + if not timeout_task.is_completed(): + timeout_task.cancel() + ``` + #TODO Write an example for create_timeout + """ + + def __init__(self, action ,is_completed, timestamp, id_): + self._action = action, + self._is_completed = is_completed, + self._timestamp = timestamp, + self._id = id_ + + # Task( + # is_completed=self._is_completed, + # is_faulted=False, + # action=self._action, + # result=None, + # timestamp=self._timestamp, + # id_=self._id) + + super().__init__(self._is_completed,False,self._action,None,self._timestamp,self._id,None) + + def is_cancelled() -> bool: + """ + Returns + ------- + bool: Whether or not the timer has been cancelled + + """ + return self._action.is_cancelled + + def cancel(): + """ + Indicates the timer should be cancelled. This request will execute on + the next `yield` or `return` statement + + Raises + ------ + ValueError + Raises an error if the task is already completed and an attempt is made to cancel it + """ + if not self._is_completed: + self._action.is_cancelled = True + else: + raise ValueError("Cannot cancel a completed task.") \ No newline at end of file