Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CA]: hetzner cloud firewall feature #4185

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster-autoscaler/cloudprovider/hetzner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The cluster autoscaler for Hetzner Cloud scales worker nodes.

`HCLOUD_NETWORK` Default empty , The name of the network that is used in the cluster , @see https://docs.hetzner.cloud/#networks

`HCLOUD_FIREWALL` Default empty , The name of the firewall that is used in the cluster , @see https://docs.hetzner.cloud/#firewalls

`HCLOUD_SSH_KEY` Default empty , This SSH Key will have access to the fresh created server, @see https://docs.hetzner.cloud/#ssh-keys

Node groups must be defined with the `--nodes=<min-servers>:<max-servers>:<instance-type>:<region>:<name>` flag.
Expand Down
107 changes: 96 additions & 11 deletions cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ func (c *ActionClient) GetByID(ctx context.Context, id int) (*Action, *Response,
// ActionListOpts specifies options for listing actions.
type ActionListOpts struct {
ListOpts
ID []int
Status []ActionStatus
Sort []string
}

func (l ActionListOpts) values() url.Values {
vals := l.ListOpts.values()
for _, id := range l.ID {
vals.Add("id", fmt.Sprintf("%d", id))
}
for _, status := range l.Status {
vals.Add("status", string(status))
}
Expand Down Expand Up @@ -157,7 +161,7 @@ func (c *ActionClient) All(ctx context.Context) ([]*Action, error) {
opts := ActionListOpts{}
opts.PerPage = 50

_, err := c.client.all(func(page int) (*Response, error) {
err := c.client.all(func(page int) (*Response, error) {
opts.Page = page
actions, resp, err := c.List(ctx, opts)
if err != nil {
Expand All @@ -173,24 +177,97 @@ func (c *ActionClient) All(ctx context.Context) ([]*Action, error) {
return allActions, nil
}

// WatchProgress watches the action's progress until it completes with success or error.
func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-chan int, <-chan error) {
errCh := make(chan error, 1)
// AllWithOpts returns all actions for the given options.
func (c *ActionClient) AllWithOpts(ctx context.Context, opts ActionListOpts) ([]*Action, error) {
allActions := []*Action{}

err := c.client.all(func(page int) (*Response, error) {
opts.Page = page
actions, resp, err := c.List(ctx, opts)
if err != nil {
return resp, err
}
allActions = append(allActions, actions...)
return resp, nil
})
if err != nil {
return nil, err
}

return allActions, nil
}

// WatchOverallProgress watches several actions' progress until they complete with success or error.
func (c *ActionClient) WatchOverallProgress(ctx context.Context, actions []*Action) (<-chan int, <-chan error) {
errCh := make(chan error, len(actions))
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

successIDs := make([]int, 0, len(actions))
watchIDs := make(map[int]struct{}, len(actions))
for _, action := range actions {
watchIDs[action.ID] = struct{}{}
}

ticker := time.NewTicker(c.client.pollInterval)
sendProgress := func(p int) {
defer ticker.Stop()
for {
select {
case progressCh <- p:
break
default:
case <-ctx.Done():
errCh <- ctx.Err()
return
case <-ticker.C:
break
}

opts := ActionListOpts{}
for watchID := range watchIDs {
opts.ID = append(opts.ID, watchID)
}

as, err := c.AllWithOpts(ctx, opts)
if err != nil {
errCh <- err
return
}

for _, a := range as {
switch a.Status {
case ActionStatusRunning:
continue
case ActionStatusSuccess:
delete(watchIDs, a.ID)
successIDs := append(successIDs, a.ID)
sendProgress(progressCh, int(float64(len(actions)-len(successIDs))/float64(len(actions))*100))
case ActionStatusError:
delete(watchIDs, a.ID)
errCh <- fmt.Errorf("action %d failed: %w", a.ID, a.Error())
}
}

if len(watchIDs) == 0 {
return
}
}
}()

return progressCh, errCh
}

// WatchProgress watches one action's progress until it completes with success or error.
func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-chan int, <-chan error) {
errCh := make(chan error, 1)
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

ticker := time.NewTicker(c.client.pollInterval)
defer ticker.Stop()

for {
select {
Expand All @@ -209,10 +286,9 @@ func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-cha

switch a.Status {
case ActionStatusRunning:
sendProgress(a.Progress)
break
sendProgress(progressCh, a.Progress)
case ActionStatusSuccess:
sendProgress(100)
sendProgress(progressCh, 100)
errCh <- nil
return
case ActionStatusError:
Expand All @@ -224,3 +300,12 @@ func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-cha

return progressCh, errCh
}

func sendProgress(progressCh chan int, p int) {
select {
case progressCh <- p:
break
default:
break
}
}
Loading