Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timeout to dynamic Reconcile; switch from CloseSend to CloseRecv in streamLogs; add goroutine dump on timeouts/deadlocks #725

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
requeueInterval = flag.Duration("requeue_interval", 10*time.Minute, "How long the Watcher waits to reprocess keys on certain events (e.g. an object doesn't match the provided selectors)")
namespace = flag.String("namespace", corev1.NamespaceAll, "Should the Watcher only watch a single namespace, then this value needs to be set to the namespace name otherwise leave it empty.")
checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects")
updateLogTimeout = flag.Duration("update_log_timeout", 30*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.")
)

func main() {
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
CompletedResourceGracePeriod: *completedRunGracePeriod,
RequeueInterval: *requeueInterval,
CheckOwner: *checkOwner,
UpdateLogTimeout: updateLogTimeout,
}

if selector := *labelSelector; selector != "" {
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/server/v1alpha2/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ func (s *Server) UpdateLog(srv pb.Logs_UpdateLogServer) error {
}
}()
for {
// the underlying grpc stream RecvMsg method blocks until this receives a message or it is done,
// with the client now setting a context deadline, if a timeout occurs, that should make this done/canceled; let's check to confirm
deadline, ok := srv.Context().Deadline()
if !ok {
s.logger.Warnf("UpdateLog called with no deadline: %#v", srv)
} else {
s.logger.Infof("UpdateLog called with deadline: %s for %#v", deadline.String(), srv)
}
recv, err := srv.Recv()
// If we reach the end of the srv, we receive an io.EOF error
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Config struct {

// Check owner reference when deleting objects. By default, objects having owner references set won't be deleted.
CheckOwner bool

// UpdateLogTimeout is the time we provide for storing logs before aborting
UpdateLogTimeout *time.Duration
}

// GetDisableAnnotationUpdate returns whether annotation updates should be
Expand Down
155 changes: 138 additions & 17 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"bytes"
"context"
"fmt"
"os"
"runtime/pprof"
"strings"
"time"

"github.com/fatih/color"
Expand All @@ -36,6 +39,8 @@ import (
"github.com/tektoncd/results/pkg/watcher/results"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -89,14 +94,82 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
}
}

func printGoroutines(logger *zap.SugaredLogger, o results.Object) {
// manual testing has confirmed you don't have to explicitly enable pprof to get goroutine dumps with
// stack traces; this lines up with the stack traces you receive if a panic occurs, as well as the
// stack trace you receive if you send a SIGQUIT and/or SIGABRT to a running go program
profile := pprof.Lookup("goroutine")
if profile == nil {
logger.Warnw("Leaving dynamic Reconciler only after context timeout, number of profiles found",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
} else {
err := profile.WriteTo(os.Stdout, 2)
if err != nil {
logger.Errorw("problem writing goroutine dump",
zap.String("error", err.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
}
}

}

// Reconcile handles result/record uploading for the given Run object.
// If enabled, the object may be deleted upon successful result upload.
func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
var ctxCancel context.CancelFunc
// context with timeout does not work with the partial end to end flow that exists with unit tests;
// this field will alway be set for real
if r.cfg != nil && r.cfg.UpdateLogTimeout != nil {
ctx, ctxCancel = context.WithTimeout(ctx, *r.cfg.UpdateLogTimeout)
}
// we dont defer the dynamicCancle because golang defers follow a LIFO pattern
// and we want to have our context analysis defer function be able to distinguish between
// the context channel being closed because of Canceled or DeadlineExceeded
logger := logging.FromContext(ctx)
defer func() {
if ctx == nil {
return
}
ctxErr := ctx.Err()
if ctxErr == nil {
logger.Warnw("Leaving dynamic Reconciler somehow but the context channel is not closed",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
return
}
if ctxErr == context.Canceled {
logger.Infow("Leaving dynamic Reconciler normally with context properly canceled",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
return
}
if ctxErr == context.DeadlineExceeded {
logger.Warnw("Leaving dynamic Reconciler only after context timeout, initiating thread dump",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
printGoroutines(logger, o)
return
}
logger.Warnw("Leaving dynamic Reconciler with unexpected error",
zap.String("error", ctxErr.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()))
}()

if o.GetObjectKind().GroupVersionKind().Empty() {
gvk, err := convert.InferGVK(o)
if err != nil {
if ctxCancel != nil {
ctxCancel()
}
return err
}
o.GetObjectKind().SetGroupVersionKind(gvk)
Expand All @@ -110,18 +183,32 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

if err != nil {
logger.Debugw("Error upserting record to API server", zap.Error(err), timeTakenField)
// in case a call to cancel overwrites the error set in the context
if status.Code(err) == codes.DeadlineExceeded {
printGoroutines(logger, o)
}
if ctxCancel != nil {
ctxCancel()
}
return fmt.Errorf("error upserting record: %w", err)
}

// Update logs if enabled.
if r.resultsClient.LogsClient != nil {
if err := r.sendLog(ctx, o); err != nil {
if err = r.sendLog(ctx, o); err != nil {
logger.Errorw("Error sending log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
// in case a call to cancel overwrites the error set in the context
if status.Code(err) == codes.DeadlineExceeded {
printGoroutines(logger, o)
}
if ctxCancel != nil {
ctxCancel()
}
return err
}
}
Expand All @@ -132,11 +219,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

recordAnnotation := annotation.Annotation{Name: annotation.Record, Value: rec.GetName()}
resultAnnotation := annotation.Annotation{Name: annotation.Result, Value: res.GetName()}
if err := r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil {
if err = r.addResultsAnnotations(logging.WithLogger(ctx, logger), o, recordAnnotation, resultAnnotation); err != nil {
// no grpc calls from addResultsAnnotation
if ctxCancel != nil {
ctxCancel()
}
return err
}

return r.deleteUponCompletion(logging.WithLogger(ctx, logger), o)
if err = r.deleteUponCompletion(logging.WithLogger(ctx, logger), o); err != nil {
// no grpc calls from addResultsAnnotation
if ctxCancel != nil {
ctxCancel()
}
return err
}
if ctxCancel != nil {
ctxCancel()
}
return nil
}

// addResultsAnnotations adds Results annotations to the object in question if
Expand Down Expand Up @@ -333,22 +434,21 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
zap.String("name", o.GetName()),
)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we pass context here and use select and ctx.done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the end result is the same; we block the reconcile thread until the operation is complete

no reason for the added complextiy

err := r.streamLogs(ctx, o, logType, logName)
if err != nil {
logger.Errorw("Error streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
}
logger.Debugw("Streaming log completed",
err = r.streamLogs(ctx, o, logType, logName)
if err != nil {
logger.Errorw("Error streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
}()
}
logger.Debugw("Streaming log completed",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)

}

return nil
Expand Down Expand Up @@ -396,6 +496,13 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
Err: inMemWriteBufferStderr,
}, logChan, errChan)

// pull the first error that occurred and return on that; reminder - per https://golang.org/ref/spec#Channel_types
// channels act as FIFO queues
chanErr, ok := <-errChan
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since @khrm last review I've added here a simplified version of the error handling that @sayan-biswas used to do prior to pr #712

let's just return the first error seen; from what I see in the tkn code, multiple errors are not collected

with this though @sayan-biswas and the prior changes made with #712, plus moving the sendLogs call back on the reconciler thread, we now address one of the short comings of the old implementation, namely ignoring of errors and preventing retries on reconciliation.

Reminder: with the threadiness arg, users can adjust the number of threads when they enable log support.

I will add a separate PR to add the k8s client tuning shortly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#744 is the add tuning options PR

if ok && chanErr != nil {
return fmt.Errorf("error occurred while calling tkn client write: %w", chanErr)
}

bufStdout := inMemWriteBufferStdout.Bytes()
cntStdout, writeStdOutErr := writer.Write(bufStdout)
if writeStdOutErr != nil {
Expand Down Expand Up @@ -436,10 +543,24 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
logger.Error(flushErr)
return flushErr
}
if closeErr := logsClient.CloseSend(); closeErr != nil {
logger.Warnw("CloseSend ret err",
// so we use CloseAndRecv vs. just CloseSent to achieve a few things:
// 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary
// 2) LogSummary appears to have some stats on the state of operations
// 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api
// server side has reached a terminal state
// 4) Hence, creating a child context which we cancel hopefully does not interrupt the UpdateLog call when this method exits,
// 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does
// the final clean, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak

// comparing closeErr with io.EOF does not work; and I could not find code / desc etc. constants in the grpc code that handled
// the wrapped EOF error we expect to get from grpc when things are "OK"
if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") {
logger.Warnw("CloseAndRecv ret err",
zap.String("name", o.GetName()),
zap.String("error", closeErr.Error()))
if logSummary != nil {
logger.Errorw("CloseAndRecv", zap.String("logSummary", logSummary.String()))
}
logger.Error(closeErr)
return closeErr
}
Expand Down