Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update isReplaying flag at beginning of replay #390

Merged
merged 11 commits into from
Jul 19, 2022
16 changes: 16 additions & 0 deletions azure/durable_functions/models/TaskOrchestrationExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ def execute(self, context: DurableOrchestrationContext,
self.context = context
evaluated_user_code = fn(context)

# The minimum History size is 2, in the shape: [OrchestratorStarted, ExecutionStarted].
# At the start of replay, the `is_replaying` flag is determined from the
# ExecutionStarted event.
# For some reason, OrchestratorStarted does not update its `isPlayed` field.
if len(history) < 2:
err_message = "Internal Durable Functions error: "\
+ f"received History array of size {len(history)} "\
+ "when a minimum size of 2 is expected. "\
+ "Please report this issue at "\
+ "https://github.com/Azure/azure-functions-durable-python/issues."
raise Exception(err_message)

# Set initial is_replaing state.
execution_started_event = history[1]
self.current_task.is_played = execution_started_event.is_played

# If user code is a generator, then it uses `yield` statements (the DF API)
# and so we iterate through the DF history, generating tasks and populating
# them with values when the history provides them
Expand Down
2 changes: 1 addition & 1 deletion tests/orchestrator/test_is_replaying_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def add_timer_action(state: OrchestratorState, fire_at: datetime):

def test_is_replaying_initial_value():

context_builder = ContextBuilder("")
context_builder = ContextBuilder("", is_replaying=False)
result = get_orchestration_property(
context_builder, generator_function, "durable_context")

Expand Down
35 changes: 33 additions & 2 deletions tests/orchestrator/test_sequential_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ def generator_function(context):

return outputs

def generator_function_is_replaying(context):
outputs = []

outputs.append(context.is_replaying)
yield context.call_activity("Hello", "Tokyo")
outputs.append(context.is_replaying)
yield context.call_activity("Hello", "Seattle")
outputs.append(context.is_replaying)
yield context.call_activity("Hello", "London")
return outputs

def generator_function_no_yield(context):
outputs = []

Expand Down Expand Up @@ -150,11 +161,11 @@ def add_hello_action(state: OrchestratorState, input_: str):
state.actions.append([action])

def add_hello_completed_events(
context_builder: ContextBuilder, id_: int, result: str):
context_builder: ContextBuilder, id_: int, result: str, is_played=False):
context_builder.add_task_scheduled_event(name='Hello', id_=id_)
context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()
context_builder.add_task_completed_event(id_=id_, result=result)
context_builder.add_task_completed_event(id_=id_, result=result, is_played=is_played)


def add_hello_failed_events(
Expand Down Expand Up @@ -286,6 +297,26 @@ def test_tokyo_and_seattle_and_london_state():
assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_sequential_is_replaying():
context_builder = ContextBuilder('test_simple_function', is_replaying=True)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"", True)
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"", True)
add_hello_completed_events(context_builder, 2, "\"Hello London!\"", True)

result = get_orchestration_state_result(
context_builder, generator_function_is_replaying)

expected_state = base_expected_state(
[True, True, True])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_sequential_orchestration_no_yield():
context_builder = ContextBuilder('test_simple_function')
add_hello_completed_events(context_builder, 0, "\"Hello London!\"")
Expand Down
12 changes: 6 additions & 6 deletions tests/test_utils/ContextBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


class ContextBuilder:
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, replay_schema: ReplaySchema = ReplaySchema.V1):
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, is_replaying=False, replay_schema: ReplaySchema = ReplaySchema.V1):
self.increase_time = increase_time
self.instance_id = uuid.uuid4()
self.is_replaying: bool = False
Expand All @@ -28,7 +28,7 @@ def __init__(self, name: str="", increase_time: bool = True, starting_time: Opti
self.upperSchemaVersion = replay_schema.value

self.add_orchestrator_started_event()
self.add_execution_started_event(name)
self.add_execution_started_event(name, is_played=is_replaying)

def get_base_event(
self, event_type: HistoryEventType, id_: int = -1,
Expand Down Expand Up @@ -87,8 +87,8 @@ def add_task_scheduled_event(
event.Input_ = input_
self.history_events.append(event)

def add_task_completed_event(self, id_: int, result):
event = self.get_base_event(HistoryEventType.TASK_COMPLETED)
def add_task_completed_event(self, id_: int, result, is_played=False):
event = self.get_base_event(HistoryEventType.TASK_COMPLETED, is_played=is_played)
event.Result = result
event.TaskScheduledId = id_
self.history_events.append(event)
Expand Down Expand Up @@ -116,8 +116,8 @@ def add_timer_fired_event(self, id_: int, fire_at: str, is_played: bool = True):
self.history_events.append(event)

def add_execution_started_event(
self, name: str, version: str = '', input_=None):
event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=True)
self, name: str, version: str = '', input_=None, is_played=True):
event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=is_played)
event.orchestration_instance = OrchestrationInstance()
self.instance_id = event.orchestration_instance.instance_id
event.Name = name
Expand Down