forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example.py
116 lines (98 loc) · 4.7 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
import uuid
from temporalio.client import BuildIdOpAddNewCompatible, BuildIdOpAddNewDefault, Client
from temporalio.worker import Worker
from worker_versioning.activities import greet, super_greet
from worker_versioning.workflow_v1 import MyWorkflow as MyWorkflowV1
from worker_versioning.workflow_v1_1 import MyWorkflow as MyWorkflowV1_1
from worker_versioning.workflow_v2 import MyWorkflow as MyWorkflowV2
async def main():
client = await Client.connect("localhost:7233")
task_queue = f"worker-versioning-{uuid.uuid4()}"
# Start a 1.0 worker
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV1],
activities=[greet, super_greet],
build_id="1.0",
use_worker_versioning=True,
):
# Add 1.0 as the default version for the queue
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewDefault("1.0")
)
# Start a workflow which will run on the 1.0 worker
handle = await client.start_workflow(
MyWorkflowV1.run,
task_queue=task_queue,
id=f"worker-versioning-v1-{uuid.uuid4()}",
)
# Signal the workflow to proceed
await handle.signal(MyWorkflowV1.proceeder, "go")
# Give a chance for the worker to process the signal
# TODO Better?
await asyncio.sleep(1)
# Add 1.1 as the default version for the queue, compatible with 1.0
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewCompatible("1.1", "1.0")
)
# Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the
# 1.0 worker may continue to process tasks briefly after we make 1.1 the new default.
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV1_1],
activities=[greet, super_greet],
build_id="1.1",
use_worker_versioning=True,
):
# Continue driving the workflow. Take note that the new version of the workflow run by the 1.1
# worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is
# processing a task as the version update happens. That's normal.
await handle.signal(MyWorkflowV1.proceeder, "go")
# Add a new *incompatible* version to the task queue, which will become the new overall default for the queue.
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewDefault("2.0")
)
# Start a 2.0 worker
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV2],
activities=[greet, super_greet],
build_id="2.0",
use_worker_versioning=True,
):
# Start a new workflow. Note that it will run on the new 2.0 version, without the client invocation changing
# at all! Note here we can use `MyWorkflowV1.run` because the signature of the workflow has not changed.
handle2 = await client.start_workflow(
MyWorkflowV1.run,
task_queue=task_queue,
id=f"worker-versioning-v2-{uuid.uuid4()}",
)
# Drive both workflows once more before concluding them. The first workflow will continue running on the 1.1
# worker.
await handle.signal(MyWorkflowV1.proceeder, "go")
await handle2.signal(MyWorkflowV1.proceeder, "go")
await handle.signal(MyWorkflowV1.proceeder, "finish")
await handle2.signal(MyWorkflowV1.proceeder, "finish")
# Wait for both workflows to complete
await handle.result()
await handle2.result()
# Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are ready to be
# retired. There's more information in the documentation, but here's a quick example that shows us how to
# tell when the 1.0 worker can be retired:
# There is a 5 minute buffer before we will consider IDs no longer reachable by new workflows, to
# account for replication in multi-cluster setups. Uncomment the following line to wait long enough to see
# the 1.0 worker become unreachable.
# await asyncio.sleep(60 * 5)
reachability = await client.get_worker_task_reachability(
build_ids=["2.0", "1.0", "1.1"]
)
if not reachability.build_id_reachability["1.0"].task_queue_reachability[
task_queue
]:
print("1.0 is ready to be retired!")
if __name__ == "__main__":
asyncio.run(main())