-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement actions waiter (#407)
Implement a simpler and more versatile waiting functions for actions. Most use cases when waiting for actions is to return early if an action fails. If all actions must be waited until completion, the users should use the `WaitForFunc` function. If the final actions objects are needed, the users should use the `WaitForFunc` function to store the final actions using the `handleUpdate` callback. This deprecates the `ActionClient.WatchOverallProgress` and `ActionClient.WatchProgress` methods.
- Loading branch information
Showing
8 changed files
with
459 additions
and
129 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package hcloud | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"maps" | ||
"slices" | ||
"time" | ||
) | ||
|
||
type ActionWaiter interface { | ||
WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error | ||
WaitFor(ctx context.Context, actions ...*Action) error | ||
} | ||
|
||
var _ ActionWaiter = (*ActionClient)(nil) | ||
|
||
// WaitForFunc waits until all actions are completed by polling the API at the interval | ||
// defined by [WithPollBackoffFunc]. An action is considered as complete when its status is | ||
// either [ActionStatusSuccess] or [ActionStatusError]. | ||
// | ||
// The handleUpdate callback is called every time an action is updated. | ||
func (c *ActionClient) WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error { | ||
running := make(map[int64]struct{}, len(actions)) | ||
for _, action := range actions { | ||
if action.Status == ActionStatusRunning { | ||
running[action.ID] = struct{}{} | ||
} else if handleUpdate != nil { | ||
// We filter out already completed actions from the API polling loop; while | ||
// this isn't a real update, the caller should be notified about the new | ||
// state. | ||
if err := handleUpdate(action); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
retries := 0 | ||
for { | ||
if len(running) == 0 { | ||
break | ||
} | ||
|
||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-time.After(c.action.client.pollBackoffFunc(retries)): | ||
retries++ | ||
} | ||
|
||
opts := ActionListOpts{ | ||
Sort: []string{"status", "id"}, | ||
ID: make([]int64, 0, len(running)), | ||
} | ||
for actionID := range running { | ||
opts.ID = append(opts.ID, actionID) | ||
} | ||
slices.Sort(opts.ID) | ||
|
||
updates, err := c.AllWithOpts(ctx, opts) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(updates) != len(running) { | ||
// Some actions may not exist in the API, also fail early to prevent an | ||
// infinite loop when updates == 0. | ||
|
||
notFound := maps.Clone(running) | ||
for _, update := range updates { | ||
delete(notFound, update.ID) | ||
} | ||
notFoundIDs := make([]int64, 0, len(notFound)) | ||
for unknownID := range notFound { | ||
notFoundIDs = append(notFoundIDs, unknownID) | ||
} | ||
|
||
return fmt.Errorf("actions not found: %v", notFoundIDs) | ||
} | ||
|
||
for _, update := range updates { | ||
if update.Status != ActionStatusRunning { | ||
delete(running, update.ID) | ||
} | ||
|
||
if handleUpdate != nil { | ||
if err := handleUpdate(update); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// WaitFor waits until all actions succeed by polling the API at the interval defined by | ||
// [WithPollBackoffFunc]. An action is considered as succeeded when its status is either | ||
// [ActionStatusSuccess]. | ||
// | ||
// If a single action fails, the function will stop waiting and the error set in the | ||
// action will be returned as an [ActionError]. | ||
// | ||
// For more flexibility, see the [WaitForFunc] function. | ||
func (c *ActionClient) WaitFor(ctx context.Context, actions ...*Action) error { | ||
return c.WaitForFunc( | ||
ctx, | ||
func(update *Action) error { | ||
if update.Status == ActionStatusError { | ||
return update.Error() | ||
} | ||
return nil | ||
}, | ||
actions..., | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
package hcloud | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestWaitFor(t *testing.T) { | ||
RunMockedTestCases(t, | ||
[]MockedTestCase{ | ||
{ | ||
Name: "succeed", | ||
WantRequests: []MockedRequest{ | ||
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772237, "status": "running", "progress": 0 } | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772237, "status": "success", "progress": 100 } | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
}, | ||
Run: func(env testEnv) { | ||
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}} | ||
|
||
err := env.Client.Action.WaitFor(context.Background(), actions...) | ||
assert.NoError(t, err) | ||
}, | ||
}, | ||
{ | ||
Name: "succeed with already succeeded action", | ||
Run: func(env testEnv) { | ||
actions := []*Action{{ID: 1509772237, Status: ActionStatusSuccess}} | ||
|
||
err := env.Client.Action.WaitFor(context.Background(), actions...) | ||
assert.NoError(t, err) | ||
}, | ||
}, | ||
{ | ||
Name: "fail with unknown action", | ||
WantRequests: []MockedRequest{ | ||
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
}, | ||
Run: func(env testEnv) { | ||
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}} | ||
|
||
err := env.Client.Action.WaitFor(context.Background(), actions...) | ||
assert.Error(t, err) | ||
assert.Equal(t, "actions not found: [1509772237]", err.Error()) | ||
}, | ||
}, | ||
{ | ||
Name: "fail with canceled context", | ||
Run: func(env testEnv) { | ||
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}} | ||
|
||
ctx, cancelFunc := context.WithCancel(context.Background()) | ||
cancelFunc() | ||
err := env.Client.Action.WaitFor(ctx, actions...) | ||
assert.Error(t, err) | ||
}, | ||
}, | ||
{ | ||
Name: "fail with api error", | ||
WantRequests: []MockedRequest{ | ||
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 503, ""}, | ||
}, | ||
Run: func(env testEnv) { | ||
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}} | ||
|
||
err := env.Client.Action.WaitFor(context.Background(), actions...) | ||
assert.Error(t, err) | ||
assert.Equal(t, "hcloud: server responded with status code 503", err.Error()) | ||
}, | ||
}, | ||
}, | ||
) | ||
} | ||
|
||
func TestWaitForFunc(t *testing.T) { | ||
RunMockedTestCases(t, | ||
[]MockedTestCase{ | ||
{ | ||
Name: "succeed", | ||
WantRequests: []MockedRequest{ | ||
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772237, "status": "running", "progress": 40 }, | ||
{ "id": 1509772238, "status": "running", "progress": 0 } | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772237, "status": "running", "progress": 60 }, | ||
{ "id": 1509772238, "status": "running", "progress": 50 } | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772237, "status": "success", "progress": 100 }, | ||
{ "id": 1509772238, "status": "running", "progress": 75 } | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
{"GET", "/actions?id=1509772238&page=1&sort=status&sort=id", nil, 200, | ||
`{ | ||
"actions": [ | ||
{ "id": 1509772238, "status": "error", "progress": 75, | ||
"error": { | ||
"code": "action_failed", | ||
"message": "Something went wrong with the action" | ||
} | ||
} | ||
], | ||
"meta": { "pagination": { "page": 1 }} | ||
}`}, | ||
}, | ||
Run: func(env testEnv) { | ||
actions := []*Action{ | ||
{ID: 1509772236, Status: ActionStatusSuccess}, | ||
{ID: 1509772237, Status: ActionStatusRunning}, | ||
{ID: 1509772238, Status: ActionStatusRunning}, | ||
} | ||
progress := make([]int, 0) | ||
|
||
progressByAction := make(map[int64]int, len(actions)) | ||
err := env.Client.Action.WaitForFunc(context.Background(), func(update *Action) error { | ||
switch update.Status { | ||
case ActionStatusRunning: | ||
progressByAction[update.ID] = update.Progress | ||
case ActionStatusSuccess: | ||
progressByAction[update.ID] = 100 | ||
case ActionStatusError: | ||
progressByAction[update.ID] = 100 | ||
} | ||
|
||
sum := 0 | ||
for _, value := range progressByAction { | ||
sum += value | ||
} | ||
progress = append(progress, sum/len(actions)) | ||
|
||
return nil | ||
}, actions...) | ||
|
||
assert.Nil(t, err) | ||
assert.Equal(t, []int{33, 46, 46, 53, 70, 83, 91, 100}, progress) | ||
}, | ||
}, | ||
}, | ||
) | ||
} |
Oops, something went wrong.