Skip to content

Commit

Permalink
[GCE] Support paginated instance listing
Browse files Browse the repository at this point in the history
  • Loading branch information
x13n committed Dec 15, 2023
1 parent 49bed4c commit e3d3303
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 51 deletions.
124 changes: 77 additions & 47 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,69 +308,99 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [

func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]cloudprovider.Instance, error) {
registerRequest("instance_group_managers", "list_managed_instances")
gceInstances, err := client.gceService.InstanceGroupManagers.ListManagedInstances(migRef.Project, migRef.Zone, migRef.Name).Do()
b := newInstanceListBuilder(migRef)
err := client.gceService.InstanceGroupManagers.ListManagedInstances(migRef.Project, migRef.Zone, migRef.Name).Pages(context.Background(), b.loadPage)
if err != nil {
klog.V(4).Infof("Failed MIG info request for %s %s %s: %v", migRef.Project, migRef.Zone, migRef.Name, err)
return nil, err
}
infos := []cloudprovider.Instance{}
errorCodeCounts := make(map[string]int)
errorLoggingQuota := klogx.NewLoggingQuota(100)
for _, gceInstance := range gceInstances.ManagedInstances {
return b.build(), nil
}

type instanceListBuilder struct {
migRef GceRef
errorCodeCounts map[string]int
errorLoggingQuota *klogx.Quota
infos []cloudprovider.Instance
}

func newInstanceListBuilder(migRef GceRef) *instanceListBuilder {
return &instanceListBuilder{
migRef: migRef,
errorCodeCounts: make(map[string]int),
errorLoggingQuota: klogx.NewLoggingQuota(100),
}
}

func (i *instanceListBuilder) loadPage(page *gce.InstanceGroupManagersListManagedInstancesResponse) error {
if i.infos == nil {
i.infos = make([]cloudprovider.Instance, 0, len(page.ManagedInstances))
}
for _, gceInstance := range page.ManagedInstances {
ref, err := ParseInstanceUrlRef(gceInstance.Instance)
if err != nil {
klog.Errorf("Received error while parsing of the instance url: %v", err)
continue
}
instance := i.gceInstanceToInstance(ref, gceInstance)
i.infos = append(i.infos, instance)
}
return nil
}

instance := cloudprovider.Instance{
Id: ref.ToProviderId(),
Status: &cloudprovider.InstanceStatus{
State: getInstanceState(gceInstance.CurrentAction),
},
}
func (i *instanceListBuilder) gceInstanceToInstance(ref GceRef, gceInstance *gce.ManagedInstance) cloudprovider.Instance {
instance := cloudprovider.Instance{
Id: ref.ToProviderId(),
Status: &cloudprovider.InstanceStatus{
State: getInstanceState(gceInstance.CurrentAction),
},
}

if instance.Status.State == cloudprovider.InstanceCreating {
var errorInfo *cloudprovider.InstanceErrorInfo
errorMessages := []string{}
lastAttemptErrors := getLastAttemptErrors(gceInstance)
for _, instanceError := range lastAttemptErrors {
errorCodeCounts[instanceError.Code]++
if newErrorInfo := GetErrorInfo(instanceError.Code, instanceError.Message, gceInstance.InstanceStatus, errorInfo); newErrorInfo != nil {
// override older error
errorInfo = newErrorInfo
} else {
// no error
continue
}
if instance.Status.State != cloudprovider.InstanceCreating {
return instance
}

if instanceError.Message != "" {
errorMessages = append(errorMessages, instanceError.Message)
}
}
if errorInfo != nil {
errorInfo.ErrorMessage = strings.Join(errorMessages, "; ")
instance.Status.ErrorInfo = errorInfo
}
var errorInfo *cloudprovider.InstanceErrorInfo
errorMessages := []string{}
lastAttemptErrors := getLastAttemptErrors(gceInstance)
for _, instanceError := range lastAttemptErrors {
i.errorCodeCounts[instanceError.Code]++
if newErrorInfo := GetErrorInfo(instanceError.Code, instanceError.Message, gceInstance.InstanceStatus, errorInfo); newErrorInfo != nil {
// override older error
errorInfo = newErrorInfo
} else {
// no error
continue
}
if instanceError.Message != "" {
errorMessages = append(errorMessages, instanceError.Message)
}
}
if errorInfo != nil {
errorInfo.ErrorMessage = strings.Join(errorMessages, "; ")
instance.Status.ErrorInfo = errorInfo
}

if len(lastAttemptErrors) > 0 {
gceInstanceJSONBytes, err := gceInstance.MarshalJSON()
var gceInstanceJSON string
if err != nil {
gceInstanceJSON = fmt.Sprintf("Got error from MarshalJSON; %v", err)
} else {
gceInstanceJSON = string(gceInstanceJSONBytes)
}
klogx.V(4).UpTo(errorLoggingQuota).Infof("Got GCE instance which is being created and has lastAttemptErrors; gceInstance=%v; errorInfo=%#v", gceInstanceJSON, errorInfo)
}
if len(lastAttemptErrors) > 0 {
gceInstanceJSONBytes, err := gceInstance.MarshalJSON()
var gceInstanceJSON string
if err != nil {
gceInstanceJSON = fmt.Sprintf("Got error from MarshalJSON; %v", err)
} else {
gceInstanceJSON = string(gceInstanceJSONBytes)
}
infos = append(infos, instance)
klogx.V(4).UpTo(i.errorLoggingQuota).Infof("Got GCE instance which is being created and has lastAttemptErrors; gceInstance=%v; errorInfo=%#v", gceInstanceJSON, errorInfo)
}
klogx.V(4).Over(errorLoggingQuota).Infof("Got %v other GCE instances being created with lastAttemptErrors", -errorLoggingQuota.Left())
if len(errorCodeCounts) > 0 {
klog.Warningf("Spotted following instance creation error codes: %#v", errorCodeCounts)

return instance
}

func (i *instanceListBuilder) build() []cloudprovider.Instance {
klogx.V(4).Over(i.errorLoggingQuota).Infof("Got %v other GCE instances being created with lastAttemptErrors", -i.errorLoggingQuota.Left())
if len(i.errorCodeCounts) > 0 {
klog.Warningf("Spotted following instance creation error codes: %#v", i.errorCodeCounts)
}
return infos, nil
return i.infos
}

// GetErrorInfo maps the error code, error message and instance status to CA instance error info
Expand Down
159 changes: 156 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

testCases := []struct {
name string
lmiResponse gce_api.InstanceGroupManagersListManagedInstancesResponse
wantInstances []cloudprovider.Instance
name string
lmiResponse gce_api.InstanceGroupManagersListManagedInstancesResponse
lmiPageResponses map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse
wantInstances []cloudprovider.Instance
}{
{
name: "all instances good",
Expand Down Expand Up @@ -269,6 +270,153 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
},
},
},
{
name: "paginated response",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 2),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 42),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "foo",
},
lmiPageResponses: map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse{
"foo": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 123),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 456),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
},
},
wantInstances: []cloudprovider.Instance{
{
Id: "gce://myprojid/myzone/myinst_2",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_42",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_123",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_456",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
},
},
{
name: "paginated response, more pages",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 2),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 42),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "foo",
},
lmiPageResponses: map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse{
"foo": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 123),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 456),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "bar",
},
"bar": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 789),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 666),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
},
},
wantInstances: []cloudprovider.Instance{
{
Id: "gce://myprojid/myzone/myinst_2",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_42",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_123",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_456",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_789",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_666",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
},
},
{
name: "instances with bad url",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
Expand Down Expand Up @@ -329,6 +477,11 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
b, err := json.Marshal(tc.lmiResponse)
assert.NoError(t, err)
server.On("handle", "/projects/zones/instanceGroupManagers/listManagedInstances").Return(string(b)).Times(1)
for token, response := range tc.lmiPageResponses {
b, err := json.Marshal(response)
assert.NoError(t, err)
server.On("handle", "/projects/zones/instanceGroupManagers/listManagedInstances", token).Return(string(b)).Times(1)
}
gotInstances, err := g.FetchMigInstances(GceRef{})
assert.NoError(t, err)
if diff := cmp.Diff(tc.wantInstances, gotInstances, cmpopts.EquateErrors()); diff != "" {
Expand Down
8 changes: 7 additions & 1 deletion cluster-autoscaler/utils/test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,14 @@ func NewHttpServerMock(fields ...HttpServerMockField) *HttpServerMock {

func (l *HttpServerMock) handle(req *http.Request, w http.ResponseWriter, serverMock *HttpServerMock) string {
url := req.URL.Path
query := req.URL.Query()
var response string
args := l.Called(url)
var args mock.Arguments
if query.Has("pageToken") {
args = l.Called(url, query.Get("pageToken"))
} else {
args = l.Called(url)
}
for i, field := range l.fields {
switch field {
case MockFieldResponse:
Expand Down

0 comments on commit e3d3303

Please sign in to comment.