Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEPRECATED PR] Create Timer Implementation #39

Closed
wants to merge 56 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
bf05733
create timer first pass
priyaananthasankar Jan 30, 2020
71a5ad6
fix bugs to make waitForExternalEvent working
Jan 22, 2020
9177fe2
add waitforexternalevent samples
Jan 22, 2020
f55748f
minor fixes(variable name, delete comment)
Jan 25, 2020
406334d
flake8 fixes
Jan 25, 2020
8601b74
add docstrings
Jan 25, 2020
9b9af77
implement task_any function
Jan 25, 2020
2ed3d79
unittest for waitforexternalevents
Jan 27, 2020
6a2d066
fix bugs after merging dev
Jan 27, 2020
198e54f
fix flake8
Jan 27, 2020
eb47bc0
Base implementation of tests
scgbear Jan 27, 2020
3e76499
parrot values success
scgbear Jan 27, 2020
76fdcfc
test full complete flow
scgbear Jan 27, 2020
3b2b68a
test failed scenario
scgbear Jan 27, 2020
9d40be5
docstring to numpy format
Jan 28, 2020
f20a2e0
minor changes (rename, remove logging)
Jan 28, 2020
5d1115b
unittest for task_any, added tasks_test_utils
Jan 28, 2020
cb29fee
add class __eq__ function for Waitforexternalevent actions
Jan 28, 2020
abf0166
add samples readme doc
Jan 28, 2020
bd19b1c
fix flake8
Jan 28, 2020
12004fe
Refactoring HistoryEvent
scgbear Jan 28, 2020
c2fc638
add docstrings for HistoryEvent class
scgbear Jan 28, 2020
bf1e2e1
Refactor json conversion
scgbear Jan 28, 2020
a0fa70f
simple Fan out fan in sample
scgbear Jan 28, 2020
c0d8681
Fix flake errors
scgbear Jan 28, 2020
b927c04
Remove local debugging bits
scgbear Jan 28, 2020
5b8b6d0
remove state in task_any
Jan 29, 2020
75010c3
add handle faulted task_any case +unittest
Jan 29, 2020
74305c3
Undo De Morgan's Law
scgbear Jan 29, 2020
6ab0dc7
replace filters with list comprehension
scgbear Jan 29, 2020
47f0cb8
Add documentation for tracking API implementation
scgbear Jan 29, 2020
606e80a
move datetime format string to azure package
scgbear Jan 29, 2020
cba9740
replace filter with list comprehension
scgbear Jan 30, 2020
77ee682
remove extra zimezone from format
scgbear Jan 30, 2020
8509c39
Push context initialization our of handle method
scgbear Jan 30, 2020
3c2ba43
able to pass in tasksets to task_any and task_all
Feb 3, 2020
db21a28
update unittest for adding timestamp to taskset, add unittest for pas…
Feb 3, 2020
580285f
fix bugs in task_all(when all tasks fail), and fix unittest for that …
Feb 3, 2020
2af63d5
fix flake8
Feb 3, 2020
77331d6
test from orchestrator level(draft)
Feb 4, 2020
ad82120
Remove IFunctionContext abstraction
scgbear Feb 5, 2020
ddcfc67
Starting of schema validation bits
scgbear Feb 6, 2020
8950fe7
wire up schema validation into the orchestrator tests
scgbear Feb 6, 2020
6cdba74
Test commit
scgbear Feb 7, 2020
5d2957c
fix flake 8 issues
scgbear Feb 7, 2020
8ca4aad
fix pytest, remove task_any_tests from orchestrator level
Feb 7, 2020
7673e16
fix flake8
Feb 7, 2020
f7a9af8
create timer first pass
priyaananthasankar Jan 30, 2020
0955ce2
rebased
priyaananthasankar Feb 12, 2020
060df32
rebased
priyaananthasankar Feb 12, 2020
d3ff8f9
rebased
priyaananthasankar Feb 12, 2020
26fd2b4
Merge branch 'dev' into monitoring
priyaananthasankar Feb 12, 2020
492e580
rebased
priyaananthasankar Feb 12, 2020
2dc5de1
Merge branch 'monitoring' of https://github.com/priyaananthasankar/az…
priyaananthasankar Feb 12, 2020
6121c6c
rebased
priyaananthasankar Feb 12, 2020
d3e9cab
rebased
priyaananthasankar Feb 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..interfaces import IAction
from ..models.Task import Task
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
wait_for_external_event_task
wait_for_external_event_task,create_timer_task


class DurableOrchestrationContext:
Expand All @@ -30,6 +30,7 @@ def __init__(self,
state=self.histories,
name=n,
input_=i)
self.create_timer = lambda d: create_timer_task(state=self.histories,fire_at=d)
self.call_activity_with_retry = \
lambda n, o, i=None: call_activity_with_retry_task(
state=self.histories,
Expand Down Expand Up @@ -67,6 +68,15 @@ def from_json(cls, json_string):
"""
json_dict = json.loads(json_string)
return cls(**json_dict)

def get_input(input: Any) -> Any:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're missing the lambda expression that pulls the input value out of the context string passed in. Also I've been separating the implementation of the functions from the context. Two reasons, 1. the functions aren't dependent on any of the class state, you'll get all sorts of squiggly lines all over this function in PyCharm wanting you to either mark it as a classmethod, or pull it out of the class. 2. separation of concerns, this functions can be tested without the DurableOrchestrationContext object created. This it is a small function all together, but I'd rather stick with the convention of separation. Helps the reader to know that they can look in the tasks package to see the actual implementation of all of the functions, instead of some in the tasks package, some directly implemented in the DurableOrchestrationContext module.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

"""
Returns
-------
str
Returns the input parameters obtained in the context of a Azure Function call
"""
return input

def call_activity(self, name: str, input_=None) -> Task:
"""Schedule an activity for execution.
Expand Down Expand Up @@ -195,6 +205,7 @@ def current_utc_datetime(self) -> datetime:
This date/time value is derived from the orchestration history. It
always returns the same value at specific points in the orchestrator
function code, making it deterministic and safe for replay.

:return: The current date/time in a way that is safe for use by
orchestrator functions
"""
Expand Down
5 changes: 4 additions & 1 deletion azure/durable_functions/models/OrchestratorState.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ def _add_actions(self, json_dict):
for action_list in self._actions:
action_result_list = []
for action_obj in action_list:
action_result_list.append(action_obj.to_json())
if type(action_obj) is tuple:
action_result_list.append(action_obj[0].to_json())
else:
action_result_list.append(action_obj.to_json())
json_dict['actions'].append(action_result_list)

def to_json_string(self) -> str:
Expand Down
3 changes: 1 addition & 2 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
class Task:
"""Represents some pending action.

Similar to a native JavaScript promise in
that it acts as a placeholder for outstanding asynchronous work, but has
Acts as a placeholder for outstanding asynchronous work, but has
a synchronous implementation and is specific to Durable Functions.

Tasks are only returned to an orchestration function when a
Expand Down
42 changes: 42 additions & 0 deletions azure/durable_functions/models/actions/CreateTimerAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any, Dict

from .ActionType import ActionType
from ..utils.json_utils import add_attrib,add_datetime_attrib
import datetime

class CreateTimerAction:

"""
Defines the structure of the Create Timer object.

Returns
-------
Provides the information needed by the durable extension to be able to schedule the activity.

Raises
------
ValueError
if the event fired is not of valid datetime object
"""
def __init__(self,fire_at: datetime,is_cancelled:bool=False):
self.action_type : ActionType = ActionType.CREATE_TIMER
self.fire_at:datetime = fire_at
self.is_cancelled: bool = is_cancelled

if not isinstance(self.fire_at,datetime.date):
raise ValueError("fireAt: Expected valid datetime object but got ", self.fire_at)

def to_json(self) -> Dict[str, Any]:
"""
Convert object into a json dictionary.

Returns
-------
Dict[str, Any]
The instance of the class converted into a json dictionary
"""
json_dict = {}
add_attrib(json_dict, self, 'action_type', 'actionType')
add_datetime_attrib(json_dict, self, 'fire_at', 'fireAt')
add_attrib(json_dict, self, 'is_cancelled', 'isCanceled')
return json_dict
3 changes: 3 additions & 0 deletions azure/durable_functions/models/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
from .ActionType import ActionType
from .CallActivityAction import CallActivityAction
from .CallActivityWithRetryAction import CallActivityWithRetryAction
from .CreateTimerAction import CreateTimerAction
from .WaitForExternalEventAction import WaitForExternalEventAction

__all__ = [
'ActionType',
'CallActivityAction',
'CallActivityWithRetryAction',
'CreateTimerAction',
'WaitForExternalEventAction'
]
18 changes: 18 additions & 0 deletions azure/durable_functions/models/utils/json_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Dict, Any
from tests.test_utils.constants import DATETIME_STRING_FORMAT
priyaananthasankar marked this conversation as resolved.
Show resolved Hide resolved

from ...constants import DATETIME_STRING_FORMAT

from ...constants import DATETIME_STRING_FORMAT

Expand Down Expand Up @@ -52,3 +55,18 @@ def add_json_attrib(json_dict: Dict[str, Any], object_,
"""
if hasattr(object_, attribute_name):
json_dict[alt_name or attribute_name] = getattr(object_, attribute_name).to_json()

def add_datetime_attrib(json_dict: Dict[str, Any], object_,
attribute_name: str, alt_name: str = None):
"""Add the value of the attribute from the object to the dictionary converted into a string.

Parameters
----------
json_dict: The dictionary to add the attribute to
object_: The object to look for the attribute on
attribute_name: The name of the attribute to look for
alt_name: An alternate name to provide to the attribute in the in the dictionary
"""
if hasattr(object_, attribute_name):
json_dict[alt_name or attribute_name] = \
getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT)
2 changes: 2 additions & 0 deletions azure/durable_functions/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .task_all import task_all
from .task_any import task_any
from .task_utilities import should_suspend
from .create_timer import create_timer_task
from .wait_for_external_event import wait_for_external_event_task

__all__ = [
Expand All @@ -12,5 +13,6 @@
'task_all',
'task_any',
'should_suspend',
'create_timer_task'
priyaananthasankar marked this conversation as resolved.
Show resolved Hide resolved
'wait_for_external_event_task'
]
48 changes: 48 additions & 0 deletions azure/durable_functions/tasks/create_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List, Any
from ..models.actions.CreateTimerAction import CreateTimerAction
from ..models.history import HistoryEvent
from .task_utilities import \
find_task_retry_timer_created, find_task_retry_timer_fired, set_processed
import datetime
from .timer_task import TimerTask

def create_timer_task(state: List[HistoryEvent],
fire_at: datetime) -> TimerTask:
"""
Durable Timers are used in orchestrator functions to implement delays or to
setup timeouts on async actions.

Parameters
----------
state : List[HistoryEvent]
The list of history events to search to determine the current state of the activity
fire_at : datetime
The time interval to fire the timer trigger

Returns
-------
TimerTask
A Durable Timer Task that schedules the timer to wake up the activity
"""

new_action = CreateTimerAction(fire_at)

timer_created = find_task_retry_timer_created(state,fire_at)
timer_fired = find_task_retry_timer_fired(state,timer_created)

set_processed([timer_created,timer_fired])

if timer_fired:
return TimerTask(
is_completed=True,
action=new_action,
timestamp=timer_fired["Timestamp"],
id_=timer_fired["TaskScheduledId"])
else:
return TimerTask(
is_completed=False,
action=new_action,
timestamp=None,
id_=None)


3 changes: 2 additions & 1 deletion azure/durable_functions/tasks/task_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ def task_all(tasks):
all_actions = []
results = []
is_completed = True
complete_time = None
faulted = []
complete_time = None

for task in tasks:
if isinstance(task, TaskSet):
for action in task.actions:
Expand Down
9 changes: 5 additions & 4 deletions azure/durable_functions/tasks/task_utilities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from ..models.history import HistoryEventType

import json, datetime
from ..models.history import HistoryEventType, HistoryEvent
from ..models.Task import Task
from typing import List

def should_suspend(partial_result) -> bool:
"""Check the state of the result to determine if the orchestration should suspend."""
Expand Down Expand Up @@ -163,4 +164,4 @@ def set_processed(tasks):
"""
for task in tasks:
if task is not None:
task.is_processed = True
task.is_processed = True
58 changes: 58 additions & 0 deletions azure/durable_functions/tasks/timer_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from ..models.Task import Task
import datetime

class TimerTask(Task):
"""
Returned from DurableOrchestrationContext.createTimer if the call
is not 'yield-ed". Represents a pending timer.
All pending timers must be completed or canceled for an orchestration
to complete.

Example: Cancel a timer
```
timeout_task = context.df.create_timer(expiration_date)
if not timeout_task.is_completed():
timeout_task.cancel()
```
#TODO Write an example for create_timeout
"""

def __init__(self, action ,is_completed, timestamp, id_):
self._action = action,
self._is_completed = is_completed,
self._timestamp = timestamp,
self._id = id_

# Task(
priyaananthasankar marked this conversation as resolved.
Show resolved Hide resolved
# is_completed=self._is_completed,
# is_faulted=False,
# action=self._action,
# result=None,
# timestamp=self._timestamp,
# id_=self._id)

super().__init__(self._is_completed,False,self._action,None,self._timestamp,self._id,None)

def is_cancelled() -> bool:
"""
Returns
-------
bool: Whether or not the timer has been cancelled

"""
return self._action.is_cancelled

def cancel():
"""
Indicates the timer should be cancelled. This request will execute on
the next `yield` or `return` statement

Raises
------
ValueError
Raises an error if the task is already completed and an attempt is made to cancel it
"""
if not self._is_completed:
self._action.is_cancelled = True
else:
raise ValueError("Cannot cancel a completed task.")