Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect driver stats when driver plugins are restarted #5948

Merged
merged 3 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 57 additions & 43 deletions client/allocrunner/taskrunner/stats_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
// Collection ends when the passed channel is closed
func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {

ch, err := handle.Stats(ctx, h.interval)
MAIN:
ch, err := h.callStatsWithRetry(ctx, handle)
if err != nil {
// Check if the driver doesn't implement stats
if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
h.logger.Debug("driver does not support stats")
return
}
h.logger.Error("failed to start stats collection for task", "error", err)
return
}

var backoff time.Duration
var retry int
limit := time.Second * 5
for {
time.Sleep(backoff)
select {
case ru, ok := <-ch:
// Channel is closed
// if channel closes, re-establish a new one
if !ok {
var re *structs.RecoverableError
ch, err = handle.Stats(ctx, h.interval)
if err == nil {
goto RETRY
}

// We do not log when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err != bstructs.ErrPluginShutdown {
h.logger.Debug("error fetching stats of task", "error", err)
goto RETRY
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my rewrite, I flipped this conditional logic. The comment doesn't match the conditional. I assume the comment intends to say that if the plugin is shutdown, we want to log in debug level rather than an error level (done in L130). If so, the check here should be err == bstructs.ErrPluginShutdown

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is correct - its saying that if the err is ErrPluginShutdown we should not be logging because that logging is unnecessary. I am not sure about retrying here vs one layer up to call TaskStats once the plugin restarts though.

Not an expert in this code though, so @nickethier should chime in..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the old code, if err != bstructs.ErrPluginShutdown we log in debug mode in L121, but if it's ErrPluginShutdown, then we will log with a Warn level in L130. I'm having a hard time reconciling that with comment.

The old logic for logging is in:

if err != bstructs.ErrPluginShutdown {
h.logger.Debug("error fetching stats of task", "error", err)
goto RETRY
}
// check if the error is terminal otherwise it's likely a
// transport error and we should retry
re, ok = err.(*structs.RecoverableError)
if ok && re.IsUnrecoverable() {
return
}
h.logger.Warn("stats collection for task failed", "error", err)

}
// check if the error is terminal otherwise it's likely a
// transport error and we should retry
re, ok = err.(*structs.RecoverableError)
if ok && re.IsUnrecoverable() {
return
}
h.logger.Warn("stats collection for task failed", "error", err)
RETRY:
// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * time.Second
if backoff > limit {
backoff = limit
}
// Increment retry counter
retry++

continue
goto MAIN
}

// Update stats on TaskRunner and emit them
Expand All @@ -149,6 +111,58 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
}
}

// callStatsWithRetry invokes handle driver Stats() functions and retries until channel is established
// successfully. Returns an error if it encounters a permanent error.
//
// It logs the errors with appropriate log levels; don't log returned error
func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
var retry int

MAIN:
if ctx.Err() != nil {
return nil, ctx.Err()
}

ch, err := handle.Stats(ctx, h.interval)
if err == nil {
return ch, nil
}

// Check if the driver doesn't implement stats
if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
h.logger.Debug("driver does not support stats")
return nil, err
}

// check if the error is terminal otherwise it's likely a
// transport error and we should retry
if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
return nil, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error gets swallowed. Is it useful enough to log it even if debug? When callStatsWithRetry is called and returns with an error it just returns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should! I logged them as an error just like the other case where we retry.

}

// We do not warn when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err == bstructs.ErrPluginShutdown {
h.logger.Debug("failed to fetching stats of task", "error", err)
} else {
h.logger.Error("failed to start stats collection for task", "error", err)
}

limit := time.Second * 5
backoff := 1 << (2 * uint64(retry)) * time.Second
if backoff > limit || retry > 5 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a retry check mostly to avoid dealing with integer overflow. If retry count is large enough, we get some wrapping effect, and backoff becomes a negative value! You can see the effects in https://play.golang.org/p/JYvHeAsSTUk .

The backoff computation computes the following backoff value for retry values.

0 	 1s
1 	 4s
2 	 16s
3 	 1m4s
4 	 4m16s
5 	 17m4s
6 	 1h8m16s
7 	 4h33m4s
8 	 18h12m16s
9 	 72h49m4s
10 	 291h16m16s
11 	 1165h5m4s
12 	 4660h20m16s
13 	 18641h21m4s
14 	 74565h24m16s
15 	 298261h37m4s
16 	 1193046h28m16s
17 	 -351909h41m29.709551616s
18 	 -1407638h45m58.838206464s
19 	 -506459h29m21.64327424s

Here, I avoided adding a retry cap to minimize changes compared to old code.

backoff = limit
}

// Increment retry counter
retry++

time.Sleep(backoff)
goto MAIN
}

func (h *statsHook) Shutdown() {
h.mu.Lock()
defer h.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion plugins/drivers/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (d *driverPluginClient) TaskStats(ctx context.Context, taskID string, inter
return nil, structs.NewRecoverableError(err, rec.Recoverable)
}
}
return nil, err
return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
}

ch := make(chan *cstructs.TaskResourceUsage, 1)
Expand Down