initial code for decorators and optional naming of workflows and activities

Signed-off-by: Mukundan Sundararajan <[email protected]>
mukundansundar committed Dec 19, 2023
1 parent 8344420 commit 2688f3c
Showing 12 changed files with 294 additions and 48 deletions.
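
At a glance, the examples move from explicit `register_workflow` / `register_activity` calls to decorator-based registration on a module-level `WorkflowRuntime`, with an optional `name=` argument. A minimal before/after sketch, pieced together from the example diffs below (host/port values are simply what the examples pass; this is not the full example code):

```python
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime("localhost", "50001")


# New style: the decorator registers the workflow; name= supplies the
# registered name (the commit title calls the naming optional).
@wfr.workflow(name="random_workflow")
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    result = yield ctx.call_activity(step1, input=wf_input)
    return result


# The bare decorator form (no name=) also appears in the examples.
@wfr.activity
def step1(ctx, activity_input):
    return activity_input + 1


# Old style, as removed in these diffs:
# workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
# workflowRuntime.register_workflow(task_chain_workflow)
# workflowRuntime.register_activity(step1)

if __name__ == '__main__':
    wfr.start()
    # ... schedule with wf.DaprWorkflowClient() and wait for completion ...
    wfr.shutdown()
```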
1 change: 1 addition & 0 deletions README.md
@@ -81,6 +81,7 @@ cd python-sdk
pip3 install -e .
pip3 install -e ./ext/dapr-ext-grpc/
pip3 install -e ./ext/dapr-ext-fastapi/
pip3 install -e ./ext/dapr-ext-workflow/
```

3. Install required packages
39 changes: 39 additions & 0 deletions examples/workflow/README.md
@@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line
### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
<!--STEP
name: Run the task chaining example
expected_stdout_lines:
- "== APP == Step 1: Received input: 42."
- "== APP == Step 2: Received input: 43."
- "== APP == Step 3: Received input: 86."
- "== APP == Workflow completed! Status: WorkflowStatus.COMPLETED"
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
```
<!--END_STEP-->

The output of this example should look like this:

@@ -41,9 +51,38 @@ The output of this example should look like this:

This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:

<!--STEP
name: Run the fan-out/fan-in example
match_order: none
expected_stdout_lines:
- "== APP == Processing work item: 1."
- "== APP == Processing work item: 2."
- "== APP == Processing work item: 3."
- "== APP == Processing work item: 4."
- "== APP == Processing work item: 5."
- "== APP == Processing work item: 6."
- "== APP == Processing work item: 7."
- "== APP == Processing work item: 8."
- "== APP == Processing work item: 9."
- "== APP == Processing work item: 10."
- "== APP == Work item 1 processed. Result: 2."
- "== APP == Work item 2 processed. Result: 4."
- "== APP == Work item 3 processed. Result: 6."
- "== APP == Work item 4 processed. Result: 8."
- "== APP == Work item 5 processed. Result: 10."
- "== APP == Work item 6 processed. Result: 12."
- "== APP == Work item 7 processed. Result: 14."
- "== APP == Work item 8 processed. Result: 16."
- "== APP == Work item 9 processed. Result: 18."
- "== APP == Work item 10 processed. Result: 20."
- "== APP == Final result: 110."
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
```
<!--END_STEP-->

The output of this sample should look like this:

18 changes: 8 additions & 10 deletions examples/workflow/fan_out_fan_in.py
@@ -14,7 +14,9 @@
from typing import List
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime("localhost", "50001")

@wfr.workflow(name="batch_processing")
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
@@ -27,30 +29,26 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
total = sum(outputs)
yield ctx.call_activity(process_results, input=total)


@wfr.activity(name="get_batch")
def get_work_batch(ctx, batch_size: int) -> List[int]:
return [i + 1 for i in range(batch_size)]


@wfr.activity
def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
result = work_item * 2
print(f'Work item {work_item} processed. Result: {result}.')
return result


@wfr.activity(name="final_process")
def process_results(ctx, final_result: int):
print(f'Final result: {final_result}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(batch_processing_workflow)
workflowRuntime.register_activity(get_work_batch)
workflowRuntime.register_activity(process_work_item)
workflowRuntime.register_activity(process_results)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
@@ -59,4 +57,4 @@ def process_results(ctx, final_result: int):
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 7 additions & 8 deletions examples/workflow/human_approval.py
@@ -18,6 +18,7 @@
from dapr.clients import DaprClient
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime("localhost", "50001")

@dataclass
class Order:
@@ -37,7 +38,7 @@ class Approval:
def from_dict(dict):
return Approval(**dict)


@wfr.workflow(name="purchase_order_wf")
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
@@ -59,10 +60,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
return f"Approved by '{approval_details.approver}'"


@wfr.activity(name="send_approval")
def send_approval_request(_, order: Order) -> None:
print(f'*** Requesting approval from user for order: {order}')


@wfr.activity
def place_order(_, order: Order) -> None:
print(f'*** Placing order: {order}')

@@ -76,12 +79,8 @@ def place_order(_, order: Order) -> None:
parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
args = parser.parse_args()

# configure and start the workflow runtime
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(purchase_order_workflow)
workflowRuntime.register_activity(send_approval_request)
workflowRuntime.register_activity(place_order)
workflowRuntime.start()
# start the workflow runtime
wfr.start()

# Start a purchase order workflow using the user input
order = Order(args.cost, "MyProduct", 1)
@@ -119,4 +118,4 @@ def prompt_for_approval():
except TimeoutError:
print("*** Workflow timed out!")

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 8 additions & 7 deletions examples/workflow/monitor.py
@@ -13,15 +13,17 @@
from dataclasses import dataclass
from datetime import timedelta
import random
from time import sleep
import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()
@dataclass
class JobStatus:
job_id: str
is_healthy: bool


@wfr.workflow(name="status_monitor")
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
# poll a status endpoint associated with this job
status = yield ctx.call_activity(check_status, input=job)
@@ -43,20 +45,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
ctx.continue_as_new(job)


@wfr.activity
def check_status(ctx, _) -> str:
return random.choice(["healthy", "unhealthy"])


@wfr.activity
def send_alert(ctx, message: str):
print(f'*** Alert: {message}')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime()
workflowRuntime.register_workflow(status_monitor_workflow)
workflowRuntime.register_activity(check_status)
workflowRuntime.register_activity(send_alert)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
job_id = "job1"
@@ -75,4 +76,4 @@ def send_alert(ctx, message: str):
print(f'Workflow already running. Instance ID: {job_id}')

input("Press Enter to stop...\n")
workflowRuntime.shutdown()
wfr.shutdown()
20 changes: 12 additions & 8 deletions examples/workflow/task_chaining.py
@@ -10,9 +10,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime("localhost", "50001")

@wfr.workflow(name="random_workflow")
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
@@ -24,37 +29,36 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
return [result1, result2, result3]


@wfr.activity(name="step10")
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2


@wfr.activity
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2


@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(task_chain_workflow)
workflowRuntime.register_activity(step1)
workflowRuntime.register_activity(step2)
workflowRuntime.register_activity(step3)
workflowRuntime.register_activity(error_handler)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
@@ -64,4 +68,4 @@ def error_handler(ctx, error):
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

workflowRuntime.shutdown()
wfr.shutdown()
ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
@@ -75,8 +75,13 @@ def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] =
Returns:
The ID of the scheduled workflow instance.
"""
if hasattr(workflow, '_registered_name'):
return self.__obj.schedule_new_orchestration(workflow.__dict__['_registered_name'],

input=input, instance_id=instance_id,
start_at=start_at)
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
instance_id=instance_id, start_at=start_at)
instance_id=instance_id,
start_at=start_at)

def get_workflow_state(self, instance_id: str, *,
fetch_payloads: bool = True) -> Optional[WorkflowState]:
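
The hunk above makes `schedule_new_workflow` prefer a decorator-supplied `_registered_name` over the function's `__name__`. Spelled out as a standalone helper (the helper name `_resolve_workflow_name` is hypothetical, purely to illustrate the fallback):

```python
def _resolve_workflow_name(workflow) -> str:
    # Decorated workflows carry _registered_name (set at registration time);
    # plain functions fall back to their __name__.
    if hasattr(workflow, '_registered_name'):
        return workflow.__dict__['_registered_name']
    return workflow.__name__


# e.g. schedule_new_orchestration(_resolve_workflow_name(workflow), input=input,
#                                 instance_id=instance_id, start_at=start_at)
```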
ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
@@ -53,6 +53,9 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
if hasattr(activity, '_registered_name'):
return self.__obj.call_activity(activity=activity.__dict__['_registered_name'],

input=input)
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(self, workflow: Workflow, *,
Expand All @@ -62,7 +65,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
daprWfContext = DaprWorkflowContext(ctx)
return workflow(daprWfContext, inp)
# copy workflow name so durabletask.worker can find the orchestrator in its registry
wf.__name__ = workflow.__name__

# Any workflow function using python decorator will have a _registered_name attribute
if hasattr(workflow, '_registered_name'):
wf.__name__ = workflow.__dict__['_registered_name']

else:
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
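
The decorators themselves (`WorkflowRuntime.workflow` / `WorkflowRuntime.activity`) are not part of this excerpt. For the `hasattr(..., '_registered_name')` checks above to work, they would need to do something along these lines — an illustrative sketch only, with the real registration wiring elided:

```python
from typing import Callable, Optional


class RuntimeDecoratorSketch:
    """Illustrative stand-in for the decorator side of WorkflowRuntime; only
    the _registered_name bookkeeping used by the hunks above is shown."""

    def workflow(self, name: Optional[str] = None):
        def wrapper(fn: Callable):
            # Tag the function with the name it was registered under so that
            # call_activity / call_child_workflow / schedule_new_workflow can
            # resolve it later instead of relying on fn.__name__.
            fn.__dict__['_registered_name'] = name or fn.__name__
            # (actual registration with the underlying durabletask worker elided)
            return fn
        return wrapper

    def activity(self, fn: Optional[Callable] = None, *, name: Optional[str] = None):
        # Supports both @wfr.activity and @wfr.activity(name="...") forms,
        # matching how the examples above use it.
        def wrapper(f: Callable):
            f.__dict__['_registered_name'] = name or f.__name__
            return f
        return wrapper(fn) if fn is not None else wrapper
```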