Skip to content

Commit

Permalink
Merge pull request #54 from wayfair-incubator/reprocess
Browse files Browse the repository at this point in the history
Allowing re-execution of tasks based on a reprocessing the message
  • Loading branch information
patkivikram authored Oct 30, 2023
2 parents 6710539 + 2f23f76 commit 5564bcf
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions dagger/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,21 @@ async def start(
ignore_status: bool = False,
) -> None:
# pre-execute
if self.status.code in [
TaskStatusEnum.COMPLETED.name,
TaskStatusEnum.SKIPPED.name,
]:
if (
self.status.code
in [
TaskStatusEnum.COMPLETED.name,
TaskStatusEnum.SKIPPED.name,
]
and not self.reprocess_on_message
):
return await self.on_complete(
status=self.status, workflow_instance=workflow_instance
)
if (
ignore_status or self.status.code == TaskStatusEnum.NOT_STARTED.name
ignore_status
or self.status.code == TaskStatusEnum.NOT_STARTED.name
or self.reprocess_on_message
) and workflow_instance:
self.status = TaskStatus(
code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
Expand Down Expand Up @@ -964,16 +970,21 @@ async def stop(
)

async def start(self, workflow_instance: Optional[ITemplateDAGInstance]) -> None:
if self.status.code in [
TaskStatusEnum.COMPLETED.name,
TaskStatusEnum.SKIPPED.name,
]:
if (
self.status.code
in [
TaskStatusEnum.COMPLETED.name,
TaskStatusEnum.SKIPPED.name,
]
and not self.reprocess_on_message
):
return await self.on_complete(
status=self.status, workflow_instance=workflow_instance
)
if (
self.status.code == TaskStatusEnum.NOT_STARTED.name
or self.status.code == TaskStatusEnum.SUBMITTED.name
or self.reprocess_on_message
) and workflow_instance:
await self.execute(
runtime_parameters=workflow_instance.runtime_parameters,
Expand Down

0 comments on commit 5564bcf

Please sign in to comment.