From 3d6e3f4a45a7e101e7741ed00330926bcd27e484 Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Mon, 22 Jul 2024 18:28:39 -0600 Subject: [PATCH 1/2] refactor: Isolate progress channel in Messenger (#1447) Signed-off-by: Terry Howe --- .../display/status/progress/manager.go | 20 ++-- .../display/status/progress/messenger.go | 95 +++++++++++++++++++ .../display/status/progress/messenger_test.go | 94 ++++++++++++++++++ .../display/status/progress/status.go | 39 +------- .../display/status/progress/status_test.go | 8 +- .../internal/display/status/track/reader.go | 28 +++--- 6 files changed, 214 insertions(+), 70 deletions(-) create mode 100644 cmd/oras/internal/display/status/progress/messenger.go create mode 100644 cmd/oras/internal/display/status/progress/messenger_test.go diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index a6723e23b..2b526e47e 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -34,12 +34,9 @@ const ( var errManagerStopped = errors.New("progress output manager has already been stopped") -// Status is print message channel -type Status chan *status - // Manager is progress view master type Manager interface { - Add() (Status, error) + Add() (*Messenger, error) SendAndStop(desc ocispec.Descriptor, prompt string) error Close() error } @@ -107,7 +104,7 @@ func (m *manager) render() { } // Add appends a new status with 2-line space for rendering. -func (m *manager) Add() (Status, error) { +func (m *manager) Add() (*Messenger, error) { if m.closed() { return nil, errManagerStopped } @@ -124,26 +121,25 @@ func (m *manager) Add() (Status, error) { // SendAndStop send message for descriptor and stop timing. func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error { - status, err := m.Add() + messenger, err := m.Add() if err != nil { return err } - defer close(status) - status <- NewStatusMessage(prompt, desc, desc.Size) - status <- EndTiming() + messenger.Send(prompt, desc, desc.Size) + messenger.Stop() return nil } -func (m *manager) statusChan(s *status) Status { +func (m *manager) statusChan(s *status) *Messenger { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { defer m.updating.Done() for newStatus := range ch { - s.Update(newStatus) + s.update(newStatus) } }() - return ch + return &Messenger{ch: ch} } // Close stops all status and waits for updating and rendering. diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go new file mode 100644 index 000000000..9f0188b5a --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -0,0 +1,95 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras/cmd/oras/internal/display/status/progress/humanize" + "time" +) + +// Messenger is progress message channel. +type Messenger struct { + ch chan *status + closed bool +} + +// Start initializes the messenger. +func (sm *Messenger) Start() { + if sm.ch == nil { + return + } + sm.ch <- startTiming() +} + +// Send a status message for the specified descriptor. +func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset int64) { + for { + select { + case sm.ch <- newStatusMessage(prompt, descriptor, offset): + return + case <-sm.ch: + // purge the channel until successfully pushed + default: + // ch is nil + return + } + } +} + +// Stop the messenger after sending a end message. +func (sm *Messenger) Stop() { + if sm.closed { + return + } + sm.ch <- endTiming() + close(sm.ch) + sm.closed = true +} + +// newStatus generates a base empty status. +func newStatus() *status { + return &status{ + offset: -1, + total: humanize.ToBytes(0), + speedWindow: newSpeedWindow(framePerSecond), + } +} + +// newStatusMessage generates a status for messaging. +func newStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { + return &status{ + prompt: prompt, + descriptor: descriptor, + offset: offset, + } +} + +// startTiming creates start timing message. +func startTiming() *status { + return &status{ + offset: -1, + startTime: time.Now(), + } +} + +// endTiming creates end timing message. +func endTiming() *status { + return &status{ + offset: -1, + endTime: time.Now(), + } +} diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go new file mode 100644 index 000000000..a8b782e55 --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -0,0 +1,94 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "testing" +) + +func Test_Messenger(t *testing.T) { + var msg *status + ch := make(chan *status, BufferSize) + messenger := &Messenger{ch: ch} + + messenger.Start() + select { + case msg = <-ch: + if msg.offset != -1 { + t.Errorf("Expected start message with offset -1, got %d", msg.offset) + } + default: + t.Error("Expected start message") + } + + desc := v1.Descriptor{ + Digest: "mouse", + Size: 100, + } + expected := int64(50) + messenger.Send("Reading", desc, expected) + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Reading" { + t.Errorf("Expected status message prompt Reading, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + + messenger.Send("Reading", desc, expected) + messenger.Send("Read", desc, desc.Size) + select { + case msg = <-ch: + if msg.offset != desc.Size { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Read" { + t.Errorf("Expected status message prompt Read, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + select { + case msg = <-ch: + t.Errorf("Unexpected status message %v", msg) + default: + } + + expected = int64(-1) + messenger.Stop() + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected END status message with offset %d, got %d", expected, msg.offset) + } + default: + t.Error("Expected END status message") + } + + messenger.Stop() + select { + case msg = <-ch: + if msg != nil { + t.Errorf("Unexpected status message %v", msg) + } + default: + } +} diff --git a/cmd/oras/internal/display/status/progress/status.go b/cmd/oras/internal/display/status/progress/status.go index b2abb6ee4..a1074e1e3 100644 --- a/cmd/oras/internal/display/status/progress/status.go +++ b/cmd/oras/internal/display/status/progress/status.go @@ -56,40 +56,6 @@ type status struct { lock sync.Mutex } -// newStatus generates a base empty status. -func newStatus() *status { - return &status{ - offset: -1, - total: humanize.ToBytes(0), - speedWindow: newSpeedWindow(framePerSecond), - } -} - -// NewStatusMessage generates a status for messaging. -func NewStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { - return &status{ - prompt: prompt, - descriptor: descriptor, - offset: offset, - } -} - -// StartTiming starts timing. -func StartTiming() *status { - return &status{ - offset: -1, - startTime: time.Now(), - } -} - -// EndTiming ends timing and set status to done. -func EndTiming() *status { - return &status{ - offset: -1, - endTime: time.Now(), - } -} - func (s *status) isZero() bool { return s.offset < 0 && s.startTime.IsZero() && s.endTime.IsZero() } @@ -121,7 +87,7 @@ func (s *status) String(width int) (string, string) { percent = 1 default: // 0% ~ 99%, show 2-digit precision if total != 0 && s.offset >= 0 { - // percentage calculatable + // calculate percentage percent = float64(s.offset) / float64(total) } offset = fmt.Sprintf("%.2f", humanize.RoundTo(s.total.Size*percent)) @@ -190,8 +156,7 @@ func (s *status) durationString() string { return d.String() } -// Update updates a status. -func (s *status) Update(n *status) { +func (s *status) update(n *status) { s.lock.Lock() defer s.lock.Unlock() diff --git a/cmd/oras/internal/display/status/progress/status_test.go b/cmd/oras/internal/display/status/progress/status_test.go index e0b3bb912..9f8223f6e 100644 --- a/cmd/oras/internal/display/status/progress/status_test.go +++ b/cmd/oras/internal/display/status/progress/status_test.go @@ -35,7 +35,7 @@ func Test_status_String(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -57,7 +57,7 @@ func Test_status_String(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, @@ -76,7 +76,7 @@ func Test_status_String_zeroWidth(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -93,7 +93,7 @@ func Test_status_String_zeroWidth(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 28f647d36..93919381f 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -30,7 +30,7 @@ type reader struct { donePrompt string descriptor ocispec.Descriptor manager progress.Manager - status progress.Status + messenger *progress.Messenger } // NewReader returns a new reader with tracked progress. @@ -43,7 +43,7 @@ func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, } func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) { - ch, err := manager.Add() + messenger, err := manager.Add() if err != nil { return nil, err } @@ -54,11 +54,11 @@ func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress. actionPrompt: actionPrompt, donePrompt: donePrompt, manager: manager, - status: ch, + messenger: messenger, }, nil } -// StopManager stops the status channel and related manager. +// StopManager stops the messenger channel and related manager. func (r *reader) StopManager() { r.Close() _ = r.manager.Close() @@ -66,18 +66,18 @@ func (r *reader) StopManager() { // Done sends message to mark the tracked progress as complete. func (r *reader) Done() { - r.status <- progress.NewStatusMessage(r.donePrompt, r.descriptor, r.descriptor.Size) - r.status <- progress.EndTiming() + r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size) + r.messenger.Stop() } // Close closes the update channel. func (r *reader) Close() { - close(r.status) + r.messenger.Stop() } -// Start sends the start timing to the status channel. +// Start sends the start timing to the messenger channel. func (r *reader) Start() { - r.status <- progress.StartTiming() + r.messenger.Start() } // Read reads from the underlying reader and updates the progress. @@ -93,12 +93,6 @@ func (r *reader) Read(p []byte) (int, error) { return n, io.ErrUnexpectedEOF } } - for { - select { - case r.status <- progress.NewStatusMessage(r.actionPrompt, r.descriptor, r.offset): - // purge the channel until successfully pushed - return n, err - case <-r.status: - } - } + r.messenger.Send(r.actionPrompt, r.descriptor, r.offset) + return n, err } From 18f496ca12ee3eea6fbba401df47f787b4e06ead Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Mon, 22 Jul 2024 18:58:01 -0600 Subject: [PATCH 2/2] refactor: Take private memory store out of test function (#1453) Signed-off-by: Terry Howe --- internal/graph/graph_test.go | 69 +++++++++++++++++------------------- 1 file changed, 33 insertions(+), 36 deletions(-) diff --git a/internal/graph/graph_test.go b/internal/graph/graph_test.go index 85c23cbfc..2a15c5bba 100644 --- a/internal/graph/graph_test.go +++ b/internal/graph/graph_test.go @@ -19,32 +19,37 @@ import ( "bytes" "context" "encoding/json" + "github.com/opencontainers/go-digest" + "oras.land/oras-go/v2/content/memory" "reflect" "testing" - "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/content" - "oras.land/oras-go/v2/content/memory" "oras.land/oras/internal/docker" ) -type fetcher struct { +type contentFetcher struct { content.Fetcher } -func TestSuccessors(t *testing.T) { +func newTestFetcher(t *testing.T) (subject, config, ociImage, dockerImage, index ocispec.Descriptor, fetcher content.Fetcher) { var blobs [][]byte - var descs []ocispec.Descriptor - appendBlob := func(mediaType string, blob []byte) { + ctx := context.Background() + memoryStorage := memory.New() + appendBlob := func(mediaType string, blob []byte) ocispec.Descriptor { blobs = append(blobs, blob) - descs = append(descs, ocispec.Descriptor{ + desc := ocispec.Descriptor{ MediaType: mediaType, Digest: digest.FromBytes(blob), Size: int64(len(blob)), - }) + } + if err := memoryStorage.Push(ctx, desc, bytes.NewReader(blob)); err != nil { + t.Errorf("Error pushing %v\n", err) + } + return desc } - generateImage := func(subject *ocispec.Descriptor, mediaType string, config ocispec.Descriptor, layers ...ocispec.Descriptor) { + generateImage := func(subject *ocispec.Descriptor, mediaType string, config ocispec.Descriptor, layers ...ocispec.Descriptor) ocispec.Descriptor { manifest := ocispec.Manifest{ MediaType: mediaType, Subject: subject, @@ -55,40 +60,32 @@ func TestSuccessors(t *testing.T) { if err != nil { t.Fatal(err) } - appendBlob(mediaType, manifestJSON) + return appendBlob(mediaType, manifestJSON) } - generateIndex := func(manifests ...ocispec.Descriptor) { + generateIndex := func(manifests ...ocispec.Descriptor) ocispec.Descriptor { index := ocispec.Index{ Manifests: manifests, } - manifestJSON, err := json.Marshal(index) + indexJSON, err := json.Marshal(index) if err != nil { t.Fatal(err) } - appendBlob(ocispec.MediaTypeImageIndex, manifestJSON) + return appendBlob(ocispec.MediaTypeImageIndex, indexJSON) } - const ( - subject = iota - config - ociImage - dockerImage - index - ) - appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) + + subject = appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) imageType := "test.image" - appendBlob(imageType, []byte("config content")) - generateImage(&descs[subject], ocispec.MediaTypeImageManifest, descs[config]) - generateImage(&descs[subject], docker.MediaTypeManifest, descs[config]) - generateIndex(descs[subject]) - memory := memory.New() - ctx := context.Background() - for i := range descs { - if err := memory.Push(ctx, descs[i], bytes.NewReader(blobs[i])); err != nil { - t.Errorf("Error pushing %v\n", err) - } - } - fetcher := &fetcher{Fetcher: memory} + config = appendBlob(imageType, []byte("config content")) + ociImage = generateImage(&subject, ocispec.MediaTypeImageManifest, config) + dockerImage = generateImage(&subject, docker.MediaTypeManifest, config) + index = generateIndex(subject) + return subject, config, ociImage, dockerImage, index, &contentFetcher{Fetcher: memoryStorage} +} + +func TestSuccessors(t *testing.T) { + subject, config, ociImage, dockerImage, index, fetcher := newTestFetcher(t) + ctx := context.Background() type args struct { ctx context.Context fetcher content.Fetcher @@ -104,9 +101,9 @@ func TestSuccessors(t *testing.T) { }{ {"should failed to get non-existent OCI image", args{ctx, fetcher, ocispec.Descriptor{MediaType: ocispec.MediaTypeImageManifest}}, nil, nil, nil, true}, {"should failed to get non-existent docker image", args{ctx, fetcher, ocispec.Descriptor{MediaType: docker.MediaTypeManifest}}, nil, nil, nil, true}, - {"should get success of a docker image", args{ctx, fetcher, descs[dockerImage]}, nil, &descs[subject], &descs[config], false}, - {"should get success of an OCI image", args{ctx, fetcher, descs[ociImage]}, nil, &descs[subject], &descs[config], false}, - {"should get success of an index", args{ctx, fetcher, descs[index]}, []ocispec.Descriptor{descs[subject]}, nil, nil, false}, + {"should get success of a docker image", args{ctx, fetcher, dockerImage}, nil, &subject, &config, false}, + {"should get success of an OCI image", args{ctx, fetcher, ociImage}, nil, &subject, &config, false}, + {"should get success of an index", args{ctx, fetcher, index}, []ocispec.Descriptor{subject}, nil, nil, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {