Skip to content

Commit

Permalink
Create new deadline for probe calls with default timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
falfaroc committed Dec 23, 2024
1 parent 79f3be0 commit c7d6a8e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 83 deletions.
105 changes: 30 additions & 75 deletions service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ const (
sioReplicationPairExists = "A Replication Pair for the specified local volume already exists"

DriverConfigParamsYaml = "driver-config-params.yaml"

DefaultAPITimeout = 5 * time.Second
)

// Extra metadata field names for propagating to goscaleio and beyond.
Expand All @@ -155,8 +157,6 @@ const (

var interestingParameters = [...]string{0: "FsType", 1: KeyMkfsFormatOption, 2: KeyBandwidthLimitInKbps, 3: KeyIopsLimit}

// var lock = sync.RWMutex{}

type ZoneContent struct {
systemID string
protectionDomain ProtectionDomainName
Expand Down Expand Up @@ -2616,8 +2616,6 @@ func (s *service) getZoneFromZoneLabelKey(ctx context.Context, zoneLabelKey stri
func (s *service) systemProbeAll(ctx context.Context) error {
// probe all arrays
Log.Infoln("Probing all associated arrays")
// allArrayFail := true
// errMap := make(map[string]error)
errMap := new(sync.Map)
zoneName := ""
usingZones := s.opts.zoneLabelKey != "" && s.isNodeMode()
Expand All @@ -2634,18 +2632,9 @@ func (s *service) systemProbeAll(ctx context.Context) error {
var wg sync.WaitGroup
errChan := make(chan error, len(s.opts.arrays))

deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(10 * time.Second)
}

// Calculate the new deadline by subtracting the desired duration
newDeadline := deadline.Add(-(100 * time.Millisecond))
newCtx, cancel := context.WithDeadline(ctx, newDeadline)
newCtx, cancel := createProbeContextWithDeadline(ctx)
defer cancel()

Log.Printf("[SystemProbeAll - FERNANDO] context deadline: %v, new deadline: %v", deadline, newDeadline)

for _, array := range s.opts.arrays {
// If zone information is available, use it to probe the array
if usingZones && !array.isInZone(zoneName) {
Expand All @@ -2661,18 +2650,14 @@ func (s *service) systemProbeAll(ctx context.Context) error {
go func(array *ArrayConnectionData) {
defer wg.Done()

Log.Infof("[SystemProbeAll - FERNANDO] probing array %s", array.SystemID)

err := s.systemProbe(newCtx, array)
systemID := array.SystemID
if err != nil {
// errMap[systemID] = err
errMap.Store(systemID, err)
Log.Errorf("array %s probe failed: %v", array.SystemID, err)
errChan <- err
} else {
Log.Infof("array %s probed successfully", systemID)
// allArrayFail = false
}

}(array)
Expand Down Expand Up @@ -2700,8 +2685,8 @@ func (s *service) systemProbeAll(ctx context.Context) error {

// systemProbe will probe the given array
func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.probeMutex.Lock()
defer s.probeMutex.Unlock()

// Check that we have the details needed to login to the Gateway
if array.Endpoint == "" {
Expand Down Expand Up @@ -2730,26 +2715,23 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e
// Create ScaleIO API client if needed
if s.adminClients[systemID] == nil {
skipCertificateValidation := array.SkipCertificateValidation || array.Insecure
c, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
client, err := goscaleio.NewClientWithArgs(array.Endpoint, "", math.MaxInt64, skipCertificateValidation, !s.opts.DisableCerts)
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to create ScaleIO client: %s", err.Error())
}
s.adminClients[systemID] = c
// s.setMuxAdminClient(systemID, c)

s.adminClients[systemID] = client
for _, name := range altSystemNames {
s.adminClients[name] = c
// s.setMuxAdminClient(name, c)
s.adminClients[name] = client
}
}

Log.Printf("Login to PowerFlex Gateway, system=%s, endpoint=%s, user=%s\n", systemID, array.Endpoint, array.Username)

// client := s.getMuxAdminClient(systemID)
// client := s.adminClients[systemID]

if s.adminClients[systemID].GetToken() == "" {
_, err := s.adminClients[systemID].WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
client := s.adminClients[systemID]
if client.GetToken() == "" {
_, err := client.WithContext(ctx).Authenticate(&goscaleio.ConfigConnect{
Endpoint: array.Endpoint,
Username: array.Username,
Password: array.Password,
Expand All @@ -2762,34 +2744,25 @@ func (s *service) systemProbe(ctx context.Context, array *ArrayConnectionData) e

// initialize system if needed
if s.systems[systemID] == nil {
system, err := s.adminClients[systemID].WithContext(ctx).FindSystem(
array.SystemID, array.SystemID, "")
system, err := client.WithContext(ctx).FindSystem(array.SystemID, array.SystemID, "")
if err != nil {
return status.Errorf(codes.FailedPrecondition,
"unable to find matching PowerFlex system name: %s",
err.Error())
}

s.systems[systemID] = system
// s.setMuxSystem(systemID, system)
if system.System != nil && system.System.Name != "" {
Log.Printf("Found Name for system=%s with ID=%s", system.System.Name, system.System.ID)
s.connectedSystemNameToID[system.System.Name] = system.System.ID
s.systems[system.System.ID] = system
s.adminClients[system.System.ID] = s.adminClients[systemID]

// s.setMuxConnectedSystemNameToID(system.System.Name, system.System.ID)
// s.setMuxSystem(system.System.ID, system)
// s.setMuxAdminClient(system.System.ID, client)
s.adminClients[system.System.ID] = client
}
// associate alternate system name to systemID
for _, name := range altSystemNames {
s.systems[name] = system
s.adminClients[name] = s.adminClients[systemID]
s.adminClients[name] = client
s.connectedSystemNameToID[name] = system.System.ID

// s.setMuxSystem(name, system)
// s.setMuxAdminClient(name, client)
// s.setMuxConnectedSystemNameToID(name, system.System.ID)
}
}

Expand Down Expand Up @@ -3809,38 +3782,20 @@ func (s *service) verifySystem(systemID string) (*goscaleio.Client, error) {
return adminClient, nil
}

// setMuxAdminClient stores client in s.adminClients under systemID while
// holding s.mutex.
func (s *service) setMuxAdminClient(systemID string, client *goscaleio.Client) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.adminClients[systemID] = client
}

// getMuxAdminClient returns the entry of s.adminClients stored under systemID,
// reading the map while holding s.mutex. The result is nil when no client has
// been stored for that key.
func (s *service) getMuxAdminClient(systemID string) *goscaleio.Client {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.adminClients[systemID]
}

// setMuxSystem stores system in s.systems under name while holding s.mutex.
func (s *service) setMuxSystem(name string, system *goscaleio.System) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.systems[name] = system
}

// getMuxSystem returns the entry of s.systems stored under name, reading the
// map while holding s.mutex. The result is nil when no system has been stored
// for that key.
func (s *service) getMuxSystem(name string) *goscaleio.System {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.systems[name]
}
func createProbeContextWithDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
defaultProbeDeadline := time.Now().Add(DefaultAPITimeout)
probeDeadline, ok := ctx.Deadline()
if !ok {
Log.Println("Probe deadline not in context, using default")
probeDeadline = time.Now().Add(DefaultAPITimeout)
}

// setMuxConnectedSystemNameToID records id in s.connectedSystemNameToID under
// name while holding s.mutex.
func (s *service) setMuxConnectedSystemNameToID(name, id string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.connectedSystemNameToID[name] = id
}
// Use the earlier of the two deadlines.
if probeDeadline.After(defaultProbeDeadline) {
Log.Printf("Original Probe Deadline %s is greater than defaultProbeDeadline %s, setting to default", probeDeadline, defaultProbeDeadline)
probeDeadline = defaultProbeDeadline
}

func (s *service) getMuxConnectedSystemNameToID(name string) string {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.connectedSystemNameToID[name]
// Derive a child context from ctx that expires at the chosen probe deadline.
return context.WithDeadline(ctx, probeDeadline)
}
16 changes: 8 additions & 8 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ type service struct {
volumePrefixToSystems map[string][]string
connectedSystemNameToID map[string]string

mutex sync.Mutex
probeMutex sync.Mutex
}

type Config struct {
Expand Down Expand Up @@ -545,18 +545,18 @@ func (s *service) BeforeServe(
// Update the ConfigMap with the Interface IPs
s.updateConfigMap(s.getIPAddressByInterface, ConfigMapFilePath)

Log.Printf("[BeforeServe] Context: %+v", ctx)

newContext, cancel := context.WithDeadline(ctx, time.Now().Add(1*time.Second))
defer cancel()

if _, ok := csictx.LookupEnv(ctx, "X_CSI_VXFLEXOS_NO_PROBE_ON_START"); !ok {
Log.Printf("BeforeServe probing starting %s", time.Now().Format("15:04:05.000000000"))
// Probe before the server starts. To avoid errors in the controller, we must return within 2 seconds.
beforeServeMaxTimeout := 1 * time.Second
newContext, cancel := context.WithDeadline(ctx, time.Now().Add(beforeServeMaxTimeout))
defer cancel()

err := s.doProbe(newContext)
Log.Printf("BeforeServe - doProbe: Completed at %s", time.Now().Format(time.RFC3339))
Log.Printf("BeforeServe probing complete %s", time.Now().Format("15:04:05.000000000"))
return err
}

Log.Printf("BeforeServe: Completed at %s", time.Now().Format(time.RFC3339))
return nil
}

Expand Down

0 comments on commit c7d6a8e

Please sign in to comment.