From 2ab9f4ecd325d5120869e1f398c6f15aad769d6c Mon Sep 17 00:00:00 2001
From: gabemontero
Date: Fri, 8 Mar 2024 14:29:58 -0500
Subject: [PATCH] encapsulate all grpc calls from the dynamic reconciler with a
 cancelling context with timeout; switch from CloseSend to CloseAndRecv in
 streamLogs; add analysis, and a possible goroutine dump, to the context with
 timeout in the dynamic reconciler

So we ran with the change from #712 in our production system, and while we
confirmed the results logging memory leak was addressed, after about 12 to 13
hours our reconciler threads systematically became deadlocked, and our watcher
quit processing events.

We have not yet been able to get a goroutine dump with stack traces when this
problem occurs, so we are unclear whether the #712 fixes directly caused the
deadlock, or if another issue was encountered. Among other things, our api
server container restarted during the watcher deadlock, and the previous pod
logs gave no clear indication as to why.

This change pulls in a few potentially helpful bits to either diagnose or work
around the deadlock:

1) we have added a configurable timeout to the context used in the dynamic
   Reconcile method, in case a blockage in any RPC call using the context was
   somehow causing the problem
2) we also cancel the context on method exit, to again unblock things, and
   switch to CloseAndRecv instead of CloseSend to confirm that UpdateLog
   finished, so that our cancelling of the streamLogs context does not
   intermittently cancel an UpdateLog call that would have otherwise succeeded
3) we analyze how the context is released, and if it is from a timeout and not
   a cancel, we initiate a goroutine dump with stack traces
4) using a context with timeout that is cancelled on exit from the Reconcile
   method requires no longer running 'sendLog' on a separate goroutine;
   otherwise we re-introduced intermittent cancelling of 'UpdateLog' processing
   before it could complete
5) we now log the deadlines for UpdateLog on the api server side
6) we are back to pulling errors off of the tkn client error channel
---
 cmd/watcher/main.go                       |   2 +
 pkg/api/server/v1alpha2/logs.go           |   8 ++
 pkg/watcher/reconciler/config.go          |   3 +
 pkg/watcher/reconciler/dynamic/dynamic.go | 155 +++++++++++++++++++---
 4 files changed, 151 insertions(+), 17 deletions(-)

diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go
index d950bd7c0..bbb686daa 100644
--- a/cmd/watcher/main.go
+++ b/cmd/watcher/main.go
@@ -67,6 +67,7 @@ var (
 	requeueInterval          = flag.Duration("requeue_interval", 10*time.Minute, "How long the Watcher waits to reprocess keys on certain events (e.g. an object doesn't match the provided selectors)")
an object doesn't match the provided selectors)") namespace = flag.String("namespace", corev1.NamespaceAll, "Should the Watcher only watch a single namespace, then this value needs to be set to the namespace name otherwise leave it empty.") checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects") + updateLogTimeout = flag.Duration("update_log_timeout", 30*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.") ) func main() { @@ -99,6 +100,7 @@ func main() { CompletedResourceGracePeriod: *completedRunGracePeriod, RequeueInterval: *requeueInterval, CheckOwner: *checkOwner, + UpdateLogTimeout: updateLogTimeout, } if selector := *labelSelector; selector != "" { diff --git a/pkg/api/server/v1alpha2/logs.go b/pkg/api/server/v1alpha2/logs.go index 3ede5d789..9c773d925 100644 --- a/pkg/api/server/v1alpha2/logs.go +++ b/pkg/api/server/v1alpha2/logs.go @@ -105,6 +105,14 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error { } }() for { + // the underlying grpc stream RecvMsg method blocks until this receives a message or it is done, + // with the client now setting a context deadline, if a timeout occurs, that should make this done/canceled; let's check to confirm + deadline, ok := srv.Context().Deadline() + if !ok { + s.logger.Warnf("UpdateLog called with no deadline: %#v", srv) + } else { + s.logger.Infof("UpdateLog called with deadline: %s for %#v", deadline.String(), srv) + } recv, err := srv.Recv() // If we reach the end of the srv, we receive an io.EOF error if err != nil { diff --git a/pkg/watcher/reconciler/config.go b/pkg/watcher/reconciler/config.go index a6f538f01..1c4303554 100644 --- a/pkg/watcher/reconciler/config.go +++ b/pkg/watcher/reconciler/config.go @@ -41,6 +41,9 @@ type Config struct { // Check owner reference when deleting objects. By default, objects having owner references set won't be deleted. 
 	CheckOwner bool
+
+	// UpdateLogTimeout is the time we provide for storing logs before aborting
+	UpdateLogTimeout *time.Duration
 }
 
 // GetDisableAnnotationUpdate returns whether annotation updates should be
diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go
index c80c07d58..e561750fb 100644
--- a/pkg/watcher/reconciler/dynamic/dynamic.go
+++ b/pkg/watcher/reconciler/dynamic/dynamic.go
@@ -18,6 +18,9 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"os"
+	"runtime/pprof"
+	"strings"
 	"time"
 
 	"github.com/fatih/color"
@@ -36,6 +39,8 @@ import (
 	"github.com/tektoncd/results/pkg/watcher/results"
 	pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -89,14 +94,82 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
 	}
 }
 
+func printGoroutines(logger *zap.SugaredLogger, o results.Object) {
+	// manual testing has confirmed you don't have to explicitly enable pprof to get goroutine dumps with
+	// stack traces; this lines up with the stack traces you receive if a panic occurs, as well as the
+	// stack trace you receive if you send a SIGQUIT and/or SIGABRT to a running Go program
+	profile := pprof.Lookup("goroutine")
+	if profile == nil {
+		logger.Warnw("Could not find the goroutine profile, skipping dump after context timeout",
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+			zap.String("name", o.GetName()))
+	} else {
+		err := profile.WriteTo(os.Stdout, 2)
+		if err != nil {
+			logger.Errorw("problem writing goroutine dump",
+				zap.String("error", err.Error()),
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+				zap.String("name", o.GetName()))
+		}
+	}
+
+}
+
 // Reconcile handles result/record uploading for the given Run object.
 // If enabled, the object may be deleted upon successful result upload.
 func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
+	var ctxCancel context.CancelFunc
+	// a context with timeout does not work with the partial end-to-end flow that exists in unit tests;
+	// this field will always be set for real deployments
+	if r.cfg != nil && r.cfg.UpdateLogTimeout != nil {
+		ctx, ctxCancel = context.WithTimeout(ctx, *r.cfg.UpdateLogTimeout)
+	}
+	// we don't defer the ctxCancel call because golang defers follow a LIFO pattern
+	// and we want our context analysis defer function below to be able to distinguish between
+	// the context channel being closed because of Canceled or DeadlineExceeded
 	logger := logging.FromContext(ctx)
+	defer func() {
+		if ctx == nil {
+			return
+		}
+		ctxErr := ctx.Err()
+		if ctxErr == nil {
+			logger.Warnw("Leaving dynamic Reconciler somehow but the context channel is not closed",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+				zap.String("name", o.GetName()))
+			return
+		}
+		if ctxErr == context.Canceled {
+			logger.Infow("Leaving dynamic Reconciler normally with context properly canceled",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+				zap.String("name", o.GetName()))
+			return
+		}
+		if ctxErr == context.DeadlineExceeded {
+			logger.Warnw("Leaving dynamic Reconciler only after context timeout, initiating thread dump",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+				zap.String("name", o.GetName()))
+			printGoroutines(logger, o)
+			return
+		}
+		logger.Warnw("Leaving dynamic Reconciler with unexpected error",
+			zap.String("error", ctxErr.Error()),
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+			zap.String("name", o.GetName()))
+	}()
 
 	if o.GetObjectKind().GroupVersionKind().Empty() {
 		gvk, err := convert.InferGVK(o)
 		if err != nil {
+			if ctxCancel != nil {
+				ctxCancel()
+			}
 			return err
 		}
 		o.GetObjectKind().SetGroupVersionKind(gvk)
@@ -110,18 +183,32 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
 	if err != nil {
 		logger.Debugw("Error upserting record to API server",
 			zap.Error(err), timeTakenField)
+		// in case a call to cancel overwrites the error set in the context
+		if status.Code(err) == codes.DeadlineExceeded {
+			printGoroutines(logger, o)
+		}
+		if ctxCancel != nil {
+			ctxCancel()
+		}
 		return fmt.Errorf("error upserting record: %w", err)
 	}
 
 	// Update logs if enabled.
 	if r.resultsClient.LogsClient != nil {
-		if err := r.sendLog(ctx, o); err != nil {
+		if err = r.sendLog(ctx, o); err != nil {
 			logger.Errorw("Error sending log",
 				zap.String("namespace", o.GetNamespace()),
 				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
 				zap.String("name", o.GetName()),
 				zap.Error(err),
 			)
+			// in case a call to cancel overwrites the error set in the context
+			if status.Code(err) == codes.DeadlineExceeded {
+				printGoroutines(logger, o)
+			}
+			if ctxCancel != nil {
+				ctxCancel()
+			}
 			return err
 		}
 	}
@@ -132,11 +219,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
 	recordAnnotation := annotation.Annotation{Name: annotation.Record, Value: rec.GetName()}
 	resultAnnotation := annotation.Annotation{Name: annotation.Result, Value: res.GetName()}
-	if err := r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil {
+	if err = r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil {
+		// no grpc calls from addResultsAnnotations
+		if ctxCancel != nil {
+			ctxCancel()
+		}
 		return err
 	}
 
-	return r.deleteUponCompletion(logging.WithLogger(ctx, logger), o)
+	if err = r.deleteUponCompletion(logging.WithLogger(ctx, logger), o); err != nil {
+		// no grpc calls from deleteUponCompletion
+		if ctxCancel != nil {
+			ctxCancel()
+		}
+		return err
+	}
+	if ctxCancel != nil {
+		ctxCancel()
+	}
+	return nil
 }
 
 // addResultsAnnotations adds Results annotations to the object in question if
@@ -333,22 +434,21 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
 			zap.String("name", o.GetName()),
 		)
 
-		go func() {
-			err := r.streamLogs(ctx, o, logType, logName)
-			if err != nil {
-				logger.Errorw("Error streaming log",
-					zap.String("namespace", o.GetNamespace()),
-					zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
-					zap.String("name", o.GetName()),
-					zap.Error(err),
-				)
-			}
-			logger.Debugw("Streaming log completed",
+		err = r.streamLogs(ctx, o, logType, logName)
+		if err != nil {
+			logger.Errorw("Error streaming log",
 				zap.String("namespace", o.GetNamespace()),
 				zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
 				zap.String("name", o.GetName()),
+				zap.Error(err),
 			)
-		}()
+		}
+		logger.Debugw("Streaming log completed",
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
+			zap.String("name", o.GetName()),
+		)
+
 	}
 
 	return nil
@@ -396,6 +496,13 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
 		Err: inMemWriteBufferStderr,
 	}, logChan, errChan)
 
+	// pull the first error that occurred and return on that; reminder - per https://golang.org/ref/spec#Channel_types
+	// channels act as FIFO queues
+	chanErr, ok := <-errChan
+	if ok && chanErr != nil {
+		return fmt.Errorf("error occurred while calling tkn client write: %w", chanErr)
+	}
+
 	bufStdout := inMemWriteBufferStdout.Bytes()
 	cntStdout, writeStdOutErr := writer.Write(bufStdout)
 	if writeStdOutErr != nil {
@@ -436,10 +543,24 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
 		logger.Error(flushErr)
 		return flushErr
 	}
-	if closeErr := logsClient.CloseSend(); closeErr != nil {
-		logger.Warnw("CloseSend ret err",
+	// so we use CloseAndRecv vs. just CloseSend to achieve a few things:
+	// 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary
+	// 2) LogSummary appears to have some stats on the state of operations
+	// 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api
+	// server side has reached a terminal state
+	// 4) Hence, the child context we create and then cancel hopefully does not interrupt the UpdateLog call when this method exits,
+	// 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does
+	// the final cleanup, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak
+
+	// comparing closeErr with io.EOF does not work, and I could not find code/desc constants in the grpc code that handle
+	// the wrapped EOF error we expect to get from grpc when things are "OK"
+	if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") {
+		logger.Warnw("CloseAndRecv ret err",
 			zap.String("name", o.GetName()),
 			zap.String("error", closeErr.Error()))
+		if logSummary != nil {
+			logger.Errorw("CloseAndRecv", zap.String("logSummary", logSummary.String()))
+		}
 		logger.Error(closeErr)
 		return closeErr
 	}
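
For reference only (not part of the diff above): a minimal sketch of the client-side pattern this change relies on, a context with timeout whose cancellation happens only after CloseAndRecv confirms UpdateLog reached a terminal state. It assumes the generated pb.LogsClient, pb.Log, and pb.LogSummary types from results_go_proto; the helper name and the string match on the wrapped EOF error are illustrative, mirroring the check used in streamLogs.

package example

import (
	"context"
	"strings"
	"time"

	pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
)

// sendWithDeadline bounds the whole UpdateLog exchange with a timeout and uses
// CloseAndRecv so it only returns once the server side has finished the stream.
func sendWithDeadline(parent context.Context, client pb.LogsClient, logs []*pb.Log, timeout time.Duration) (*pb.LogSummary, error) {
	// derive a child context so a stuck RPC cannot block the caller forever
	ctx, cancel := context.WithTimeout(parent, timeout)
	// a plain defer is fine in this sketch; the reconciler instead cancels explicitly
	// on each return path so its deferred ctx.Err() analysis can still observe DeadlineExceeded
	defer cancel()

	stream, err := client.UpdateLog(ctx)
	if err != nil {
		return nil, err
	}
	for _, l := range logs {
		if err := stream.Send(l); err != nil {
			return nil, err
		}
	}
	// CloseAndRecv = CloseSend + Recv: it blocks until the server returns the
	// LogSummary (or an error), confirming UpdateLog finished before we cancel
	summary, err := stream.CloseAndRecv()
	if err != nil && !strings.Contains(err.Error(), "EOF") {
		return nil, err
	}
	return summary, nil
}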