Skip to content

Commit

Permalink
add timeout to streamLogs; switch from CloseSend to CloseRecv in stre…
Browse files Browse the repository at this point in the history
…amLogs

So we ran with the change from #712 in our production system, and while we confirmed the results logging memory leak
was addresed, after about 12 to 13 hours, our reconciler threads systematically became deadlocked, and our watcher
quit processing events.

We as of yet have not been able to get a goroutine dump with stack traces when this problem occurs, so we are unclear
whether the #712 fixes have had a direct cause to the deadlock, or if another issue was encountered.  Among other things
our api server container restarted during the watcher deadlock, where the previous pod logs gave no clear indicationas to why.

This change pulls a couple of potentially helpful bits to help either diagnose or work around the deadlock:
1) we have added a timeout to the context used in streamLogs, in case a blockage in the UpdateLog call somehow was causing the problem
2) we also employ the combination of cancelling the context on method exit, to again unblock things, as well as the switch to
CloseAndRecv instead of CloseSend to confirm the UpdateLog finished, so that our canceling of the streamLog context does not
intermittenly cancel an UpdateLog call that would have otherwise succeeded.

rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero committed Mar 8, 2024
1 parent cf31c50 commit 4cc4cab
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/fatih/color"
Expand Down Expand Up @@ -355,8 +356,10 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
}

func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, logName string) error {
logger := logging.FromContext(ctx)
logsClient, err := r.resultsClient.UpdateLog(ctx)
streamCtx, streamCancel := context.WithTimeout(ctx, 5*time.Minute)
defer streamCancel()
logger := logging.FromContext(streamCtx)
logsClient, err := r.resultsClient.UpdateLog(streamCtx)
if err != nil {
return fmt.Errorf("failed to create UpdateLog client: %w", err)
}
Expand Down Expand Up @@ -436,10 +439,24 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
logger.Error(flushErr)
return flushErr
}
if closeErr := logsClient.CloseSend(); closeErr != nil {
logger.Warnw("CloseSend ret err",
// so we use CloseAndRecv vs. just CloseSent to achieve a few things:
// 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary
// 2) LogSummary appears to have some stats on the state of operations
// 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api
// server side has reached a terminal state
// 4) Hence, creating a child context which we cancel hopefully does not interrupt the UpdateLog call when this method exits,
// 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does
// the final clean, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak

// comparing closeErr with io.EOF does not work; and I could not find code / desc etc. constants in the grpc code that handled
// the wrapped EOF error we expect to get from grpc when things are "OK"
if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") {
logger.Warnw("CloseAndRecv ret err",
zap.String("name", o.GetName()),
zap.String("error", closeErr.Error()))
if logSummary != nil {
logger.Errorw("CloseAndRecv", zap.String("logSummary", logSummary.String()))
}
logger.Error(closeErr)
return closeErr
}
Expand Down

0 comments on commit 4cc4cab

Please sign in to comment.