Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use request context instead of background #10519

Merged
merged 1 commit into from
Jun 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 52 additions & 60 deletions pkg/api/handlers/compat/images_push.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package compat

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/containers/podman/v3/libpod"
"github.com/containers/podman/v3/pkg/api/handlers/utils"
"github.com/containers/podman/v3/pkg/auth"
"github.com/containers/podman/v3/pkg/channel"
"github.com/containers/podman/v3/pkg/domain/entities"
"github.com/containers/podman/v3/pkg/domain/infra/abi"
"github.com/containers/storage"
Expand Down Expand Up @@ -101,46 +99,33 @@ func PushImage(w http.ResponseWriter, r *http.Request) {
destination = imageName
}

errorWriter := channel.NewWriter(make(chan []byte))
defer errorWriter.Close()

statusWriter := channel.NewWriter(make(chan []byte))
defer statusWriter.Close()

runCtx, cancel := context.WithCancel(context.Background())
var failed bool

go func() {
defer cancel()

statusWriter.Write([]byte(fmt.Sprintf("The push refers to repository [%s]", imageName)))

err := imageEngine.Push(runCtx, imageName, destination, options)
if err != nil {
if errors.Cause(err) != storage.ErrImageUnknown {
errorWriter.Write([]byte("An image does not exist locally with the tag: " + imageName))
} else {
errorWriter.Write([]byte(err.Error()))
}
}
}()

flush := func() {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
flush := func() {}
if flusher, ok := w.(http.Flusher); ok {
flush = flusher.Flush
}

w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "application/json")
flush()

var report jsonmessage.JSONMessage
enc := json.NewEncoder(w)
enc.SetEscapeHTML(true)

report.Status = fmt.Sprintf("The push refers to repository [%s]", imageName)
if err := enc.Encode(report); err != nil {
logrus.Warnf("Failed to json encode error %q", err.Error())
}
flush()

pushErrChan := make(chan error)
go func() {
pushErrChan <- imageEngine.Push(r.Context(), imageName, destination, options)
}()

loop: // break out of for/select infinite loop
for {
var report jsonmessage.JSONMessage
report = jsonmessage.JSONMessage{}

select {
case e := <-options.Progress:
Expand All @@ -160,43 +145,50 @@ loop: // break out of for/select infinite loop
}
report.ID = e.Artifact.Digest.Encoded()[0:12]
if err := enc.Encode(report); err != nil {
errorWriter.Write([]byte(err.Error()))
logrus.Warnf("Failed to json encode error %q", err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, but if this needs other changes, can you swap this to an Errorf?

}
flush()
case e := <-statusWriter.Chan():
report.Status = string(e)
if err := enc.Encode(report); err != nil {
errorWriter.Write([]byte(err.Error()))
case err := <-pushErrChan:
if err != nil {
var msg string
if errors.Cause(err) != storage.ErrImageUnknown {
msg = "An image does not exist locally with the tag: " + imageName
} else {
msg = err.Error()
}
report.Error = &jsonmessage.JSONError{
Message: msg,
}
report.ErrorMessage = msg
if err := enc.Encode(report); err != nil {
logrus.Warnf("Failed to json encode error %q", err.Error())
}
flush()
break loop
}
flush()
case e := <-errorWriter.Chan():
failed = true
report.Error = &jsonmessage.JSONError{
Message: string(e),

digestBytes, err := ioutil.ReadAll(digestFile)
if err != nil {
report.Error = &jsonmessage.JSONError{
Message: err.Error(),
}
report.ErrorMessage = err.Error()
if err := enc.Encode(report); err != nil {
logrus.Warnf("Failed to json encode error %q", err.Error())
}
flush()
break loop
}
report.ErrorMessage = string(e)
tag := query.Tag
if tag == "" {
tag = "latest"
}
report.Status = fmt.Sprintf("%s: digest: %s", tag, string(digestBytes))
if err := enc.Encode(report); err != nil {
logrus.Warnf("Failed to json encode error %q", err.Error())
}

flush()
case <-runCtx.Done():
if !failed {
digestBytes, err := ioutil.ReadAll(digestFile)
if err == nil {
tag := query.Tag
if tag == "" {
tag = "latest"
}
report.Status = fmt.Sprintf("%s: digest: %s", tag, string(digestBytes))
if err := enc.Encode(report); err != nil {
logrus.Warnf("Failed to json encode error %q", err.Error())
}
flush()
}
}
break loop // break out of for/select infinite loop
case <-r.Context().Done():
// Client has closed connection
break loop // break out of for/select infinite loop
}
}
Expand Down