From ad182976b6bcdccd32a31f89a765657b0d1b76da Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Tue, 1 Jun 2021 15:05:13 +0200 Subject: [PATCH] Use request context instead of background This prevents goroutine leak: If background context were used then push operation would continue even if client aborted request by closing connection. [NO TESTS NEEDED] Signed-off-by: Matej Vasek --- pkg/api/handlers/compat/images_push.go | 112 ++++++++++++------------- 1 file changed, 52 insertions(+), 60 deletions(-) diff --git a/pkg/api/handlers/compat/images_push.go b/pkg/api/handlers/compat/images_push.go index db02af4459..62f8cdc772 100644 --- a/pkg/api/handlers/compat/images_push.go +++ b/pkg/api/handlers/compat/images_push.go @@ -1,7 +1,6 @@ package compat import ( - "context" "encoding/json" "fmt" "io/ioutil" @@ -12,7 +11,6 @@ import ( "github.com/containers/podman/v3/libpod" "github.com/containers/podman/v3/pkg/api/handlers/utils" "github.com/containers/podman/v3/pkg/auth" - "github.com/containers/podman/v3/pkg/channel" "github.com/containers/podman/v3/pkg/domain/entities" "github.com/containers/podman/v3/pkg/domain/infra/abi" "github.com/containers/storage" @@ -101,46 +99,33 @@ func PushImage(w http.ResponseWriter, r *http.Request) { destination = imageName } - errorWriter := channel.NewWriter(make(chan []byte)) - defer errorWriter.Close() - - statusWriter := channel.NewWriter(make(chan []byte)) - defer statusWriter.Close() - - runCtx, cancel := context.WithCancel(context.Background()) - var failed bool - - go func() { - defer cancel() - - statusWriter.Write([]byte(fmt.Sprintf("The push refers to repository [%s]", imageName))) - - err := imageEngine.Push(runCtx, imageName, destination, options) - if err != nil { - if errors.Cause(err) != storage.ErrImageUnknown { - errorWriter.Write([]byte("An image does not exist locally with the tag: " + imageName)) - } else { - errorWriter.Write([]byte(err.Error())) - } - } - }() - - flush := func() { - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } + flush := func() {} + if flusher, ok := w.(http.Flusher); ok { + flush = flusher.Flush } w.WriteHeader(http.StatusOK) w.Header().Add("Content-Type", "application/json") flush() + var report jsonmessage.JSONMessage enc := json.NewEncoder(w) enc.SetEscapeHTML(true) + report.Status = fmt.Sprintf("The push refers to repository [%s]", imageName) + if err := enc.Encode(report); err != nil { + logrus.Warnf("Failed to json encode error %q", err.Error()) + } + flush() + + pushErrChan := make(chan error) + go func() { + pushErrChan <- imageEngine.Push(r.Context(), imageName, destination, options) + }() + loop: // break out of for/select infinite loop for { - var report jsonmessage.JSONMessage + report = jsonmessage.JSONMessage{} select { case e := <-options.Progress: @@ -160,43 +145,50 @@ loop: // break out of for/select infinite loop } report.ID = e.Artifact.Digest.Encoded()[0:12] if err := enc.Encode(report); err != nil { - errorWriter.Write([]byte(err.Error())) + logrus.Warnf("Failed to json encode error %q", err.Error()) } flush() - case e := <-statusWriter.Chan(): - report.Status = string(e) - if err := enc.Encode(report); err != nil { - errorWriter.Write([]byte(err.Error())) + case err := <-pushErrChan: + if err != nil { + var msg string + if errors.Cause(err) != storage.ErrImageUnknown { + msg = "An image does not exist locally with the tag: " + imageName + } else { + msg = err.Error() + } + report.Error = &jsonmessage.JSONError{ + Message: msg, + } + report.ErrorMessage = msg + if err := enc.Encode(report); err != nil { + logrus.Warnf("Failed to json encode error %q", err.Error()) + } + flush() + break loop } - flush() - case e := <-errorWriter.Chan(): - failed = true - report.Error = &jsonmessage.JSONError{ - Message: string(e), + + digestBytes, err := ioutil.ReadAll(digestFile) + if err != nil { + report.Error = &jsonmessage.JSONError{ + Message: err.Error(), + } + report.ErrorMessage = err.Error() + if err := enc.Encode(report); err != nil { + logrus.Warnf("Failed to json encode error %q", err.Error()) + } + flush() + break loop } - report.ErrorMessage = string(e) + tag := query.Tag + if tag == "" { + tag = "latest" + } + report.Status = fmt.Sprintf("%s: digest: %s", tag, string(digestBytes)) if err := enc.Encode(report); err != nil { logrus.Warnf("Failed to json encode error %q", err.Error()) } + flush() - case <-runCtx.Done(): - if !failed { - digestBytes, err := ioutil.ReadAll(digestFile) - if err == nil { - tag := query.Tag - if tag == "" { - tag = "latest" - } - report.Status = fmt.Sprintf("%s: digest: %s", tag, string(digestBytes)) - if err := enc.Encode(report); err != nil { - logrus.Warnf("Failed to json encode error %q", err.Error()) - } - flush() - } - } - break loop // break out of for/select infinite loop - case <-r.Context().Done(): - // Client has closed connection break loop // break out of for/select infinite loop } }