diff --git a/azure/durable_functions/models/TaskOrchestrationExecutor.py b/azure/durable_functions/models/TaskOrchestrationExecutor.py index 11ea74bb..1033b016 100644 --- a/azure/durable_functions/models/TaskOrchestrationExecutor.py +++ b/azure/durable_functions/models/TaskOrchestrationExecutor.py @@ -68,6 +68,22 @@ def execute(self, context: DurableOrchestrationContext, self.context = context evaluated_user_code = fn(context) + # The minimum History size is 2, in the shape: [OrchestratorStarted, ExecutionStarted]. + # At the start of replay, the `is_replaying` flag is determined from the + # ExecutionStarted event. + # For some reason, OrchestratorStarted does not update its `isPlayed` field. + if len(history) < 2: + err_message = "Internal Durable Functions error: "\ + + f"received History array of size {len(history)} "\ + + "when a minimum size of 2 is expected. "\ + + "Please report this issue at "\ + + "https://github.com/Azure/azure-functions-durable-python/issues." + raise Exception(err_message) + + # Set initial is_replaing state. + execution_started_event = history[1] + self.current_task.is_played = execution_started_event.is_played + # If user code is a generator, then it uses `yield` statements (the DF API) # and so we iterate through the DF history, generating tasks and populating # them with values when the history provides them diff --git a/tests/orchestrator/test_is_replaying_flag.py b/tests/orchestrator/test_is_replaying_flag.py index e3e1023d..141db672 100644 --- a/tests/orchestrator/test_is_replaying_flag.py +++ b/tests/orchestrator/test_is_replaying_flag.py @@ -47,7 +47,7 @@ def add_timer_action(state: OrchestratorState, fire_at: datetime): def test_is_replaying_initial_value(): - context_builder = ContextBuilder("") + context_builder = ContextBuilder("", is_replaying=False) result = get_orchestration_property( context_builder, generator_function, "durable_context") diff --git a/tests/orchestrator/test_sequential_orchestrator.py b/tests/orchestrator/test_sequential_orchestrator.py index c55e9fbb..42fbf65c 100644 --- a/tests/orchestrator/test_sequential_orchestrator.py +++ b/tests/orchestrator/test_sequential_orchestrator.py @@ -24,6 +24,17 @@ def generator_function(context): return outputs +def generator_function_is_replaying(context): + outputs = [] + + outputs.append(context.is_replaying) + yield context.call_activity("Hello", "Tokyo") + outputs.append(context.is_replaying) + yield context.call_activity("Hello", "Seattle") + outputs.append(context.is_replaying) + yield context.call_activity("Hello", "London") + return outputs + def generator_function_no_yield(context): outputs = [] @@ -150,11 +161,11 @@ def add_hello_action(state: OrchestratorState, input_: str): state.actions.append([action]) def add_hello_completed_events( - context_builder: ContextBuilder, id_: int, result: str): + context_builder: ContextBuilder, id_: int, result: str, is_played=False): context_builder.add_task_scheduled_event(name='Hello', id_=id_) context_builder.add_orchestrator_completed_event() context_builder.add_orchestrator_started_event() - context_builder.add_task_completed_event(id_=id_, result=result) + context_builder.add_task_completed_event(id_=id_, result=result, is_played=is_played) def add_hello_failed_events( @@ -286,6 +297,26 @@ def test_tokyo_and_seattle_and_london_state(): assert_valid_schema(result) assert_orchestration_state_equals(expected, result) +def test_sequential_is_replaying(): + context_builder = ContextBuilder('test_simple_function', is_replaying=True) + add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"", True) + add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"", True) + add_hello_completed_events(context_builder, 2, "\"Hello London!\"", True) + + result = get_orchestration_state_result( + context_builder, generator_function_is_replaying) + + expected_state = base_expected_state( + [True, True, True]) + add_hello_action(expected_state, 'Tokyo') + add_hello_action(expected_state, 'Seattle') + add_hello_action(expected_state, 'London') + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + def test_sequential_orchestration_no_yield(): context_builder = ContextBuilder('test_simple_function') add_hello_completed_events(context_builder, 0, "\"Hello London!\"") diff --git a/tests/test_utils/ContextBuilder.py b/tests/test_utils/ContextBuilder.py index 4c4fae00..cee972f2 100644 --- a/tests/test_utils/ContextBuilder.py +++ b/tests/test_utils/ContextBuilder.py @@ -14,7 +14,7 @@ class ContextBuilder: - def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, replay_schema: ReplaySchema = ReplaySchema.V1): + def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, is_replaying=False, replay_schema: ReplaySchema = ReplaySchema.V1): self.increase_time = increase_time self.instance_id = uuid.uuid4() self.is_replaying: bool = False @@ -28,7 +28,7 @@ def __init__(self, name: str="", increase_time: bool = True, starting_time: Opti self.upperSchemaVersion = replay_schema.value self.add_orchestrator_started_event() - self.add_execution_started_event(name) + self.add_execution_started_event(name, is_played=is_replaying) def get_base_event( self, event_type: HistoryEventType, id_: int = -1, @@ -87,8 +87,8 @@ def add_task_scheduled_event( event.Input_ = input_ self.history_events.append(event) - def add_task_completed_event(self, id_: int, result): - event = self.get_base_event(HistoryEventType.TASK_COMPLETED) + def add_task_completed_event(self, id_: int, result, is_played=False): + event = self.get_base_event(HistoryEventType.TASK_COMPLETED, is_played=is_played) event.Result = result event.TaskScheduledId = id_ self.history_events.append(event) @@ -116,8 +116,8 @@ def add_timer_fired_event(self, id_: int, fire_at: str, is_played: bool = True): self.history_events.append(event) def add_execution_started_event( - self, name: str, version: str = '', input_=None): - event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=True) + self, name: str, version: str = '', input_=None, is_played=True): + event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=is_played) event.orchestration_instance = OrchestrationInstance() self.instance_id = event.orchestration_instance.instance_id event.Name = name