Skip to content

Commit

Permalink
Update Context Deadline During Probe Calls (#386)
Browse files Browse the repository at this point in the history
* Probe testing

* Locking systemProbe for goroutines

* Create new deadline for probe calls with default timeout

* Address lint errors

* Revert goroutine probing
  • Loading branch information
falfaroc authored Dec 26, 2024
1 parent 200b918 commit dab475a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 16 deletions.
60 changes: 45 additions & 15 deletions service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ const (
sioReplicationPairExists = "A Replication Pair for the specified local volume already exists"

DriverConfigParamsYaml = "driver-config-params.yaml"

DefaultAPITimeout = 5 * time.Second
)

// Extra metadata field names for propagating to goscaleio and beyond.
Expand Down Expand Up @@ -2628,26 +2630,32 @@ func (s *service) systemProbeAll(ctx context.Context) error {
Log.Infof("probing zoneLabel '%s', zone value: '%s'", s.opts.zoneLabelKey, zoneName)
}

newCtx, cancel := createProbeContextWithDeadline(ctx)
defer cancel()

for _, array := range s.opts.arrays {
// If zone information is available, use it to probe the array
if usingZones && !array.isInZone(zoneName) {
// Driver node containers should not probe arrays that exist outside their assigned zone
// Driver controller container should probe all arrays
Log.Infof("array %s zone %s does not match %s, not pinging this array\n", array.SystemID, array.AvailabilityZone.Name, zoneName)
errMap[array.SystemID] = fmt.Errorf("array %s zone %s does not match %s, not pinging this array", array.SystemID, array.AvailabilityZone.Name, zoneName)
continue
}

err := s.systemProbe(ctx, array)
err := s.systemProbe(newCtx, array)
systemID := array.SystemID
if err == nil {
Log.Infof("array %s probed successfully", systemID)
allArrayFail = false
} else {
if err != nil {
errMap[systemID] = err
Log.Errorf("array %s probe failed: %v", array.SystemID, err)
} else {
allArrayFail = false
Log.Infof("array %s probed successfully", systemID)
}
}

Log.Printf("[SystemProbeAll] Number of failed probes: %d", len(errMap))

if allArrayFail {
return status.Error(codes.FailedPrecondition,
fmt.Sprintf("All arrays are not working. Could not proceed further: %v", errMap))
Expand Down Expand Up @@ -2685,21 +2693,23 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
// Create ScaleIO API client if needed
if s.adminClients[systemID] == nil {
skipCertificateValidation := array.SkipCertificateValidation || array.Insecure
c, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
client, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to create ScaleIO client: %s", err.Error())
}
s.adminClients[systemID] = c

s.adminClients[systemID] = client
for _, name := range altSystemNames {
s.adminClients[name] = c
s.adminClients[name] = client
}
}

Log.Printf("Login to PowerFlex Gateway, system=%s, endpoint=%s, user=%s\n", systemID, array.Endpoint, array.Username)

if s.adminClients[systemID].GetToken() == "" {
_, err := s.adminClients[systemID].WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
client := s.adminClients[systemID]
if client.GetToken() == "" {
_, err := client.WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
Endpoint: array.Endpoint,
Username: array.Username,
Password: array.Password,
Expand All @@ -2712,24 +2722,24 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e

// initialize system if needed
if s.systems[systemID] == nil {
system, err := s.adminClients[systemID].WithContext(ctx).FindSystem(
array.SystemID, array.SystemID, "")
system, err := client.WithContext(ctx).FindSystem(array.SystemID, array.SystemID, "")
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to find matching PowerFlex system name: %s",
err.Error())
}

s.systems[systemID] = system
if system.System != nil && system.System.Name != "" {
Log.Printf("Found Name for system=%s with ID=%s", system.System.Name, system.System.ID)
s.connectedSystemNameToID[system.System.Name] = system.System.ID
s.systems[system.System.ID] = system
s.adminClients[system.System.ID] = s.adminClients[systemID]
s.adminClients[system.System.ID] = client
}
// associate alternate system name to systemID
for _, name := range altSystemNames {
s.systems[name] = system
s.adminClients[name] = s.adminClients[systemID]
s.adminClients[name] = client
s.connectedSystemNameToID[name] = system.System.ID
}
}
Expand All @@ -2740,7 +2750,8 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
sysID = id
s.opts.arrays[sysID] = array
}
if array.IsDefault == true {

if array.IsDefault {
Log.Infof("default array is set to array ID: %s", sysID)
s.opts.defaultSystemID = sysID
Log.Printf("%s is the default array, skipping VolumePrefixToSystems map update. \n", sysID)
Expand All @@ -2750,6 +2761,7 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
return err
}
}

return nil
}

Expand Down Expand Up @@ -3747,3 +3759,21 @@ func (s *service) verifySystem(systemID string) (*goscaleio.Client, error) {

return adminClient, nil
}

// createProbeContextWithDeadline derives a child context for probe calls whose
// deadline is never later than DefaultAPITimeout from the moment of the call.
//
// If ctx already carries a deadline that is not later than the default, that
// deadline is kept. If ctx has no deadline, or its deadline is later than the
// default, the default deadline is used instead. The returned CancelFunc must
// be called by the caller to release the derived context's resources.
func createProbeContextWithDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
	// Compute the default exactly once so the "no deadline" case and the
	// comparison below refer to the same instant.
	defaultProbeDeadline := time.Now().Add(DefaultAPITimeout)

	probeDeadline, ok := ctx.Deadline()
	switch {
	case !ok:
		// The incoming context has no deadline at all; fall back to the default.
		Log.Println("Probe deadline not in context, using default")
		probeDeadline = defaultProbeDeadline
	case probeDeadline.After(defaultProbeDeadline):
		// The incoming deadline is further out than allowed; use the earlier of the two.
		Log.Printf("Original Probe Deadline %s is greater than defaultProbeDeadline %s, setting to default", probeDeadline, defaultProbeDeadline)
		probeDeadline = defaultProbeDeadline
	}

	return context.WithDeadline(ctx, probeDeadline)
}
11 changes: 10 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,17 @@ func (s *service) BeforeServe(
s.updateConfigMap(s.getIPAddressByInterface, ConfigMapFilePath)

if _, ok := csictx.LookupEnv(ctx, "X_CSI_VXFLEXOS_NO_PROBE_ON_START"); !ok {
return s.doProbe(ctx)
Log.Printf("BeforeServe probing starting %s", time.Now().Format("15:04:05.000000000"))
// Probe before the server starts. To avoid errors in the controller, the probe must return within 2 seconds.
beforeServeMaxTimeout := 1 * time.Second
newContext, cancel := context.WithDeadline(ctx, time.Now().Add(beforeServeMaxTimeout))
defer cancel()

err := s.doProbe(newContext)
Log.Printf("BeforeServe probing complete %s", time.Now().Format("15:04:05.000000000"))
return err
}

return nil
}

Expand Down

0 comments on commit dab475a

Please sign in to comment.