diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index d950bd7c0..bbb686daa 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -67,6 +67,7 @@ var ( requeueInterval = flag.Duration("requeue_interval", 10*time.Minute, "How long the Watcher waits to reprocess keys on certain events (e.g. an object doesn't match the provided selectors)") namespace = flag.String("namespace", corev1.NamespaceAll, "Should the Watcher only watch a single namespace, then this value needs to be set to the namespace name otherwise leave it empty.") checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects") + updateLogTimeout = flag.Duration("update_log_timeout", 30*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.") ) func main() { @@ -99,6 +100,7 @@ func main() { CompletedResourceGracePeriod: *completedRunGracePeriod, RequeueInterval: *requeueInterval, CheckOwner: *checkOwner, + UpdateLogTimeout: updateLogTimeout, } if selector := *labelSelector; selector != "" { diff --git a/pkg/api/server/v1alpha2/logs.go b/pkg/api/server/v1alpha2/logs.go index 3ede5d789..9c773d925 100644 --- a/pkg/api/server/v1alpha2/logs.go +++ b/pkg/api/server/v1alpha2/logs.go @@ -105,6 +105,14 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error { } }() for { + // the underlying grpc stream RecvMsg method blocks until this receives a message or it is done, + // with the client now setting a context deadline, if a timeout occurs, that should make this done/canceled; let's check to confirm + deadline, ok := srv.Context().Deadline() + if !ok { + s.logger.Warnf("UpdateLog called with no deadline: %#v", srv) + } else { + s.logger.Infof("UpdateLog called with deadline: %s for %#v", deadline.String(), srv) + } recv, err := srv.Recv() // If we reach the end of the srv, we receive an io.EOF error if err != nil { diff --git a/pkg/watcher/reconciler/config.go 
b/pkg/watcher/reconciler/config.go index a6f538f01..1c4303554 100644 --- a/pkg/watcher/reconciler/config.go +++ b/pkg/watcher/reconciler/config.go @@ -41,6 +41,9 @@ type Config struct { // Check owner reference when deleting objects. By default, objects having owner references set won't be deleted. CheckOwner bool + + // UpdateLogTimeout is the time we provide for storing logs before aborting + UpdateLogTimeout *time.Duration } // GetDisableAnnotationUpdate returns whether annotation updates should be diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go index c80c07d58..e561750fb 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic.go +++ b/pkg/watcher/reconciler/dynamic/dynamic.go @@ -18,6 +18,9 @@ import ( "bytes" "context" "fmt" + "os" + "runtime/pprof" + "strings" "time" "github.com/fatih/color" @@ -36,6 +39,8 @@ import ( "github.com/tektoncd/results/pkg/watcher/results" pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -89,14 +94,82 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient } } +func printGoroutines(logger *zap.SugaredLogger, o results.Object) { + // manual testing has confirmed you don't have to explicitly enable pprof to get goroutine dumps with + // stack traces; this lines up with the stack traces you receive if a panic occurs, as well as the + // stack trace you receive if you send a SIGQUIT and/or SIGABRT to a running go program + profile := pprof.Lookup("goroutine") + if profile == nil { + logger.Warnw("Leaving dynamic Reconciler only after context timeout, number of profiles found", + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName())) + } else { + err := 
profile.WriteTo(os.Stdout, 2) + if err != nil { + logger.Errorw("problem writing goroutine dump", + zap.String("error", err.Error()), + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName())) + } + } + +} + // Reconcile handles result/record uploading for the given Run object. // If enabled, the object may be deleted upon successful result upload. func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error { + var ctxCancel context.CancelFunc + // context with timeout does not work with the partial end to end flow that exists with unit tests; + // this field will always be set for real + if r.cfg != nil && r.cfg.UpdateLogTimeout != nil { + ctx, ctxCancel = context.WithTimeout(ctx, *r.cfg.UpdateLogTimeout) + } + // we don't defer ctxCancel because Go defers run in LIFO order + // and we want to have our context analysis defer function be able to distinguish between + // the context channel being closed because of Canceled or DeadlineExceeded logger := logging.FromContext(ctx) + defer func() { + if ctx == nil { + return + } + ctxErr := ctx.Err() + if ctxErr == nil { + logger.Warnw("Leaving dynamic Reconciler somehow but the context channel is not closed", + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName())) + return + } + if ctxErr == context.Canceled { + logger.Infow("Leaving dynamic Reconciler normally with context properly canceled", + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName())) + return + } + if ctxErr == context.DeadlineExceeded { + logger.Warnw("Leaving dynamic Reconciler only after context timeout, initiating thread dump", + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name",
o.GetName())) + printGoroutines(logger, o) + return + } + logger.Warnw("Leaving dynamic Reconciler with unexpected error", + zap.String("error", ctxErr.Error()), + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName())) + }() if o.GetObjectKind().GroupVersionKind().Empty() { gvk, err := convert.InferGVK(o) if err != nil { + if ctxCancel != nil { + ctxCancel() + } return err } o.GetObjectKind().SetGroupVersionKind(gvk) @@ -110,18 +183,32 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error { if err != nil { logger.Debugw("Error upserting record to API server", zap.Error(err), timeTakenField) + // in case a call to cancel overwrites the error set in the context + if status.Code(err) == codes.DeadlineExceeded { + printGoroutines(logger, o) + } + if ctxCancel != nil { + ctxCancel() + } return fmt.Errorf("error upserting record: %w", err) } // Update logs if enabled. if r.resultsClient.LogsClient != nil { - if err := r.sendLog(ctx, o); err != nil { + if err = r.sendLog(ctx, o); err != nil { logger.Errorw("Error sending log", zap.String("namespace", o.GetNamespace()), zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), zap.String("name", o.GetName()), zap.Error(err), ) + // in case a call to cancel overwrites the error set in the context + if status.Code(err) == codes.DeadlineExceeded { + printGoroutines(logger, o) + } + if ctxCancel != nil { + ctxCancel() + } return err } } @@ -132,11 +219,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error { recordAnnotation := annotation.Annotation{Name: annotation.Record, Value: rec.GetName()} resultAnnotation := annotation.Annotation{Name: annotation.Result, Value: res.GetName()} - if err := r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil { + if err = r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, 
recordAnnotation, resultAnnotation); err != nil { + // no grpc calls from addResultsAnnotations + if ctxCancel != nil { + ctxCancel() + } return err } - return r.deleteUponCompletion(logging.WithLogger(ctx, logger), o) + if err = r.deleteUponCompletion(logging.WithLogger(ctx, logger), o); err != nil { + // NOTE(review): presumably no grpc calls from deleteUponCompletion either - confirm + if ctxCancel != nil { + ctxCancel() + } + return err + } + if ctxCancel != nil { + ctxCancel() + } + return nil } // addResultsAnnotations adds Results annotations to the object in question if @@ -333,22 +434,21 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error { zap.String("name", o.GetName()), ) - go func() { - err := r.streamLogs(ctx, o, logType, logName) - if err != nil { - logger.Errorw("Error streaming log", - zap.String("namespace", o.GetNamespace()), - zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), - zap.String("name", o.GetName()), - zap.Error(err), - ) - } - logger.Debugw("Streaming log completed", + err = r.streamLogs(ctx, o, logType, logName) + if err != nil { + logger.Errorw("Error streaming log", zap.String("namespace", o.GetNamespace()), zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), zap.String("name", o.GetName()), + zap.Error(err), ) - }() + } + logger.Debugw("Streaming log completed", + zap.String("namespace", o.GetNamespace()), + zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), + zap.String("name", o.GetName()), + ) + } return nil @@ -396,6 +496,13 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, Err: inMemWriteBufferStderr, }, logChan, errChan) + // pull the first error that occurred and return on that; reminder - per https://golang.org/ref/spec#Channel_types + // channels act as FIFO queues + chanErr, ok := <-errChan + if ok && chanErr != nil { + return fmt.Errorf("error occurred while calling tkn client write: %w", chanErr) + } + bufStdout := inMemWriteBufferStdout.Bytes() cntStdout,
writeStdOutErr := writer.Write(bufStdout) if writeStdOutErr != nil { @@ -436,10 +543,24 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, logger.Error(flushErr) return flushErr } - if closeErr := logsClient.CloseSend(); closeErr != nil { - logger.Warnw("CloseSend ret err", + // so we use CloseAndRecv vs. just CloseSend to achieve a few things: + // 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary + // 2) LogSummary appears to have some stats on the state of operations + // 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api + // server side has reached a terminal state + // 4) Hence, creating a child context which we cancel hopefully does not interrupt the UpdateLog call when this method exits, + // 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does + // the final cleanup, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak + + // comparing closeErr with io.EOF does not work; and I could not find code / desc etc. constants in the grpc code that handled + // the wrapped EOF error we expect to get from grpc when things are "OK" + if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") { + logger.Warnw("CloseAndRecv ret err", zap.String("name", o.GetName()), zap.String("error", closeErr.Error())) + if logSummary != nil { + logger.Errorw("CloseAndRecv", zap.String("logSummary", logSummary.String())) + } logger.Error(closeErr) return closeErr }