Skip to content

Commit

Permalink
use last_state_change to figure out if a cancelled task needs an even…
Browse files Browse the repository at this point in the history
…t in the task service instead of with get_tasks and check when manually completing tasks w/ burnettk (#595)

Co-authored-by: jasquat <[email protected]>
  • Loading branch information
jasquat and jasquat authored Oct 26, 2023
1 parent 056da10 commit 71b0ff1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,8 @@ def complete_task(self, spiff_task: SpiffTask, human_task: HumanTaskModel, user:
f"Cannot find a task with guid {self.process_instance_model.id} and task_id is {human_task.task_id}"
)

task_model.start_in_seconds = time.time()
run_started_at = time.time()
task_model.start_in_seconds = run_started_at
task_exception = None
task_event = ProcessInstanceEventType.task_completed.value
try:
Expand All @@ -1637,6 +1638,7 @@ def complete_task(self, spiff_task: SpiffTask, human_task: HumanTaskModel, user:
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
run_started_at=run_started_at,
)
task_service.update_task_model(task_model, spiff_task)
JsonDataModel.insert_or_update_json_data_records(task_service.json_data_dicts)
Expand All @@ -1657,7 +1659,13 @@ def complete_task(self, spiff_task: SpiffTask, human_task: HumanTaskModel, user:
spiff_task_to_process = spiff_task
if spiff_task_to_process.triggered is True:
spiff_task_to_process = spiff_task.parent
task_service.process_parents_and_children_and_save_to_database(spiff_task_to_process)

tasks_to_update = self.bpmn_process_instance.get_tasks(updated_ts=run_started_at)
for spiff_task_to_update in tasks_to_update:
if spiff_task_to_update.id != spiff_task.id:
task_service.update_task_model_with_spiff_task(spiff_task_to_update)

task_service.save_objects_to_database()

# this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(
process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer,
bpmn_definition_to_task_definitions_mappings: dict,
run_started_at: float | None = None,
) -> None:
self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
Expand All @@ -117,6 +118,8 @@ def __init__(
self.json_data_dicts: dict[str, JsonDataDict] = {}
self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}

self.run_started_at: float | None = run_started_at

def save_objects_to_database(self, save_process_instance_events: bool = True) -> None:
db.session.bulk_save_objects(self.bpmn_processes.values())
db.session.bulk_save_objects(self.task_models.values())
Expand Down Expand Up @@ -208,8 +211,11 @@ def update_task_model_with_spiff_task(
task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
task_model.end_in_seconds = start_and_end_times["end_in_seconds"]

# let failed tasks raise and we will log the event then
if task_model.state in ["COMPLETED", "CANCELLED"]:
# let failed tasks raise and we will log the event then.
# avoid creating events for the same state transition multiple times to avoid multiple cancelled events
if task_model.state in ["COMPLETED", "CANCELLED"] and (
self.run_started_at is None or spiff_task.last_state_change >= self.run_started_at
):
event_type = ProcessInstanceEventType.task_completed.value
if task_model.state == "CANCELLED":
event_type = ProcessInstanceEventType.task_cancelled.value
Expand Down Expand Up @@ -438,7 +444,6 @@ def add_tasks_to_bpmn_process(
if spiff_task.has_state(TaskState.PREDICTED_MASK):
self.__class__.remove_spiff_task_from_parent(spiff_task, self.task_models)
continue

task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
task_model = self.__class__._create_task(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,11 @@ def __init__(
self.spiff_tasks_to_process: set[UUID] = set()
self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}

self.run_started_at = time.time()

self.task_service = TaskService(
process_instance=self.process_instance,
serializer=self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
run_started_at=time.time(),
)

def will_complete_task(self, spiff_task: SpiffTask) -> None:
Expand Down Expand Up @@ -252,6 +251,7 @@ def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
# 1ead87b4b496525df8cc0e27836c3e987d593dc0 if you are curious.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
state=TaskState.WAITING
| TaskState.CANCELLED
| TaskState.READY
| TaskState.MAYBE
| TaskState.LIKELY
Expand All @@ -261,21 +261,6 @@ def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
):
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)

# FIXME: this may have broken error boundary events getting cancelled.
# Getting all cancelled tasks to see if that fixes it
#
# only process cancelled tasks that were cancelled during this run
# NOTE: this could mean we do not add task models that we should be adding
# in which case we may have to remove the updated_ts filter here and
# instead just avoid creating the event in update_task_model_with_spiff_task
cancelled_spiff_tasks = bpmn_process_instance.get_tasks(
state=TaskState.CANCELLED # , updated_ts=self.run_started_at
)
for cancelled_spiff_task in cancelled_spiff_tasks:
self.task_service.update_task_model_with_spiff_task(
spiff_task=cancelled_spiff_task,
)

self.task_service.save_objects_to_database()

if self.secondary_engine_step_delegate:
Expand Down

0 comments on commit 71b0ff1

Please sign in to comment.