diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go
index e227f9524db..4dee3315cf8 100644
--- a/client/allocrunner/taskrunner/stats_hook.go
+++ b/client/allocrunner/taskrunner/stats_hook.go
@@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
 // Collection ends when the passed channel is closed
 func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
 
-	ch, err := handle.Stats(ctx, h.interval)
+MAIN:
+	ch, err := h.callStatsWithRetry(ctx, handle)
 	if err != nil {
-		// Check if the driver doesn't implement stats
-		if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
-			h.logger.Debug("driver does not support stats")
-			return
-		}
-		h.logger.Error("failed to start stats collection for task", "error", err)
+		return
 	}
 
-	var backoff time.Duration
-	var retry int
-	limit := time.Second * 5
 	for {
-		time.Sleep(backoff)
 		select {
 		case ru, ok := <-ch:
-			// Channel is closed
+			// if channel closes, re-establish a new one
 			if !ok {
-				var re *structs.RecoverableError
-				ch, err = handle.Stats(ctx, h.interval)
-				if err == nil {
-					goto RETRY
-				}
-
-				// We do not log when the plugin is shutdown since this is
-				// likely because the driver plugin has unexpectedly exited,
-				// in which case sleeping and trying again or returning based
-				// on the stop channel is the correct behavior
-				if err != bstructs.ErrPluginShutdown {
-					h.logger.Debug("error fetching stats of task", "error", err)
-					goto RETRY
-				}
-				// check if the error is terminal otherwise it's likely a
-				// transport error and we should retry
-				re, ok = err.(*structs.RecoverableError)
-				if ok && re.IsUnrecoverable() {
-					return
-				}
-				h.logger.Warn("stats collection for task failed", "error", err)
-			RETRY:
-				// Calculate the new backoff
-				backoff = (1 << (2 * uint64(retry))) * time.Second
-				if backoff > limit {
-					backoff = limit
-				}
-				// Increment retry counter
-				retry++
-
-				continue
+				goto MAIN
 			}
 
 			// Update stats on TaskRunner and emit them
@@ -149,6 +111,69 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
 	}
 }
 
+// callStatsWithRetry invokes the driver handle's Stats() function, retrying
+// with backoff until a stats channel is established successfully. It returns
+// an error if the context is cancelled or a permanent error is encountered.
+//
+// Errors are logged here with appropriate log levels; callers should not log
+// the returned error again.
+func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
+	var retry int
+
+MAIN:
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
+	ch, err := handle.Stats(ctx, h.interval)
+	if err == nil {
+		return ch, nil
+	}
+
+	// Check if the driver doesn't implement stats
+	if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
+		h.logger.Debug("driver does not support stats")
+		return nil, err
+	}
+
+	// check if the error is terminal otherwise it's likely a
+	// transport error and we should retry
+	if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
+		h.logger.Error("failed to start stats collection for task with unrecoverable error", "error", err)
+		return nil, err
+	}
+
+	// We do not warn when the plugin is shutdown since this is
+	// likely because the driver plugin has unexpectedly exited,
+	// in which case sleeping and trying again or returning based
+	// on the stop channel is the correct behavior
+	if err == bstructs.ErrPluginShutdown {
+		h.logger.Debug("failed to fetching stats of task", "error", err)
+	} else {
+		h.logger.Error("failed to start stats collection for task", "error", err)
+	}
+
+	limit := time.Second * 5
+	backoff := 1 << (2 * uint64(retry)) * time.Second
+	if backoff > limit || retry > 5 {
+		backoff = limit
+	}
+
+	// Increment retry counter
+	retry++
+
+	// Wait out the backoff, but return promptly if the context is cancelled
+	// instead of sleeping through the cancellation.
+	timer := time.NewTimer(backoff)
+	select {
+	case <-ctx.Done():
+		timer.Stop()
+		return nil, ctx.Err()
+	case <-timer.C:
+	}
+	goto MAIN
+}
+
 func (h *statsHook) Shutdown() {
 	h.mu.Lock()
 	defer h.mu.Unlock()
diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go
index ff284de6cc0..3bb81a4f4b6 100644
--- a/plugins/drivers/client.go
+++ b/plugins/drivers/client.go
@@ -269,7 +269,7 @@ func (d *driverPluginClient) TaskStats(ctx context.Context, taskID string, inter
 				return nil, structs.NewRecoverableError(err, rec.Recoverable)
 			}
 		}
-		return nil, err
+		return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
 	}
 
 	ch := make(chan *cstructs.TaskResourceUsage, 1)