Skip to content

Commit

Permalink
Durable Entities (#184)
Browse files Browse the repository at this point in the history
Co-authored-by: Wenonah Zhang <[email protected]>
Co-authored-by: wenhzha <[email protected]>
  • Loading branch information
3 people authored Dec 3, 2020
1 parent d95c56d commit 46a0505
Show file tree
Hide file tree
Showing 39 changed files with 1,791 additions and 17 deletions.
6 changes: 6 additions & 0 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
Exposes the different API components intended for public consumption
"""
from .orchestrator import Orchestrator
from .entity import Entity
from .models.utils.entity_utils import EntityId
from .models.DurableOrchestrationClient import DurableOrchestrationClient
from .models.DurableOrchestrationContext import DurableOrchestrationContext
from .models.DurableEntityContext import DurableEntityContext
from .models.RetryOptions import RetryOptions
from .models.TokenSource import ManagedIdentityTokenSource

__all__ = [
'Orchestrator',
'Entity',
'EntityId',
'DurableOrchestrationClient',
'DurableEntityContext',
'DurableOrchestrationContext',
'ManagedIdentityTokenSource',
'RetryOptions'
Expand Down
119 changes: 119 additions & 0 deletions azure/durable_functions/entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime
from typing import Callable, Any, List, Dict

class InternalEntityException(Exception):
pass

class Entity:
"""Durable Entity Class.
Responsible for executing the user-defined entity function.
"""

def __init__(self, entity_func: Callable[[DurableEntityContext], None]):
"""Create a new entity for the user-defined entity.
Responsible for executing the user-defined entity function
Parameters
----------
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]]
The user defined entity function
"""
self.fn: Callable[[DurableEntityContext], None] = entity_func

def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str:
"""Handle the execution of the user-defined entity function.
Loops over the batch, which serves to specify inputs to the entity,
and collects results and generates a final state, which are returned.
Parameters
----------
context: DurableEntityContext
The entity context of the entity, which the user interacts with as their Durable API
Returns
-------
str
A JSON-formatted string representing the output state, results, and exceptions for the
entity execution.
"""
response = EntityState(results=[], signals=[])
for operation_data in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now()

try:
# populate context
operation = operation_data["name"]
if operation is None:
raise InternalEntityException("Durable Functions Internal Error: Entity operation was missing a name field")
context._operation = operation
context._input = operation_data["input"]
self.fn(context)
result = context._result

except InternalEntityException as e:
raise e

except Exception as e:
is_error = True
result = str(e)

duration: int = self._elapsed_milliseconds_since(start_time)
operation_result = OperationResult(
is_error=is_error,
duration=duration,
result=result
)
response.results.append(operation_result)

response.state = context._state
response.entity_exists = context._exists
return response.to_json_string()

@classmethod
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]:
"""Create an instance of the entity class.
Parameters
----------
fn (Callable[[DurableEntityContext], None]): [description]
Returns
-------
Callable[[Any], str]
Handle function of the newly created entity client
"""
def handle(context) -> str:
# It is not clear when the context JSON would be found
# inside a "body"-key, but this pattern matches the
# orchestrator implementation, so we keep it for safety.
context_body = getattr(context, "body", None)
if context_body is None:
context_body = context
ctx, batch = DurableEntityContext.from_json(context_body)
return Entity(fn).handle(ctx, batch)
return handle

def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
"""Calculate the elapsed time, in milliseconds, from the start_time to the present.
Parameters
----------
start_time: datetime
The timestamp of when the entity began processing a batched request.
Returns
-------
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now()
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time
200 changes: 200 additions & 0 deletions azure/durable_functions/models/DurableEntityContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from typing import Optional, Any, Dict, Tuple, List, Callable
from azure.functions._durable_functions import _deserialize_custom_object
import json


class DurableEntityContext:
"""Context of the durable entity context.
Describes the API used to specify durable entity user code.
"""

def __init__(self,
name: str,
key: str,
exists: bool,
state: Any):
"""Context of the durable entity context.
Describes the API used to specify durable entity user code.
Parameters
----------
name: str
The name of the Durable Entity
key: str
The key of the Durable Entity
exists: bool
Flag to determine if the entity exists
state: Any
The internal state of the Durable Entity
"""
self._entity_name: str = name
self._entity_key: str = key

self._exists: bool = exists
self._is_newly_constructed: bool = False

self._state: Any = state
self._input: Any = None
self._operation: Optional[str] = None
self._result: Any = None

@property
def entity_name(self) -> str:
"""Get the name of the Entity.
Returns
-------
str
The name of the entity
"""
return self._entity_name

@property
def entity_key(self) -> str:
"""Get the Entity key.
Returns
-------
str
The entity key
"""
return self._entity_key

@property
def operation_name(self) -> Optional[str]:
"""Get the current operation name.
Returns
-------
Optional[str]
The current operation name
"""
if self._operation is None:
raise Exception("Entity operation is unassigned")
return self._operation

@property
def is_newly_constructed(self) -> bool:
"""Determine if the Entity was newly constructed.
Returns
-------
bool
True if the Entity was newly constructed. False otherwise.
"""
# This is not updated at the moment, as its semantics are unclear
return self._is_newly_constructed

@classmethod
def from_json(cls, json_str: str) -> Tuple['DurableEntityContext', List[Dict[str, Any]]]:
"""Instantiate a DurableEntityContext from a JSON-formatted string.
Parameters
----------
json_string: str
A JSON-formatted string, returned by the durable-extension,
which represents the entity context
Returns
-------
DurableEntityContext
The DurableEntityContext originated from the input string
"""
json_dict = json.loads(json_str)
json_dict["name"] = json_dict["self"]["name"]
json_dict["key"] = json_dict["self"]["key"]
json_dict.pop("self")

serialized_state = json_dict["state"]
if serialized_state is not None:
json_dict["state"] = from_json_util(serialized_state)

batch = json_dict.pop("batch")
return cls(**json_dict), batch

def set_state(self, state: Any) -> None:
"""Set the state of the entity.
Parameter
---------
state: Any
The new state of the entity
"""
self._exists = True

# should only serialize the state at the end of the batch
self._state = state

def get_state(self, initializer: Optional[Callable[[], Any]] = None) -> Any:
"""Get the current state of this entity.
Parameters
----------
initializer: Optional[Callable[[], Any]]
A 0-argument function to provide an initial state. Defaults to None.
Returns
-------
Any
The current state of the entity
"""
state = self._state
if state is not None:
return state
elif initializer:
if not callable(initializer):
raise Exception("initializer argument needs to be a callable function")
state = initializer()
return state

def get_input(self) -> Any:
"""Get the input for this operation.
Returns
-------
Any
The input for the current operation
"""
input_ = None
req_input = self._input
req_input = json.loads(req_input)
input_ = None if req_input is None else from_json_util(req_input)
return input_

def set_result(self, result: Any) -> None:
"""Set the result (return value) of the entity.
Paramaters
----------
result: Any
The result / return value for the entity
"""
self._exists = True
self._result = result

def destruct_on_exit(self) -> None:
"""Delete this entity after the operation completes."""
self._exists = False
self._state = None

def from_json_util(self, json_str: str) -> Any:
"""Load an arbitrary datatype from its JSON representation.
The Out-of-proc SDK has a special JSON encoding strategy
to enable arbitrary datatypes to be serialized. This utility
loads a JSON with the assumption that it follows that encoding
method.
Parameters
----------
json_str: str
A JSON-formatted string, from durable-extension
Returns
-------
Any:
The original datatype that was serialized
"""
return json.loads(json_str, object_hook=_deserialize_custom_object)
Loading

0 comments on commit 46a0505

Please sign in to comment.