Skip to content

Commit

Permalink
Merge pull request kubernetes#4185 from sergeyshevch/feature/ca/hetzn…
Browse files Browse the repository at this point in the history
…er-firewall

[CA]: hetzner cloud firewall feature
  • Loading branch information
k8s-ci-robot authored Jun 6, 2022
2 parents ce58b65 + 97e9374 commit 82bc463
Show file tree
Hide file tree
Showing 39 changed files with 2,552 additions and 152 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/cloudprovider/hetzner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The cluster autoscaler for Hetzner Cloud scales worker nodes.

`HCLOUD_NETWORK` Default empty , The name of the network that is used in the cluster , @see https://docs.hetzner.cloud/#networks

`HCLOUD_FIREWALL` Default empty , The name of the firewall that is used in the cluster , @see https://docs.hetzner.cloud/#firewalls

`HCLOUD_SSH_KEY` Default empty , This SSH Key will have access to the fresh created server, @see https://docs.hetzner.cloud/#ssh-keys

Node groups must be defined with the `--nodes=<min-servers>:<max-servers>:<instance-type>:<region>:<name>` flag.
Expand Down
107 changes: 96 additions & 11 deletions cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ func (c *ActionClient) GetByID(ctx context.Context, id int) (*Action, *Response,
// ActionListOpts specifies options for listing actions.
type ActionListOpts struct {
ListOpts
ID []int
Status []ActionStatus
Sort []string
}

func (l ActionListOpts) values() url.Values {
vals := l.ListOpts.values()
for _, id := range l.ID {
vals.Add("id", fmt.Sprintf("%d", id))
}
for _, status := range l.Status {
vals.Add("status", string(status))
}
Expand Down Expand Up @@ -157,7 +161,7 @@ func (c *ActionClient) All(ctx context.Context) ([]*Action, error) {
opts := ActionListOpts{}
opts.PerPage = 50

_, err := c.client.all(func(page int) (*Response, error) {
err := c.client.all(func(page int) (*Response, error) {
opts.Page = page
actions, resp, err := c.List(ctx, opts)
if err != nil {
Expand All @@ -173,24 +177,97 @@ func (c *ActionClient) All(ctx context.Context) ([]*Action, error) {
return allActions, nil
}

// WatchProgress watches the action's progress until it completes with success or error.
func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-chan int, <-chan error) {
errCh := make(chan error, 1)
// AllWithOpts returns all actions for the given options.
func (c *ActionClient) AllWithOpts(ctx context.Context, opts ActionListOpts) ([]*Action, error) {
allActions := []*Action{}

err := c.client.all(func(page int) (*Response, error) {
opts.Page = page
actions, resp, err := c.List(ctx, opts)
if err != nil {
return resp, err
}
allActions = append(allActions, actions...)
return resp, nil
})
if err != nil {
return nil, err
}

return allActions, nil
}

// WatchOverallProgress watches several actions' progress until they complete with success or error.
func (c *ActionClient) WatchOverallProgress(ctx context.Context, actions []*Action) (<-chan int, <-chan error) {
errCh := make(chan error, len(actions))
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

successIDs := make([]int, 0, len(actions))
watchIDs := make(map[int]struct{}, len(actions))
for _, action := range actions {
watchIDs[action.ID] = struct{}{}
}

ticker := time.NewTicker(c.client.pollInterval)
sendProgress := func(p int) {
defer ticker.Stop()
for {
select {
case progressCh <- p:
break
default:
case <-ctx.Done():
errCh <- ctx.Err()
return
case <-ticker.C:
break
}

opts := ActionListOpts{}
for watchID := range watchIDs {
opts.ID = append(opts.ID, watchID)
}

as, err := c.AllWithOpts(ctx, opts)
if err != nil {
errCh <- err
return
}

for _, a := range as {
switch a.Status {
case ActionStatusRunning:
continue
case ActionStatusSuccess:
delete(watchIDs, a.ID)
successIDs := append(successIDs, a.ID)
sendProgress(progressCh, int(float64(len(actions)-len(successIDs))/float64(len(actions))*100))
case ActionStatusError:
delete(watchIDs, a.ID)
errCh <- fmt.Errorf("action %d failed: %w", a.ID, a.Error())
}
}

if len(watchIDs) == 0 {
return
}
}
}()

return progressCh, errCh
}

// WatchProgress watches one action's progress until it completes with success or error.
func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-chan int, <-chan error) {
errCh := make(chan error, 1)
progressCh := make(chan int)

go func() {
defer close(errCh)
defer close(progressCh)

ticker := time.NewTicker(c.client.pollInterval)
defer ticker.Stop()

for {
select {
Expand All @@ -209,10 +286,9 @@ func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-cha

switch a.Status {
case ActionStatusRunning:
sendProgress(a.Progress)
break
sendProgress(progressCh, a.Progress)
case ActionStatusSuccess:
sendProgress(100)
sendProgress(progressCh, 100)
errCh <- nil
return
case ActionStatusError:
Expand All @@ -224,3 +300,12 @@ func (c *ActionClient) WatchProgress(ctx context.Context, action *Action) (<-cha

return progressCh, errCh
}

func sendProgress(progressCh chan int, p int) {
select {
case progressCh <- p:
break
default:
break
}
}
Loading

0 comments on commit 82bc463

Please sign in to comment.