Python decorators for Workflow #635

Closed
Tracked by #7410
mukundansundar opened this issue Nov 7, 2023 · 9 comments

@mukundansundar
Contributor

Describe the proposal

The current Python SDK implementation of Workflow provides a set of functions for operating workflows in your code and long-running scenarios,
e.g. schedule_new_workflow(), get_workflow_state(), wait_for_workflow_start().

The proposal is to build on the strengths of the Python programming model and further reduce developer effort by introducing the decorator pattern, which adds features and functionality to existing functions without modifying the original structure of the code.

Proposed Solution

The Workflow lifecycle involves multiple steps, as shown below, and we have multiple functions for each of these steps.
image

Currently, in the Python workflow SDK, we define the workflow function and the activity functions, and then register them against the WorkflowRuntime instance.
For now, only one WorkflowRuntime instance is created to interact with the Dapr sidecar, as shown below.

wfr = WorkflowRuntime(host, port)

Given that WorkflowRuntime instance (wfr), we can register multiple workflow functions and multiple activity functions against it.

  • Decorators at Workflow level
    • @wfr.workflow.register(name) – Register the workflow with the Dapr sidecar. name is related to the GitHub issue.
    • @wfr.workflow.serializer(serializer)/@wfr.workflow.deserializer(deserializer) – Set the serialization/deserialization logic for workflow input/output.
    • @wfr.workflow.retry() – Resiliency, to apply retries on the Workflow.
      • The durabletask-python library has options to set a RetryPolicy for an activity. This would extend that to the workflow level and allow setting a default policy for any run of a particular workflow.
  • Decorators at Activity level
    • @wfr.activity.register(name) – Register an Activity with the workflow runtime. name is related to the GitHub issue.
    • @wfr.activity.serializer(serializer)/@wfr.activity.deserializer(deserializer) – Set the serialization/deserialization logic for activity input/output.
    • @wfr.activity.retry() – Resiliency, to apply retries on the Activity. Currently an activity can be called with a RetryPolicy, based on the code in the durabletask-python library. This decorator would set a default retry policy for the activity, used when no retry policy is specified at the call site.
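
To make the "default retry policy" idea above concrete, here is a rough sketch in plain Python. It is only an illustration: `with_default_retry` and `call_with_retry` are hypothetical names, not part of dapr/python-sdk or durabletask-python, and a real runtime would schedule retries durably instead of looping in process.

```python
from typing import Any, Callable, Optional


def with_default_retry(max_number_of_attempts: int) -> Callable:
    """Hypothetical decorator: attach a default retry budget to a function."""
    def decorator(fn: Callable) -> Callable:
        # Store the default as metadata; the function itself is unchanged.
        fn._default_max_attempts = max_number_of_attempts
        return fn
    return decorator


def call_with_retry(fn: Callable, arg: Any, max_attempts: Optional[int] = None) -> Any:
    """Hypothetical caller: a call-site value wins, else the decorator default."""
    attempts = max_attempts or getattr(fn, "_default_max_attempts", 1)
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return fn(arg)
        except Exception as err:  # retry on any failure in this sketch
            last_error = err
    raise last_error


calls = {"count": 0}


@with_default_retry(max_number_of_attempts=3)
def flaky(value: int) -> int:
    # Fails twice, then succeeds on the third attempt.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return value * 2


result = call_with_retry(flaky, 21)
```

The call site passes no retry setting, so the decorator's default of three attempts applies; passing `max_attempts` explicitly would override it.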

Example code with workflow and activity level decorators:

wfr = WorkflowRuntime(host, port)
counter = 0  # shared state updated by hello_act

@wfr.workflow.register(name="hello_world")
@wfr.workflow.serializer(CustomInputSerializer())
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

@wfr.activity.register(name="hello")
@wfr.activity.retry(max_number_of_attempts=3)
def hello_act(ctx: WorkflowActivityContext, input):
    global counter
    counter += input
    print(f'New counter value is: {counter}!', flush=True)

def main():
    wfr.start()
    # ... start workflow as normal

Potential changes needed to codebase

  • Addition of retry policy in dapr/python-sdk to mirror what is there in microsoft/durabletask-python library.
    • Workflow specific retries are not there in the library yet. If needed, that can be added.
  • Change WorkflowRuntime or add a new class to define the decorators stated above.
    • For the register_activity and register_workflow methods in the class, we can reuse the code that is currently there in WorkflowRuntime class and extend it to add a decorator for the same.
  • To set serializers and deserializers at the activity and workflow levels, changes are needed in the microsoft/durabletask-python library first, which then need to be propagated into the dapr/python-sdk workflow code.

If the WorkflowRuntime instance is going to be a singleton for the application, we can potentially do away with creating the instance explicitly and simplify the decorators to workflow.register(name) and activity.register(name), which then register against the singleton WorkflowRuntime instance created when the app starts.
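
As a rough sketch of how a decorator could reuse an existing register method, consider the following. `SketchRuntime` is a hypothetical stand-in that only records registrations; it is not the real WorkflowRuntime, and the method signatures here are assumptions made for illustration.

```python
from typing import Callable, Dict, Optional


class SketchRuntime:
    """Hypothetical stand-in for WorkflowRuntime; it only records registrations."""

    def __init__(self) -> None:
        self.workflows: Dict[str, Callable] = {}

    def register_workflow(self, fn: Callable, name: Optional[str] = None) -> None:
        # Imperative registration, in the style of the existing method.
        self.workflows[name or fn.__name__] = fn

    def workflow(self, fn: Optional[Callable] = None, *, name: Optional[str] = None):
        """Decorator form; supports both @rt.workflow and @rt.workflow(name=...)."""
        def decorator(func: Callable) -> Callable:
            self.register_workflow(func, name=name)
            return func  # hand back the original function unchanged

        if fn is not None:
            # Used as a bare decorator: @rt.workflow
            return decorator(fn)
        # Used with arguments: @rt.workflow(name="...")
        return decorator


rt = SketchRuntime()


@rt.workflow(name="hello_world")
def hello_world_wf(ctx, input):
    return input


@rt.workflow
def other_wf(ctx, input):
    return input
```

Because the decorator returns the original function, it can still be passed to calls such as ctx.call_activity or invoked directly in tests.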

@lucus-sun

+1. Please make sure we also have waitForInstanceCompletion, like the Java SDK.

@mukundansundar
Contributor Author

mukundansundar commented Dec 11, 2023

@berndverst @cgillum @DeepanshuA
I have a specific question about initializing the workflow runtime instance and registering workflows and activities against a particular runtime.
As of now, we always have a single workflow runtime instance, with a connection specified by host and port (which can default to the Dapr host and port).

I would like to modify the decorator

@wfr.workflow.register

To something like

@workflow.register(name=None, workflow_runtime=None)

or simply

@workflow(name=None, workflow_runtime=None)

The same applies to activity. Instead of

@wfr.activity.register

it would be either

@activity.register(name=None, workflow_runtime=None)

or

@activity(name=None, workflow_runtime=None)

The default workflow runtime instance would be created at startup, based on the Dapr host and port values from settings.py.

If needed, it can be overridden by specifying a different workflow_runtime value when the user creates a new one.

In turn, a default workflow runtime instance would be created whenever the workflow or activity decorators are used; only that instance would be used, and it could not be changed later, i.e. the WorkflowRuntime class becomes a singleton. If WorkflowRuntime is made a singleton, the other two methods of the class, start and shutdown, can become classmethods, and we should be able to call them directly on the class itself.
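
A minimal sketch of the "default runtime with optional override" behavior described above, using a hypothetical `SketchRuntime` stand-in. The names `get_default_runtime` and `activity`, and the localhost/50001 defaults, are assumptions for illustration rather than SDK API or values read from settings.py.

```python
from typing import Callable, Dict, Optional


class SketchRuntime:
    """Hypothetical stand-in for WorkflowRuntime; it only records registrations."""

    def __init__(self, host: str = "localhost", port: int = 50001) -> None:
        self.host = host
        self.port = port
        self.activities: Dict[str, Callable] = {}


_default_runtime: Optional[SketchRuntime] = None


def get_default_runtime() -> SketchRuntime:
    """Create the shared default runtime on first use, then reuse it."""
    global _default_runtime
    if _default_runtime is None:
        _default_runtime = SketchRuntime()
    return _default_runtime


def activity(name: Optional[str] = None,
             workflow_runtime: Optional[SketchRuntime] = None) -> Callable:
    """Register against the given runtime, or the default one if none is given."""
    def decorator(fn: Callable) -> Callable:
        runtime = workflow_runtime if workflow_runtime is not None else get_default_runtime()
        runtime.activities[name or fn.__name__] = fn
        return fn
    return decorator


@activity(name="hello")
def hello_act(ctx, input):
    return input


custom_runtime = SketchRuntime(host="example-host", port=4001)


@activity(name="bye", workflow_runtime=custom_runtime)
def bye_act(ctx, input):
    return input
```

Note that the default instance here is created lazily by the first decorator that needs it, which is one possible reading of "created at the start".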

I would like your thoughts on this change.

After change:

# default workflow_runtime is used
counter = 0  # shared state updated by hello_act

@workflow.register(name="hello_world")
@workflow.serializer(CustomInputSerializer())
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

@activity.register(name="hello")
@activity.retry(max_number_of_attempts=3)
def hello_act(ctx: WorkflowActivityContext, input):
    global counter
    counter += input
    print(f'New counter value is: {counter}!', flush=True)

def main():
    WorkflowRuntime.start()
    # ... start workflow as normal
    WorkflowRuntime.shutdown()

@mukundansundar
Contributor Author

There is initial draft code in #651. The pending work is adding tests and a TODO for creating a default init for WorkflowRuntime.

@cgillum
Contributor

cgillum commented Dec 12, 2023

Thanks @mukundansundar. Of the options you mentioned, I'd actually prefer the more explicit approach which doesn't include a default WorkflowRuntime singleton. My reasoning is the following:

  • The explicit approach makes the code more self-explanatory.
  • Default singletons may add some "magic" that makes it harder for the average user to understand what's going on.
  • The explicit approach is similar to existing frameworks, like Flask and FastAPI. This familiarity will help users understand it more quickly.

That said, I do value simplicity, and one simplification I'd make to your initial proposal is to combine the multiple decorators into just one. I also wonder if the user should be presented with WorkflowApp instead of WorkflowRuntime as the root concept.

wfapp = WorkflowApp(host, port)

@wfapp.workflow(name="hello_world", serializer=CustomInputSerializer)
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

activity_retry_policy = ...

@wfapp.activity(name="hello", retry_policy=activity_retry_policy)
def hello_act(ctx: WorkflowActivityContext, input):
    print(f'Activity input: {input}', flush=True)

def main():
    wfapp.start()
    # ... start workflow as normal

Let me know what you think.

@mukundansundar
Contributor Author

mukundansundar commented Dec 13, 2023

I think having the WorkflowRuntime(WorkflowApp) initialized explicitly does add more clarity to the flow.

For the renaming part, my one concern is that we have wfapp.start(). Could that give the incorrect impression that both the workflow app and the workflow itself have started?

With workflow_runtime.start(), it is clear that it's the runtime you are starting and not the app/workflow.

@cgillum Can the runtime be started and stopped multiple times in a given application?
If so, can we consider prioritizing the with statement and using that instead of start and shutdown?
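
For reference, a rough sketch of what a with-statement form could look like. `SketchRuntime` is a hypothetical stand-in, not the real WorkflowRuntime, and whether the real runtime tolerates repeated start/shutdown cycles is exactly the open question above.

```python
from typing import List


class SketchRuntime:
    """Hypothetical stand-in that records start/shutdown calls."""

    def __init__(self) -> None:
        self.running = False
        self.events: List[str] = []

    def start(self) -> None:
        self.running = True
        self.events.append("start")

    def shutdown(self) -> None:
        self.running = False
        self.events.append("shutdown")

    def __enter__(self) -> "SketchRuntime":
        # Entering the block starts the runtime.
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Leaving the block always shuts down, even if an exception was raised.
        self.shutdown()
        return False  # do not swallow exceptions


rt = SketchRuntime()
with rt as running_rt:
    was_running = running_rt.running
```

The main benefit over paired start()/shutdown() calls is that shutdown cannot be forgotten on an error path.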

@bkcsfi

bkcsfi commented Dec 21, 2023

A concern I have with the wfapp sample in #635 (comment) is that it suggests that the module that attaches functions to the wfapp is the same module that runs the wfapp, which is not the case for my application.

As I mentioned in #651 I have multiple modules with activities and workflows, but only one workflow runtime that will be instantiated at the top-most level.

I think in this example, WorkflowApp has the same issues as WorkflowRuntime (because in this example WorkflowApp also takes host and port args).

Would you consider the FastAPI approach, where FastAPI() can be instantiated as a lightweight container to which functions get attached?

Then a higher-level module can import the 'app' from each child module and mount them to the top-most FastAPI() instance, which is the one that actually does the work.

What if WorkflowApp were only a container for registered workflows and activities, and didn't create a GrpcEndpoint or a TaskHubGrpcWorker until the start() method were called?

# module1.py
wfapp = WorkflowApp() # a lightweight container

@wfapp.workflow(name="hello_world", serializer=CustomInputSerializer)
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
...

Then, optionally in a higher-level module, multiple WorkflowApp instances can be combined:

# main.py
from module1 import wfapp as module1_wfapp
from module2 import wfapp as module2_wfapp

wfapp = WorkflowApp() # a lightweight container
wfapp.attach(module1_wfapp)
wfapp.attach(module2_wfapp)

def main():
    # instantiates WorkflowRuntime, registers workflows and activities, and calls wf_runtime.start()
    wf_runtime = wfapp.start(host, port)
    # do stuff
    wf_runtime.shutdown()
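
A self-contained sketch of the container idea, to show that registration and connection can be separated. `SketchWorkflowApp` and `SketchRuntime` are hypothetical stand-ins; the host and port values are placeholders, and nothing here talks to a Dapr sidecar.

```python
from typing import Callable, Dict, Optional


class SketchRuntime:
    """Hypothetical stand-in for WorkflowRuntime; holds a connection target."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self.workflows: Dict[str, Callable] = {}
        self.started = False

    def start(self) -> None:
        self.started = True

    def shutdown(self) -> None:
        self.started = False


class SketchWorkflowApp:
    """Lightweight container: collects functions, opens no connection."""

    def __init__(self) -> None:
        self.workflows: Dict[str, Callable] = {}

    def workflow(self, name: Optional[str] = None) -> Callable:
        def decorator(fn: Callable) -> Callable:
            self.workflows[name or fn.__name__] = fn
            return fn
        return decorator

    def attach(self, other: "SketchWorkflowApp") -> None:
        # Merge registrations collected by a child module's container.
        self.workflows.update(other.workflows)

    def start(self, host: str, port: int) -> SketchRuntime:
        # Only at this point is a runtime created and populated.
        runtime = SketchRuntime(host, port)
        runtime.workflows.update(self.workflows)
        runtime.start()
        return runtime


# Stands in for a child module's container (module1.py above).
module1_wfapp = SketchWorkflowApp()


@module1_wfapp.workflow(name="hello_world")
def hello_world_wf(ctx, input):
    return input


# Stands in for the top-level module (main.py above).
wfapp = SketchWorkflowApp()
wfapp.attach(module1_wfapp)
wf_runtime = wfapp.start("localhost", 50001)
```

One thing this sketch leaves open is name collisions: attach() here lets a later container silently overwrite an earlier registration with the same name.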

@bkcsfi

bkcsfi commented Dec 21, 2023

Alternatively, if workflow_runtime is also going to be a context manager, then maybe this approach?

# main.py
from module1 import wfapp as module1_wfapp
from module2 import wfapp as module2_wfapp

wfapp = WorkflowApp() # a lightweight container
wfapp.attach(module1_wfapp)
wfapp.attach(module2_wfapp)

def main():
    # instantiates WorkflowRuntime and registers workflows and activities
    with wfapp.create_runtime(host, port) as wf_runtime:
        wf_runtime.start()
        # do other stuff or sleep or whatever

shutdown() is called automatically when the wf_runtime context exits.

Alternatively, create_runtime() can call wf_runtime.start() itself, in which case wfapp.start() makes more sense, as in the first example.

@stuartleeks

I've been experimenting with decorators for Dapr Workflow for a recent project and have just been pointed to this issue.

I have created a repo with the decorator code extracted if you're interested: https://github.com/stuartleeks/dapr-workflow-exploration-python

@mukundansundar
Contributor Author

Closing this issue since the initial set of decorators is implemented in #651; any further enhancements will be tracked as separate issues.

@mukundansundar mukundansundar self-assigned this Feb 8, 2024
@berndverst berndverst modified the milestones: v1.13, workflow-0.4.0 Feb 13, 2024