From 91adb70d6b239aa804bc3d4fcebcf5afbfd871a5 Mon Sep 17 00:00:00 2001 From: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Date: Mon, 2 Dec 2024 15:35:39 +0100 Subject: [PATCH] pkg/command: wrap `jsonmessage.DisplayJSONMessagesStream` with go context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allows for the `jsonmessage.DisplayJSONMessagesStream` function to correctly return when the context is cancelled with the appropriate reason (`ctx.Error()`) instead of just a nil error. Follow-up to https://github.com/docker/cli/commit/30a73ff19c407eee75de489355154ce1f35231a8 Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com> Co-authored-by: Paweł Gronowski --- cli/command/container/create.go | 4 +- cli/command/container/run_test.go | 33 +++++------- cli/command/image/build.go | 9 ++-- cli/command/image/import.go | 4 +- cli/command/image/load.go | 4 +- cli/command/image/push.go | 12 ++--- cli/command/image/trust.go | 14 ++--- cli/command/plugin/install.go | 4 +- cli/command/plugin/push.go | 6 +-- cli/command/plugin/upgrade.go | 4 +- cli/command/service/helpers.go | 4 +- cli/command/swarm/ca.go | 4 +- cli/internal/jsonstream/display.go | 68 +++++++++++++++++++++++++ cli/internal/jsonstream/display_test.go | 67 ++++++++++++++++++++++++ 14 files changed, 184 insertions(+), 53 deletions(-) create mode 100644 cli/internal/jsonstream/display.go create mode 100644 cli/internal/jsonstream/display_test.go diff --git a/cli/command/container/create.go b/cli/command/container/create.go index 9f73b7f0afe3..be81c2173311 100644 --- a/cli/command/container/create.go +++ b/cli/command/container/create.go @@ -13,13 +13,13 @@ import ( "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" "github.com/docker/cli/cli/command/image" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/cli/cli/streams" "github.com/docker/cli/opts" "github.com/docker/docker/api/types/container" imagetypes "github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/versions" "github.com/docker/docker/errdefs" - "github.com/docker/docker/pkg/jsonmessage" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -148,7 +148,7 @@ func pullImage(ctx context.Context, dockerCli command.Cli, img string, options * if options.quiet { out = streams.NewOut(io.Discard) } - return jsonmessage.DisplayJSONMessagesToStream(responseBody, out, nil) + return jsonstream.Display(ctx, responseBody, out) } type cidFile struct { diff --git a/cli/command/container/run_test.go b/cli/command/container/run_test.go index 81b176d904c9..4c2ced4f564f 100644 --- a/cli/command/container/run_test.go +++ b/cli/command/container/run_test.go @@ -230,12 +230,7 @@ func TestRunPullTermination(t *testing.T) { createContainerFunc: func(config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *specs.Platform, containerName string, ) (container.CreateResponse, error) { - select { - case <-ctx.Done(): - return container.CreateResponse{}, ctx.Err() - default: - } - return container.CreateResponse{}, fakeNotFound{} + return container.CreateResponse{}, errors.New("shouldn't try to create a container") }, containerAttachFunc: func(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) { return types.HijackedResponse{}, errors.New("shouldn't try to attach to a container") @@ -253,19 +248,19 @@ func TestRunPullTermination(t *testing.T) { assert.NilError(t, server.Close(), "failed to close imageCreateFunc server") return default: + assert.NilError(t, enc.Encode(jsonmessage.JSONMessage{ + Status: "Downloading", + ID: fmt.Sprintf("id-%d", i), + TimeNano: time.Now().UnixNano(), + Time: time.Now().Unix(), + Progress: &jsonmessage.JSONProgress{ + Current: int64(i), + Total: 100, + Start: 0, + }, + })) + time.Sleep(100 * time.Millisecond) } - assert.NilError(t, enc.Encode(jsonmessage.JSONMessage{ - Status: "Downloading", - ID: fmt.Sprintf("id-%d", i), - TimeNano: time.Now().UnixNano(), - Time: time.Now().Unix(), - Progress: &jsonmessage.JSONProgress{ - Current: int64(i), - Total: 100, - Start: 0, - }, - })) - time.Sleep(100 * time.Millisecond) } }() attachCh <- struct{}{} @@ -277,7 +272,7 @@ func TestRunPullTermination(t *testing.T) { cmd := NewRunCommand(fakeCLI) cmd.SetOut(io.Discard) cmd.SetErr(io.Discard) - cmd.SetArgs([]string{"foobar:latest"}) + cmd.SetArgs([]string{"--pull", "always", "foobar:latest"}) cmdErrC := make(chan error, 1) go func() { diff --git a/cli/command/image/build.go b/cli/command/image/build.go index 4a258646dcce..f8355fbe96af 100644 --- a/cli/command/image/build.go +++ b/cli/command/image/build.go @@ -20,6 +20,8 @@ import ( "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" "github.com/docker/cli/cli/command/image/build" + "github.com/docker/cli/cli/internal/jsonstream" + "github.com/docker/cli/cli/streams" "github.com/docker/cli/opts" "github.com/docker/docker/api" "github.com/docker/docker/api/types" @@ -28,7 +30,6 @@ import ( "github.com/docker/docker/builder/remotecontext/urlutil" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/idtools" - "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/streamformatter" "github.com/pkg/errors" @@ -352,7 +353,7 @@ func runBuild(ctx context.Context, dockerCli command.Cli, options buildOptions) defer response.Body.Close() imageID := "" - aux := func(msg jsonmessage.JSONMessage) { + aux := func(msg jsonstream.JSONMessage) { var result types.BuildResult if err := json.Unmarshal(*msg.Aux, &result); err != nil { fmt.Fprintf(dockerCli.Err(), "Failed to parse aux message: %s", err) @@ -361,9 +362,9 @@ func runBuild(ctx context.Context, dockerCli command.Cli, options buildOptions) } } - err = jsonmessage.DisplayJSONMessagesStream(response.Body, buildBuff, dockerCli.Out().FD(), dockerCli.Out().IsTerminal(), aux) + err = jsonstream.Display(ctx, response.Body, streams.NewOut(buildBuff), jsonstream.WithAuxCallback(aux)) if err != nil { - if jerr, ok := err.(*jsonmessage.JSONError); ok { + if jerr, ok := err.(*jsonstream.JSONError); ok { // If no error code is set, default to 1 if jerr.Code == 0 { jerr.Code = 1 diff --git a/cli/command/image/import.go b/cli/command/image/import.go index f6ad73d9baa4..eb05d9f609a7 100644 --- a/cli/command/image/import.go +++ b/cli/command/image/import.go @@ -8,9 +8,9 @@ import ( "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" + "github.com/docker/cli/cli/internal/jsonstream" dockeropts "github.com/docker/cli/opts" "github.com/docker/docker/api/types/image" - "github.com/docker/docker/pkg/jsonmessage" "github.com/spf13/cobra" ) @@ -90,5 +90,5 @@ func runImport(ctx context.Context, dockerCli command.Cli, options importOptions } defer responseBody.Close() - return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil) + return jsonstream.Display(ctx, responseBody, dockerCli.Out()) } diff --git a/cli/command/image/load.go b/cli/command/image/load.go index 3dff2da082e6..849edeaa7004 100644 --- a/cli/command/image/load.go +++ b/cli/command/image/load.go @@ -8,8 +8,8 @@ import ( "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/docker/api/types/image" - "github.com/docker/docker/pkg/jsonmessage" "github.com/moby/sys/sequential" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -89,7 +89,7 @@ func runLoad(ctx context.Context, dockerCli command.Cli, opts loadOptions) error defer response.Body.Close() if response.Body != nil && response.JSON { - return jsonmessage.DisplayJSONMessagesToStream(response.Body, dockerCli.Out(), nil) + return jsonstream.Display(ctx, response.Body, dockerCli.Out()) } _, err = io.Copy(dockerCli.Out(), response.Body) diff --git a/cli/command/image/push.go b/cli/command/image/push.go index 9ad2abd1ff87..d3868352fb09 100644 --- a/cli/command/image/push.go +++ b/cli/command/image/push.go @@ -15,11 +15,11 @@ import ( "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/cli/cli/streams" "github.com/docker/docker/api/types/auxprogress" "github.com/docker/docker/api/types/image" registrytypes "github.com/docker/docker/api/types/registry" - "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/registry" "github.com/morikuni/aec" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -140,23 +140,23 @@ To push the complete multi-platform image, remove the --platform flag. defer responseBody.Close() if !opts.untrusted { // TODO PushTrustedReference currently doesn't respect `--quiet` - return PushTrustedReference(dockerCli, repoInfo, ref, authConfig, responseBody) + return PushTrustedReference(ctx, dockerCli, repoInfo, ref, authConfig, responseBody) } if opts.quiet { - err = jsonmessage.DisplayJSONMessagesToStream(responseBody, streams.NewOut(io.Discard), handleAux(dockerCli)) + err = jsonstream.Display(ctx, responseBody, streams.NewOut(io.Discard), jsonstream.WithAuxCallback(handleAux())) if err == nil { fmt.Fprintln(dockerCli.Out(), ref.String()) } return err } - return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), handleAux(dockerCli)) + return jsonstream.Display(ctx, responseBody, dockerCli.Out(), jsonstream.WithAuxCallback(handleAux())) } var notes []string -func handleAux(dockerCli command.Cli) func(jm jsonmessage.JSONMessage) { - return func(jm jsonmessage.JSONMessage) { +func handleAux() func(jm jsonstream.JSONMessage) { + return func(jm jsonstream.JSONMessage) { b := []byte(*jm.Aux) var stripped auxprogress.ManifestPushedInsteadOfIndex diff --git a/cli/command/image/trust.go b/cli/command/image/trust.go index 6bb21fa556a7..3e06f9a90a7a 100644 --- a/cli/command/image/trust.go +++ b/cli/command/image/trust.go @@ -10,12 +10,12 @@ import ( "github.com/distribution/reference" "github.com/docker/cli/cli/command" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/cli/cli/streams" "github.com/docker/cli/cli/trust" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/image" registrytypes "github.com/docker/docker/api/types/registry" - "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/registry" "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -39,20 +39,20 @@ func TrustedPush(ctx context.Context, cli command.Cli, repoInfo *registry.Reposi defer responseBody.Close() - return PushTrustedReference(cli, repoInfo, ref, authConfig, responseBody) + return PushTrustedReference(ctx, cli, repoInfo, ref, authConfig, responseBody) } // PushTrustedReference pushes a canonical reference to the trust server. // //nolint:gocyclo -func PushTrustedReference(ioStreams command.Streams, repoInfo *registry.RepositoryInfo, ref reference.Named, authConfig registrytypes.AuthConfig, in io.Reader) error { +func PushTrustedReference(ctx context.Context, ioStreams command.Streams, repoInfo *registry.RepositoryInfo, ref reference.Named, authConfig registrytypes.AuthConfig, in io.Reader) error { // If it is a trusted push we would like to find the target entry which match the // tag provided in the function and then do an AddTarget later. target := &client.Target{} // Count the times of calling for handleTarget, // if it is called more that once, that should be considered an error in a trusted push. cnt := 0 - handleTarget := func(msg jsonmessage.JSONMessage) { + handleTarget := func(msg jsonstream.JSONMessage) { cnt++ if cnt > 1 { // handleTarget should only be called once. This will be treated as an error. @@ -84,14 +84,14 @@ func PushTrustedReference(ioStreams command.Streams, repoInfo *registry.Reposito default: // We want trust signatures to always take an explicit tag, // otherwise it will act as an untrusted push. - if err := jsonmessage.DisplayJSONMessagesToStream(in, ioStreams.Out(), nil); err != nil { + if err := jsonstream.Display(ctx, in, ioStreams.Out()); err != nil { return err } fmt.Fprintln(ioStreams.Err(), "No tag specified, skipping trust metadata push") return nil } - if err := jsonmessage.DisplayJSONMessagesToStream(in, ioStreams.Out(), handleTarget); err != nil { + if err := jsonstream.Display(ctx, in, ioStreams.Out(), jsonstream.WithAuxCallback(handleTarget)); err != nil { return err } @@ -283,7 +283,7 @@ func imagePullPrivileged(ctx context.Context, cli command.Cli, imgRefAndAuth tru if opts.quiet { out = streams.NewOut(io.Discard) } - return jsonmessage.DisplayJSONMessagesToStream(responseBody, out, nil) + return jsonstream.Display(ctx, responseBody, out) } // TrustedReference returns the canonical trusted reference for an image reference diff --git a/cli/command/plugin/install.go b/cli/command/plugin/install.go index 6d9ea7da5fda..cb366fd406af 100644 --- a/cli/command/plugin/install.go +++ b/cli/command/plugin/install.go @@ -9,9 +9,9 @@ import ( "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/image" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/docker/api/types" registrytypes "github.com/docker/docker/api/types/registry" - "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/registry" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -129,7 +129,7 @@ func runInstall(ctx context.Context, dockerCli command.Cli, opts pluginOptions) return err } defer responseBody.Close() - if err := jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil); err != nil { + if err := jsonstream.Display(ctx, responseBody, dockerCli.Out()); err != nil { return err } fmt.Fprintf(dockerCli.Out(), "Installed plugin %s\n", opts.remote) // todo: return proper values from the API for this result diff --git a/cli/command/plugin/push.go b/cli/command/plugin/push.go index 7f8b5dcca4e5..c8758a9ef1f9 100644 --- a/cli/command/plugin/push.go +++ b/cli/command/plugin/push.go @@ -7,8 +7,8 @@ import ( "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/image" + "github.com/docker/cli/cli/internal/jsonstream" registrytypes "github.com/docker/docker/api/types/registry" - "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/registry" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -66,8 +66,8 @@ func runPush(ctx context.Context, dockerCli command.Cli, opts pushOptions) error defer responseBody.Close() if !opts.untrusted { - return image.PushTrustedReference(dockerCli, repoInfo, named, authConfig, responseBody) + return image.PushTrustedReference(ctx, dockerCli, repoInfo, named, authConfig, responseBody) } - return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil) + return jsonstream.Display(ctx, responseBody, dockerCli.Out()) } diff --git a/cli/command/plugin/upgrade.go b/cli/command/plugin/upgrade.go index d609c8d01e31..69b5c51b5f14 100644 --- a/cli/command/plugin/upgrade.go +++ b/cli/command/plugin/upgrade.go @@ -8,8 +8,8 @@ import ( "github.com/distribution/reference" "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/docker/errdefs" - "github.com/docker/docker/pkg/jsonmessage" "github.com/pkg/errors" "github.com/spf13/cobra" ) @@ -86,7 +86,7 @@ func runUpgrade(ctx context.Context, dockerCli command.Cli, opts pluginOptions) return err } defer responseBody.Close() - if err := jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil); err != nil { + if err := jsonstream.Display(ctx, responseBody, dockerCli.Out()); err != nil { return err } fmt.Fprintf(dockerCli.Out(), "Upgraded plugin %s to %s\n", opts.localName, opts.remote) // todo: return proper values from the API for this result diff --git a/cli/command/service/helpers.go b/cli/command/service/helpers.go index a8ff310a57f3..3e752a36e184 100644 --- a/cli/command/service/helpers.go +++ b/cli/command/service/helpers.go @@ -6,7 +6,7 @@ import ( "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/service/progress" - "github.com/docker/docker/pkg/jsonmessage" + "github.com/docker/cli/cli/internal/jsonstream" ) // WaitOnService waits for the service to converge. It outputs a progress bar, @@ -24,7 +24,7 @@ func WaitOnService(ctx context.Context, dockerCli command.Cli, serviceID string, return <-errChan } - err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil) + err := jsonstream.Display(ctx, pipeReader, dockerCli.Out()) if err == nil { err = <-errChan } diff --git a/cli/command/swarm/ca.go b/cli/command/swarm/ca.go index 25c235a4503c..59974ac644e8 100644 --- a/cli/command/swarm/ca.go +++ b/cli/command/swarm/ca.go @@ -10,8 +10,8 @@ import ( "github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command/completion" "github.com/docker/cli/cli/command/swarm/progress" + "github.com/docker/cli/cli/internal/jsonstream" "github.com/docker/docker/api/types/swarm" - "github.com/docker/docker/pkg/jsonmessage" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -120,7 +120,7 @@ func attach(ctx context.Context, dockerCli command.Cli, opts caOptions) error { return <-errChan } - err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil) + err := jsonstream.Display(ctx, pipeReader, dockerCli.Out()) if err == nil { err = <-errChan } diff --git a/cli/internal/jsonstream/display.go b/cli/internal/jsonstream/display.go new file mode 100644 index 000000000000..f4b27c20e44a --- /dev/null +++ b/cli/internal/jsonstream/display.go @@ -0,0 +1,68 @@ +package jsonstream + +import ( + "context" + "io" + + "github.com/docker/docker/pkg/jsonmessage" +) + +type ( + Stream = jsonmessage.Stream + JSONMessage = jsonmessage.JSONMessage + JSONError = jsonmessage.JSONError + JSONProgress = jsonmessage.JSONProgress +) + +type ctxReader struct { + err chan error + r io.Reader +} + +func (r *ctxReader) Read(p []byte) (n int, err error) { + select { + case err = <-r.err: + return 0, err + default: + return r.r.Read(p) + } +} + +type Options func(*options) + +type options struct { + AuxCallback func(JSONMessage) +} + +func WithAuxCallback(cb func(JSONMessage)) Options { + return func(o *options) { + o.AuxCallback = cb + } +} + +// Display prints the JSON messages from the given reader to the given stream. +// +// It wraps the [jsonmessage.DisplayJSONMessagesStream] function to make it +// "context aware" and appropriately returns why the function was canceled. +// +// It returns an error if the context is canceled, but not if the input reader / stream is closed. +func Display(ctx context.Context, in io.Reader, stream Stream, opts ...Options) error { + if ctx.Err() != nil { + return ctx.Err() + } + + ctxReader := &ctxReader{err: make(chan error, 1), r: in} + stopFunc := context.AfterFunc(ctx, func() { ctxReader.err <- ctx.Err() }) + defer stopFunc() + + o := options{} + for _, opt := range opts { + opt(&o) + } + + if err := jsonmessage.DisplayJSONMessagesStream(ctxReader, stream, stream.FD(), stream.IsTerminal(), o.AuxCallback); err != nil { + return err + } + + return ctx.Err() +} diff --git a/cli/internal/jsonstream/display_test.go b/cli/internal/jsonstream/display_test.go new file mode 100644 index 000000000000..7b14db2a9fac --- /dev/null +++ b/cli/internal/jsonstream/display_test.go @@ -0,0 +1,67 @@ +package jsonstream + +import ( + "context" + "encoding/json" + "fmt" + "io" + "testing" + "time" + + "github.com/docker/cli/cli/streams" + "gotest.tools/v3/assert" +) + +func TestDisplay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + client, server := io.Pipe() + t.Cleanup(func() { + assert.NilError(t, server.Close()) + }) + + go func() { + enc := json.NewEncoder(server) + for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + assert.NilError(t, server.Close(), "failed to close jsonmessage server") + return + default: + err := enc.Encode(JSONMessage{ + Status: "Downloading", + ID: fmt.Sprintf("id-%d", i), + TimeNano: time.Now().UnixNano(), + Time: time.Now().Unix(), + Progress: &JSONProgress{ + Current: int64(i), + Total: 100, + Start: 0, + }, + }) + if err != nil { + break + } + time.Sleep(100 * time.Millisecond) + } + } + }() + + streamCtx, cancelStream := context.WithCancel(context.Background()) + t.Cleanup(cancelStream) + + done := make(chan error) + go func() { + out := streams.NewOut(io.Discard) + done <- Display(streamCtx, client, out) + }() + + cancelStream() + + select { + case <-time.After(time.Second * 3): + case err := <-done: + assert.ErrorIs(t, err, context.Canceled) + } +}