Commit db9eb3e

encapsulate all grpc calls from dynamic reconciler with cancelling context with timeout

switch from CloseSend to CloseAndRecv in streamLogs
add analysis, and a possible goroutine dump, to the context with timeout in the dynamic reconciler

So we ran with the change from #712 in our production system, and while we confirmed the results logging memory leak
was addressed, after about 12 to 13 hours our reconciler threads systematically became deadlocked, and our watcher
quit processing events.

We have not yet been able to get a goroutine dump with stack traces when this problem occurs, so we are unclear
whether the #712 fixes directly caused the deadlock, or whether another issue was encountered. Among other things,
our api server container restarted during the watcher deadlock, and the previous pod logs gave no clear indication as to why.

This change pulls in a couple of potentially helpful bits to either diagnose or work around the deadlock:
1) we have added a timeout to the context used in the dynamic Reconcile method, in case a blockage in any RPC call using that context was somehow causing the problem
2) we also cancel the context on method exit, to again unblock things, and switch to CloseAndRecv instead of CloseSend to confirm that UpdateLog finished, so that our canceling of the streamLogs context does not intermittently cancel an UpdateLog call that would have otherwise succeeded
3) we analyze how a context is released, and if it is from a timeout and not a cancel, we initiate a goroutine dump with stack traces
4) using a context with timeout that is canceled on exit from the Reconcile method requires no longer running sendLog on a separate goroutine; otherwise we would re-introduce intermittent cancellation of UpdateLog processing before it could complete
5) we now log the deadlines for UpdateLog on the api server side

rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
gabemontero committed Mar 29, 2024
1 parent c37ca39 commit db9eb3e
Showing 2 changed files with 121 additions and 19 deletions.
8 changes: 8 additions & 0 deletions pkg/api/server/v1alpha2/logs.go
@@ -105,6 +105,14 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
}
}()
for {
// the underlying grpc stream RecvMsg method blocks until it receives a message or the stream is done;
// with the client now setting a context deadline, a timeout should make this done/canceled; we log the deadline to confirm
deadline, ok := srv.Context().Deadline()
if !ok {
s.logger.Warnf("UpdateLog called with no deadline: %#v", srv)
} else {
s.logger.Infof("UpdateLog called with deadline: %s for %#v", deadline.String(), srv)
}
recv, err := srv.Recv()
// If we reach the end of the srv, we receive an io.EOF error
if err != nil {
132 changes: 113 additions & 19 deletions pkg/watcher/reconciler/dynamic/dynamic.go
@@ -18,6 +18,9 @@ import (
"bytes"
"context"
"fmt"
"os"
"runtime/pprof"
"strings"
"time"

"github.com/fatih/color"
@@ -36,6 +39,8 @@ import (
"github.com/tektoncd/results/pkg/watcher/results"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -89,14 +94,72 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
}
}

func printGoroutines(logger *zap.SugaredLogger, o results.Object) {
// manual testing has confirmed you don't have to explicitly enable pprof to get goroutine dumps with
// stack traces; this lines up with the stack traces you receive if a panic occurs, as well as the
// stack trace you receive if you send a SIGQUIT and/or SIGABRT to a running go program
profile := pprof.Lookup("goroutine")
if profile == nil {
logger.Warnw("Leaving dynamic Reconciler only after context timeout, but no goroutine profile was found",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
} else {
err := profile.WriteTo(os.Stdout, 2)
if err != nil {
logger.Errorw("problem writing goroutine dump",
zap.String("error", err.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
}
}

}

// Reconcile handles result/record uploading for the given Run object.
// If enabled, the object may be deleted upon successful result upload.
func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
logger := logging.FromContext(ctx)
dynamicContext, dynamicCancel := context.WithTimeout(ctx, 5*time.Minute)
// we don't defer dynamicCancel because Go defers run in LIFO order,
// and we want our context-analysis defer function to be able to distinguish between
// the context channel being closed because of Canceled or DeadlineExceeded
logger := logging.FromContext(dynamicContext)
defer func() {
ctxErr := dynamicContext.Err()
if ctxErr == nil {
logger.Warnw("Leaving dynamic Reconciler somehow but the context channel is not closed",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
return
}
if ctxErr == context.Canceled {
logger.Infow("Leaving dynamic Reconciler normally with context properly canceled",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
return
}
if ctxErr == context.DeadlineExceeded {
logger.Warnw("Leaving dynamic Reconciler only after context timeout, initiating thread dump",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
printGoroutines(logger, o)
return
}
logger.Warnw("Leaving dynamic Reconciler with unexpected error",
zap.String("error", ctxErr.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
}()

if o.GetObjectKind().GroupVersionKind().Empty() {
gvk, err := convert.InferGVK(o)
if err != nil {
dynamicCancel()
return err
}
o.GetObjectKind().SetGroupVersionKind(gvk)
@@ -105,23 +168,33 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

// Upsert record.
startTime := time.Now()
res, rec, err := r.resultsClient.Put(ctx, o)
res, rec, err := r.resultsClient.Put(dynamicContext, o)
timeTakenField := zap.Int64("results.tekton.dev/time-taken-ms", time.Since(startTime).Milliseconds())

if err != nil {
logger.Debugw("Error upserting record to API server", zap.Error(err), timeTakenField)
// in case a call to cancel overwrites the error set in the context
if status.Code(err) == codes.DeadlineExceeded {
printGoroutines(logger, o)
}
dynamicCancel()
return fmt.Errorf("error upserting record: %w", err)
}

// Update logs if enabled.
if r.resultsClient.LogsClient != nil {
if err := r.sendLog(ctx, o); err != nil {
if err = r.sendLog(dynamicContext, o); err != nil {
logger.Errorw("Error sending log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
// in case a call to cancel overwrites the error set in the context
if status.Code(err) == codes.DeadlineExceeded {
printGoroutines(logger, o)
}
dynamicCancel()
return err
}
}
@@ -132,11 +205,19 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

recordAnnotation := annotation.Annotation{Name: annotation.Record, Value: rec.GetName()}
resultAnnotation := annotation.Annotation{Name: annotation.Result, Value: res.GetName()}
if err := r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil {
if err = r.addResultsAnnotations(logging.WithLogger(dynamicContext, logger), o, recordAnnotation, resultAnnotation); err != nil {
// no grpc calls from addResultsAnnotations
dynamicCancel()
return err
}

return r.deleteUponCompletion(logging.WithLogger(ctx, logger), o)
if err = r.deleteUponCompletion(logging.WithLogger(dynamicContext, logger), o); err != nil {
// no grpc calls from deleteUponCompletion
dynamicCancel()
return err
}
dynamicCancel()
return nil
}

// addResultsAnnotations adds Results annotations to the object in question if
@@ -333,22 +414,21 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
zap.String("name", o.GetName()),
)

go func() {
err := r.streamLogs(ctx, o, logType, logName)
if err != nil {
logger.Errorw("Error streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
}
logger.Debugw("Streaming log completed",
err = r.streamLogs(ctx, o, logType, logName)
if err != nil {
logger.Errorw("Error streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
}()
}
logger.Debugw("Streaming log completed",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)

}

return nil
@@ -436,10 +516,24 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
logger.Error(flushErr)
return flushErr
}
if closeErr := logsClient.CloseSend(); closeErr != nil {
logger.Warnw("CloseSend ret err",
// we use CloseAndRecv vs. just CloseSend to achieve a few things:
// 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary
// 2) LogSummary appears to have some stats on the state of operations
// 3) it also appears to be the best form of "confirmation" that the asynchronous UpdateLog operation on the api
// server side has reached a terminal state
// 4) hence, canceling the child context we created hopefully does not interrupt the UpdateLog call when this method exits
// 5) however, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does
// the final cleanup, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak

// comparing closeErr with io.EOF does not work, and we could not find code/desc constants in the grpc code that match
// the wrapped EOF error we expect to get from grpc when things are "OK"
if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") {
logger.Warnw("CloseAndRecv ret err",
zap.String("name", o.GetName()),
zap.String("error", closeErr.Error()))
if logSummary != nil {
logger.Errorw("CloseAndRecv", zap.String("logSummary", logSummary.String()))
}
logger.Error(closeErr)
return closeErr
}
