Skip to content

Commit

Permalink
check for errors when getting GCP instance iterator
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed Mar 1, 2025
1 parent f844abf commit 5f6f206
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 77 deletions.
25 changes: 18 additions & 7 deletions pkg/providers/gcp/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/agrea/ptr"
"google.golang.org/api/iterator"
"k8s.io/klog/v2"

"github.com/NVIDIA/topograph/pkg/topology"
Expand All @@ -32,25 +33,27 @@ import (
func (p *baseProvider) generateInstanceTopology(ctx context.Context, pageSize *int, cis []topology.ComputeInstances) (*topology.ClusterTopology, error) {
client, err := p.clientFactory()
if err != nil {
return nil, fmt.Errorf("unable to get client: %v", err)
return nil, fmt.Errorf("failed to get client: %v", err)
}

projectID, err := client.ProjectID(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get project ID: %v", err)
return nil, fmt.Errorf("failed to get project ID: %v", err)
}

topo := topology.NewClusterTopology()

maxRes := castPageSize(pageSize)
for _, ci := range cis {
p.generateRegionInstanceTopology(ctx, client, projectID, maxRes, topo, &ci)
if err := p.generateRegionInstanceTopology(ctx, client, projectID, maxRes, topo, &ci); err != nil {
return nil, fmt.Errorf("failed to get instance topology: %v", err)
}
}

return topo, nil
}

func (p *baseProvider) generateRegionInstanceTopology(ctx context.Context, client Client, projectID string, maxRes *uint32, topo *topology.ClusterTopology, ci *topology.ComputeInstances) {
func (p *baseProvider) generateRegionInstanceTopology(ctx context.Context, client Client, projectID string, maxRes *uint32, topo *topology.ClusterTopology, ci *topology.ComputeInstances) error {
klog.InfoS("Getting instance topology", "region", ci.Region, "project", projectID)

req := computepb.ListInstancesRequest{
Expand All @@ -62,8 +65,16 @@ func (p *baseProvider) generateRegionInstanceTopology(ctx context.Context, clien

for {
klog.V(4).InfoS("ListInstances", "request", req.String())
instances, token := client.Instances(ctx, &req)
for _, instance := range instances {
iter, token := client.Instances(ctx, &req)
for {
instance, err := iter.Next()
if err != nil {
if err == iterator.Done {
break
} else {
return err
}
}
instanceId := strconv.FormatUint(*instance.Id, 10)
klog.V(4).Infof("Checking instance %s", instanceId)

Expand Down Expand Up @@ -96,7 +107,7 @@ func (p *baseProvider) generateRegionInstanceTopology(ctx context.Context, clien

if len(token) == 0 {
klog.V(4).Infof("Total processed nodes: %d", topo.Len())
return
return nil
}
req.PageToken = &token
}
Expand Down
48 changes: 8 additions & 40 deletions pkg/providers/gcp/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
computepb "cloud.google.com/go/compute/apiv1/computepb"
"cloud.google.com/go/compute/metadata"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
v1 "k8s.io/api/core/v1"

"github.com/NVIDIA/topograph/pkg/providers"
Expand All @@ -40,53 +39,28 @@ type baseProvider struct {

// ClientFactory produces a Client, or an error when one cannot be created.
type ClientFactory func() (Client, error)

// InstanceIterator yields compute instances one at a time.
// Consumers in this package treat iterator.Done returned by Next as the end
// of the sequence and any other non-nil error as a failure.
type InstanceIterator interface {
Next() (*computepb.Instance, error)
}

type Client interface {
ProjectID(ctx context.Context) (string, error)
Zones(ctx context.Context, req *computepb.ListZonesRequest, opts ...gax.CallOption) []string
Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) ([]*computepb.Instance, string)
Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) (InstanceIterator, string)
}

type gcpClient struct {
zoneClient *compute_v1.ZonesClient
instanceClient *compute_v1.InstancesClient
}

// ProjectID returns whatever metadata.ProjectIDWithContext reports for ctx.
func (c *gcpClient) ProjectID(ctx context.Context) (string, error) {
return metadata.ProjectIDWithContext(ctx)
}

func (c *gcpClient) Zones(ctx context.Context, req *computepb.ListZonesRequest, opts ...gax.CallOption) []string {
now := time.Now()
iter := c.zoneClient.List(ctx, req, opts...)
requestLatency.WithLabelValues("ListZones").Observe(time.Since(now).Seconds())

zones := make([]string, 0)
for {
zone, err := iter.Next()
if err == iterator.Done {
break
}
zones = append(zones, *zone.Name)
}

return zones
}

func (c *gcpClient) Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) ([]*computepb.Instance, string) {
func (c *gcpClient) Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) (InstanceIterator, string) {
now := time.Now()
iter := c.instanceClient.List(ctx, req, opts...)
requestLatency.WithLabelValues("ListInstances").Observe(time.Since(now).Seconds())

instances := make([]*computepb.Instance, 0)
for {
instance, err := iter.Next()
if err == iterator.Done {
break
}
instances = append(instances, instance)
}

return instances, iter.PageInfo().Token
return iter, iter.PageInfo().Token
}

func NamedLoader() (string, providers.Loader) {
Expand All @@ -95,18 +69,12 @@ func NamedLoader() (string, providers.Loader) {

func Loader(ctx context.Context, config providers.Config) (providers.Provider, error) {
clientFactory := func() (Client, error) {
zoneClient, err := compute_v1.NewZonesRESTClient(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get zones client: %s", err.Error())
}

instanceClient, err := compute_v1.NewInstancesRESTClient(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get instances client: %s", err.Error())
return nil, fmt.Errorf("failed to get instances client: %s", err.Error())
}

return &gcpClient{
zoneClient: zoneClient,
instanceClient: instanceClient,
}, nil
}
Expand Down
116 changes: 86 additions & 30 deletions pkg/providers/gcp/provider_sim.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

computepb "cloud.google.com/go/compute/apiv1/computepb"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"

"github.com/NVIDIA/topograph/pkg/models"
"github.com/NVIDIA/topograph/pkg/providers"
Expand All @@ -33,35 +34,88 @@ const (
NAME_SIM = "gcp-sim"
)

type SimClient struct {
Model *models.Model
type simClient struct {
model *models.Model
pages []*simInstanceIter
}

func (c *SimClient) ProjectID(ctx context.Context) (string, error) {
return "", nil
// simInstanceIter is an in-memory iterator over one simulated page of
// instances; its Next method matches the InstanceIterator interface.
type simInstanceIter struct {
// instances holds the page's instances in the order Next returns them.
instances []*computepb.Instance
// indx is the position of the instance the following Next call returns.
indx int
// next is set to true on the first page by newSimClient; nothing in the
// visible code reads it.
next bool
// err, when true, makes every Next call fail with an error.
err bool
}

func (c *SimClient) Zones(ctx context.Context, req *computepb.ListZonesRequest, opts ...gax.CallOption) []string {
return []string{"zone"}
// Next returns the instance at the current position and advances past it.
// It fails on every call while the err flag is set, and returns iterator.Done
// once all instances on the page have been handed out.
func (iter *simInstanceIter) Next() (*computepb.Instance, error) {
	switch {
	case iter.err:
		return nil, fmt.Errorf("iterator error")
	case iter.indx >= len(iter.instances):
		return nil, iterator.Done
	}

	current := iter.instances[iter.indx]
	iter.indx++

	return current, nil
}

func (c *SimClient) Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) ([]*computepb.Instance, string) {
instances := make([]*computepb.Instance, 0, len(c.Model.Nodes))

for _, node := range c.Model.Nodes {
physicalHost := fmt.Sprintf("/%s/%s/%s", node.NetLayers[1], node.NetLayers[0], node.Name)
instanceID, _ := strconv.ParseUint(node.Name, 10, 64)
instance := &computepb.Instance{
Id: &instanceID,
Name: &node.Name,
ResourceStatus: &computepb.ResourceStatus{
PhysicalHost: &physicalHost,
},
// newSimClient builds a simulation client that serves the model's nodes as
// two pages of instances.
//
// With n nodes and mid = n/2, the nodes at positions [0, mid] form the first
// page and those at positions [mid+1, n-1] form the second. A page with no
// nodes gets an empty iterator, so an empty model yields two empty pages
// rather than indexing past the end of the name list.
//
// NOTE: node names are collected by ranging over the model.Nodes map, whose
// iteration order is unspecified, so which node lands on which page can
// differ between runs.
//
// It returns an error when a node name is not a base-10 unsigned integer,
// because the name is also used as the instance ID.
func newSimClient(model *models.Model) (*simClient, error) {
	n := len(model.Nodes)
	nodeNames := make([]string, 0, n)
	for name := range model.Nodes {
		nodeNames = append(nodeNames, name)
	}

	// Clamp the end of the first page to the last valid index. This only
	// matters for n == 0, where mid (0) would otherwise address a node that
	// does not exist.
	mid := n / 2
	firstEnd := mid
	if firstEnd > n-1 {
		firstEnd = n - 1
	}

	ranges := []struct{ from, to int }{
		{from: 0, to: firstEnd},
		{from: mid + 1, to: n - 1},
	}
	pages := make([]*simInstanceIter, len(ranges))

	for i, r := range ranges {
		if r.from > r.to {
			pages[i] = &simInstanceIter{}
			continue
		}

		instances := make([]*computepb.Instance, 0, r.to-r.from+1)
		for j := r.from; j <= r.to; j++ {
			node := model.Nodes[nodeNames[j]]
			physicalHost := fmt.Sprintf("/%s/%s/%s", node.NetLayers[1], node.NetLayers[0], node.Name)
			instanceID, err := strconv.ParseUint(node.Name, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid instance ID %q; must be numerical", node.Name)
			}
			instances = append(instances, &computepb.Instance{
				Id:   &instanceID,
				Name: &node.Name,
				ResourceStatus: &computepb.ResourceStatus{
					PhysicalHost: &physicalHost,
				},
			})
		}
		pages[i] = &simInstanceIter{instances: instances}
	}

	pages[0].next = true

	return &simClient{
		model: model,
		pages: pages,
	}, nil
}

// ProjectID always returns an empty project ID and a nil error.
func (c *simClient) ProjectID(ctx context.Context) (string, error) {
return "", nil
}

// Instances serves the simulated two-page listing. A request without a page
// token receives the first page together with the token "next"; a request
// carrying any page token receives the second page and an empty token.
func (c *simClient) Instances(ctx context.Context, req *computepb.ListInstancesRequest, opts ...gax.CallOption) (InstanceIterator, string) {
	if req.PageToken != nil {
		return c.pages[1], ""
	}

	return c.pages[0], "next"
}

func NamedLoaderSim() (string, providers.Loader) {
Expand All @@ -74,35 +128,37 @@ func LoaderSim(ctx context.Context, cfg providers.Config) (providers.Provider, e
return nil, err
}

csp_model, err := models.NewModelFromFile(p.ModelPath)
model, err := models.NewModelFromFile(p.ModelPath)
if err != nil {
return nil, fmt.Errorf("unable to load model file for simulation, %v", err)
return nil, fmt.Errorf("failed to load model file for simulation: %v", err)
}
simClient := &SimClient{
Model: csp_model,

client, err := newSimClient(model)
if err != nil {
return nil, fmt.Errorf("failed to create simulation client: %v", err)
}

clientFactory := func() (Client, error) {
return simClient, nil
return client, nil
}

return NewSim(clientFactory), nil
}

type SimProvider struct {
type simProvider struct {
baseProvider
}

func NewSim(clientFactory ClientFactory) *SimProvider {
return &SimProvider{
func NewSim(clientFactory ClientFactory) *simProvider {
return &simProvider{
baseProvider: baseProvider{clientFactory: clientFactory},
}
}

// Engine support

func (p *SimProvider) GetComputeInstances(ctx context.Context) ([]topology.ComputeInstances, error) {
func (p *simProvider) GetComputeInstances(ctx context.Context) ([]topology.ComputeInstances, error) {
client, _ := p.clientFactory()

return client.(*SimClient).Model.Instances, nil
return client.(*simClient).model.Instances, nil
}
Loading

0 comments on commit 5f6f206

Please sign in to comment.