Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement/flexible task iteration #352

Merged
merged 11 commits into from
Sep 20, 2023
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/serializer/migration/version_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime, timedelta

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.specs.event_definitions.timer import LOCALTZ

from .exceptions import VersionMigrationError
Expand Down
19 changes: 9 additions & 10 deletions SpiffWorkflow/bpmn/specs/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.specs.StartTask import StartTask
from SpiffWorkflow.specs.Join import Join

Expand All @@ -40,15 +40,15 @@ def _predict_hook(self, my_task):
# Events attached to the main task might occur
my_task._sync_children(self.outputs, state=TaskState.MAYBE)
# The main child's state is based on this task's state
state = TaskState.FUTURE if my_task._is_definite() else my_task.state
state = TaskState.FUTURE if my_task.has_state(TaskState.DEFINITE_MASK) else my_task.state
for child in my_task.children:
if not isinstance(child.task_spec, BoundaryEvent):
child._set_state(state)

def _update_hook(self, my_task):
super()._update_hook(my_task)
for task in my_task.children:
if isinstance(task.task_spec, BoundaryEvent) and task._is_predicted():
if isinstance(task.task_spec, BoundaryEvent) and task.has_state(TaskState.PREDICTED_MASK):
task._set_state(TaskState.WAITING)
task.task_spec._predict(task)
return True
Expand All @@ -61,9 +61,8 @@ def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
# Retrieve a list of all activated tasks from the associated
# task that did the conditional parallel split.
split_task = my_task._find_ancestor_from_name(self.split_task)
# Retrieve a list of all activated tasks from the associated task that did the conditional parallel split.
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self)

Expand All @@ -79,8 +78,8 @@ def _check_threshold_structured(self, my_task, force=False):
if main is None:
raise WorkflowException(f'No main task found', task_spec=self)

interrupt = any([t._has_state(TaskState.READY|TaskState.COMPLETED) for t in interrupting])
finished = main._is_finished() or interrupt
interrupt = any([t.has_state(TaskState.READY|TaskState.COMPLETED) for t in interrupting])
finished = main.has_state(TaskState.FINISHED_MASK) or interrupt
if finished:
cancel = [t for t in interrupting + noninterrupting if t.state == TaskState.WAITING]
if interrupt:
Expand All @@ -96,7 +95,7 @@ def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all ready and waiting tasks (excluding
# ourself). The EndJoin waits for everyone!
waiting_tasks = []
for task in my_task.workflow.get_tasks(TaskState.READY | TaskState.WAITING):
for task in my_task.workflow.get_tasks(task_filter=TaskFilter(state=TaskState.READY|TaskState.WAITING)):
if task.thread_id != my_task.thread_id:
continue
if task.task_spec == my_task.task_spec:
Expand All @@ -108,4 +107,4 @@ def _check_threshold_unstructured(self, my_task, force=False):
def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)
my_task.workflow.data.update(my_task.data)
return result
return result
4 changes: 2 additions & 2 deletions SpiffWorkflow/bpmn/specs/mixins/events/end_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.bpmn.specs.event_definitions.simple import TerminateEventDefinition, CancelEventDefinition
from .event_types import ThrowingEvent

Expand Down Expand Up @@ -49,7 +49,7 @@ def _on_complete_hook(self, my_task):

# We are finished. Set the workflow data and cancel all tasks
my_task.workflow.set_data(**my_task.data)
for task in my_task.workflow.get_tasks(TaskState.NOT_FINISHED_MASK):
for task in my_task.workflow.get_tasks(task_filter=TaskFilter(state=TaskState.NOT_FINISHED_MASK)):
task.cancel()

elif isinstance(self.event_definition, CancelEventDefinition):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
import time
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec

from SpiffWorkflow.bpmn.specs.event_definitions.simple import NoneEventDefinition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from .event_types import ThrowingEvent, CatchingEvent


Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/start_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from .event_types import CatchingEvent


Expand Down
9 changes: 5 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.specs.MultiChoice import MultiChoice
from .unstructured_join import UnstructuredJoin

Expand Down Expand Up @@ -70,7 +70,7 @@ def test(self):

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all places where this task is used.
tasks = my_task.workflow.get_tasks_from_spec_name(self.name)
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))

# Look up which tasks have parents completed.
completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ])
Expand All @@ -80,7 +80,7 @@ def _check_threshold_unstructured(self, my_task, force=False):
# A spec only has to complete once, even if on multiple paths
waiting_tasks = []
for task in tasks:
if task.parent._has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
waiting_tasks.append(task.parent)

if force:
Expand All @@ -92,7 +92,8 @@ def _check_threshold_unstructured(self, my_task, force=False):
else:
# Handle the case where there are paths from active tasks that must go through waiting inputs
waiting_inputs = [i for i in self.inputs if i not in completed_inputs]
sources = [t.task_spec for t in my_task.workflow.get_tasks(TaskState.READY | TaskState.WAITING)]
task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING)
sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)]

# This will go back through a task spec's ancestors and return the source, if applicable
def check(spec):
Expand Down
9 changes: 5 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from copy import deepcopy
from collections.abc import Iterable, Sequence, Mapping, MutableSequence, MutableMapping

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge
from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException
Expand All @@ -36,10 +37,10 @@ def process_children(self, my_task):
merged = self._merged_children(my_task)
child_running = False
for child in self._instances(my_task):
if child._has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
if child.has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
self.child_completed_action(my_task, child)
merged.append(str(child.id))
elif not child._has_state(TaskState.FINISHED_MASK):
elif not child.has_state(TaskState.FINISHED_MASK):
child_running = True
my_task.internal_data['merged'] = merged
return child_running
Expand Down Expand Up @@ -134,7 +135,7 @@ def task_info(self, my_task):
for task in self._instances(my_task):
key_or_index = task.internal_data.get('key_or_index')
value = task.internal_data.get('item') if key_or_index is None else key_or_index
if task._has_state(TaskState.FINISHED_MASK):
if task.has_state(TaskState.FINISHED_MASK):
info['completed'].append(value)
else:
info['running'].append(value)
Expand Down
8 changes: 4 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from .unstructured_join import UnstructuredJoin


Expand All @@ -43,7 +43,7 @@ class ParallelGateway(UnstructuredJoin):
"""
def _check_threshold_unstructured(self, my_task, force=False):

tasks = my_task.workflow.get_tasks_from_spec_name(self.name)
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
# Look up which tasks have parents completed.
waiting_tasks = []
waiting_inputs = set(self.inputs)
Expand All @@ -59,10 +59,10 @@ def remove_ancestor(task):
if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
# Do not wait for descendants of this task
elif task._is_descendant_of(my_task):
elif task.is_descendant_of(my_task):
remove_ancestor(task)
# Ignore predicted tasks; we don't care about anything not definite
elif task.parent._has_state(TaskState.DEFINITE_MASK):
elif task.parent.has_state(TaskState.DEFINITE_MASK):
waiting_tasks.append(task.parent)

return force or len(waiting_inputs) == 0, waiting_tasks
22 changes: 11 additions & 11 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from copy import deepcopy

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException

Expand Down Expand Up @@ -64,8 +64,8 @@ def copy_data(self, my_task, subworkflow):
# But our data management is already hopelessly messed up and in dire needs of reconsideration
if len(subworkflow.spec.data_objects) > 0:
subworkflow.data = my_task.workflow.data
start = subworkflow.get_tasks_from_spec_name('Start')
start[0].set_data(**my_task.data)
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)

def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)
Expand All @@ -77,8 +77,8 @@ def create_workflow(self, my_task):
def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
self.copy_data(my_task, subworkflow)
for child in subworkflow.task_tree.children:
child.task_spec._update(child)
start = subworkflow.get_next_task(spec_name='Start')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so so so much cleaner. What an improvement!

start.run()
my_task._set_state(TaskState.WAITING)


Expand All @@ -89,10 +89,10 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, **kwargs):

def copy_data(self, my_task, subworkflow):

start = subworkflow.get_tasks_from_spec_name('Start')
start = subworkflow.get_next_task(spec_name='Start')
if subworkflow.spec.io_specification is None or len(subworkflow.spec.io_specification.data_inputs) == 0:
# Copy all task data into start task if no inputs specified
start[0].set_data(**my_task.data)
start.set_data(**my_task.data)
else:
# Otherwise copy only task data with the specified names
for var in subworkflow.spec.io_specification.data_inputs:
Expand All @@ -102,24 +102,24 @@ def copy_data(self, my_task, subworkflow):
task=my_task,
data_input=var,
)
start[0].data[var.bpmn_id] = my_task.data[var.bpmn_id]
start.data[var.bpmn_id] = my_task.data[var.bpmn_id]

def update_data(self, my_task, subworkflow):

if subworkflow.spec.io_specification is None or len(subworkflow.spec.io_specification.data_outputs) == 0:
# Copy all workflow data if no outputs are specified
my_task.data = deepcopy(subworkflow.last_task.data)
else:
end = subworkflow.get_tasks_from_spec_name('End')
end = subworkflow.get_next_task(spec_name='End')
# Otherwise only copy data with the specified names
for var in subworkflow.spec.io_specification.data_outputs:
if var.bpmn_id not in end[0].data:
if var.bpmn_id not in end.data:
raise WorkflowDataException(
"The Data Output was not available in the subprocess output.",
task=my_task,
data_output=var,
)
my_task.data[var.bpmn_id] = end[0].data[var.bpmn_id]
my_task.data[var.bpmn_id] = end.data[var.bpmn_id]


class TransactionSubprocess(SubWorkflowTask):
Expand Down
8 changes: 4 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskIterator
from SpiffWorkflow.specs.Join import Join


Expand All @@ -36,14 +36,14 @@ def _do_join(self, my_task):
# to build the task tree underneath the most recently changed task.
last_changed = None
thread_tasks = []
for task in split_task._find_any(self):
for task in TaskIterator(split_task, spec_name=self.name):
if task.thread_id != my_task.thread_id:
# Ignore tasks from other threads. (Do we need this condition?)
continue
if not task.parent._is_finished():
if not task.parent.has_state(TaskState.FINISHED_MASK):
# For an inclusive join, this can happen - it's a future join
continue
if my_task._is_descendant_of(task):
if my_task.is_descendant_of(task):
# Skip ancestors (otherwise the branch this task is on will get dropped)
continue
# We have found a matching instance.
Expand Down
Loading