From 4736e0dca94c8ac12bb4a9c4c5821ccdfa1d1dfa Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Fri, 30 Jun 2023 07:06:36 +0000 Subject: [PATCH 01/18] goroutined sbom image creation --- src/internal/packager/sbom/catalog.go | 62 ++++++++++++++++++--------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 550a664f47..07fa73df34 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -70,30 +70,52 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t } builder.jsonList = json - // Generate SBOM for each image - currImage := 1 - for _, tag := range imgList { - builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", currImage, imageCount, tag) + // Generate SBOM for all images at once using goroutines - // Get the image that we are creating an SBOM for - img, err := utils.LoadOCIImage(tmpPaths.Images, tag) - if err != nil { - builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") - return err - } + // Create channel for letting us know when an image's SBOM is done + imageProgress := make(chan string, len(imgList)) - jsonData, err := builder.createImageSBOM(img, tag) - if err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM for image %s", tag) - return err - } + // Create channel for letting us know when there was an error generating an SBOM + imageError := make(chan error, len(imgList)) - if err = builder.createSBOMViewerAsset(tag, jsonData); err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM viewer for image %s", tag) - return err - } + // Call a goroutine for each image + for _, tag := range imgList { + go func(currentTag string) { + // Get the image that we are creating an SBOM for + img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) + if err != nil { + builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") + imageError <- err + } + + // Generate the SBOM JSON for the given image + jsonData, err := builder.createImageSBOM(img, currentTag) + if err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM for image %s", currentTag) + imageError <- err + } + + // Create the SBOM viewer HTML for the given image + if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM viewer for image %s", currentTag) + imageError <- err + } + // Call the imageProgress channel to let us know that the SBOM generation is done for this image + imageProgress <- currentTag + }(tag) + } - currImage++ + // Wait for all images to be done generating SBOMs + for i := 0; i < len(imgList); i++ { + select { + // If there was an error generating an SBOM, write the error to the spinner + case err := <-imageError: + builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") + // If there is a string in the imageProgress channel we know that an SBOM + // was generated for an image and we can update the spinner + case tag := <-imageProgress: + builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", i, len(imgList), tag) + } } currComponent := 1 From 2a1065f35fc40ba2d2163767c19c19bb393c9e28 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Fri, 30 Jun 2023 18:01:29 +0000 Subject: [PATCH 02/18] keep same error behavior as before --- src/internal/packager/sbom/catalog.go | 32 ++++++++++++++++++--------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 07fa73df34..23f8f2733e 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -75,34 +75,43 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t // Create channel for letting us know when an image's SBOM is done imageProgress := make(chan string, len(imgList)) + type errorWithMessage struct { + err error + message string + } + // Create channel for letting us know when there was an error generating an SBOM - imageError := make(chan error, len(imgList)) + imageError := make(chan errorWithMessage, len(imgList)) // Call a goroutine for each image - for _, tag := range imgList { - go func(currentTag string) { + for idx, tag := range imgList { + currentTag := tag + if idx == 3 || idx == 2 { + currentTag = currentTag + "54h3j5h43jk2l" + } + go func() { // Get the image that we are creating an SBOM for img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) if err != nil { - builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") - imageError <- err + imageError <- errorWithMessage{err, "Unable to load the image to generate an SBOM"} + return } // Generate the SBOM JSON for the given image jsonData, err := builder.createImageSBOM(img, currentTag) if err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM for image %s", currentTag) - imageError <- err + imageError <- errorWithMessage{err, fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} + return } // Create the SBOM viewer HTML for the given image if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM viewer for image %s", currentTag) - imageError <- err + imageError <- errorWithMessage{err, fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} + return } // Call the imageProgress channel to let us know that the SBOM generation is done for this image imageProgress <- currentTag - }(tag) + }() } // Wait for all images to be done generating SBOMs @@ -110,7 +119,8 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t select { // If there was an error generating an SBOM, write the error to the spinner case err := <-imageError: - builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") + builder.spinner.Errorf(err.err, err.message) + return err.err // If there is a string in the imageProgress channel we know that an SBOM // was generated for an image and we can update the spinner case tag := <-imageProgress: From b0a3e0dba377b1a3f179840e510ff45d38b2655b Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Fri, 30 Jun 2023 18:17:21 +0000 Subject: [PATCH 03/18] almost forgot to remove error gen code lol --- src/internal/packager/sbom/catalog.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 23f8f2733e..642bc6ae6b 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -84,11 +84,8 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t imageError := make(chan errorWithMessage, len(imgList)) // Call a goroutine for each image - for idx, tag := range imgList { + for _, tag := range imgList { currentTag := tag - if idx == 3 || idx == 2 { - currentTag = currentTag + "54h3j5h43jk2l" - } go func() { // Get the image that we are creating an SBOM for img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) From 5a740039f8890c4ecdf129366075b66c59df27fd Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 07:21:11 +0000 Subject: [PATCH 04/18] better concurrency for sboms --- src/internal/packager/sbom/catalog.go | 141 +++++++++++++++++++------- src/pkg/message/message.go | 6 ++ src/pkg/utils/concurrency.go | 53 ++++++++++ 3 files changed, 162 insertions(+), 38 deletions(-) create mode 100644 src/pkg/utils/concurrency.go diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 642bc6ae6b..36b9b2c833 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -71,83 +71,148 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t builder.jsonList = json // Generate SBOM for all images at once using goroutines + // Use the ConcurrencyTools part of the utils package to help with concurrency + imageSBOMConcurrency := utils.NewConcurrencyTools(len(imgList)) - // Create channel for letting us know when an image's SBOM is done - imageProgress := make(chan string, len(imgList)) - - type errorWithMessage struct { - err error - message string - } - - // Create channel for letting us know when there was an error generating an SBOM - imageError := make(chan errorWithMessage, len(imgList)) + // Make sure cancel is always called + defer imageSBOMConcurrency.Cancel() // Call a goroutine for each image for _, tag := range imgList { currentTag := tag go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer imageSBOMConcurrency.WaitGroup.Done() // Get the image that we are creating an SBOM for img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) if err != nil { - imageError <- errorWithMessage{err, "Unable to load the image to generate an SBOM"} + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: "Unable to load the image to generate an SBOM"} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { return } // Generate the SBOM JSON for the given image jsonData, err := builder.createImageSBOM(img, currentTag) if err != nil { - imageError <- errorWithMessage{err, fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { return } // Create the SBOM viewer HTML for the given image if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { - imageError <- errorWithMessage{err, fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { return } - // Call the imageProgress channel to let us know that the SBOM generation is done for this image - imageProgress <- currentTag + + // Call the progress channel to let us know that the SBOM generation is done for this image + imageSBOMConcurrency.ProgressChan <- currentTag }() } // Wait for all images to be done generating SBOMs for i := 0; i < len(imgList); i++ { select { - // If there was an error generating an SBOM, write the error to the spinner - case err := <-imageError: - builder.spinner.Errorf(err.err, err.message) - return err.err - // If there is a string in the imageProgress channel we know that an SBOM + // If there was an error generating an SBOM + case erroredImage := <-imageSBOMConcurrency.ErrorChan: + // Write the error to the spinner + builder.spinner.Errorf(erroredImage.Error, erroredImage.Message) + // Cancel the context to stop the goroutines + imageSBOMConcurrency.Cancel() + // Wait for all goroutines to finish + imageSBOMConcurrency.WaitGroup.Wait() + // Return the error + return erroredImage.Error + // If there is a string in the imageConcurrency.ProgressChan channel we know that an SBOM // was generated for an image and we can update the spinner - case tag := <-imageProgress: + case tag := <-imageSBOMConcurrency.ProgressChan: builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", i, len(imgList), tag) } } - currComponent := 1 + // Generate SBOM for all images at once using goroutines + + builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) + + // Use the ConcurrencyTools part of the utils package to help with concurrency + fileSBOMConcurrency := utils.NewConcurrencyTools(len(imgList)) - // Generate SBOM for each component for component := range componentSBOMs { - builder.spinner.Updatef("Creating component file SBOMs (%d of %d): %s", currComponent, componentCount, component) + currentComponent := component + go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer fileSBOMConcurrency.WaitGroup.Done() - if componentSBOMs[component] == nil { - message.Debugf("Component %s has invalid SBOM, skipping", component) - continue - } + // Check if component requires SBOM generation + if componentSBOMs[currentComponent] == nil { + message.Debugf("Component %s has invalid SBOM, skipping", component) + return + } - jsonData, err := builder.createFileSBOM(*componentSBOMs[component], component) - if err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM for component %s", component) - return err - } + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } - if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, component), jsonData); err != nil { - builder.spinner.Errorf(err, "Unable to create SBOM viewer for component %s", component) - return err - } + // Generate the SBOM JSON for the given component + jsonData, err := builder.createFileSBOM(*componentSBOMs[currentComponent], currentComponent) + if err != nil { + fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for component %s", currentComponent)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } - currComponent++ + // Create the SBOM viewer HTML for the given component + if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, currentComponent), jsonData); err != nil { + fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for component %s", currentComponent)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } + + // Call the progress channel to let us know that the SBOM generation is done for this component + fileSBOMConcurrency.ProgressChan <- currentComponent + }() + } + + // Wait for all components to be done generating SBOMs + for i := 0; i < len(componentSBOMs); i++ { + select { + // If there was an error generating an SBOM + case erroredComponent := <-fileSBOMConcurrency.ErrorChan: + // Write the error to the spinner + builder.spinner.Errorf(erroredComponent.Error, erroredComponent.Message) + // Cancel the context to stop the goroutines + fileSBOMConcurrency.Cancel() + // Wait for all goroutines to finish + fileSBOMConcurrency.WaitGroup.Wait() + // Return the error + return erroredComponent.Error + // If there is a string in the fileConcurrency.ProgressChan channel we know that an SBOM + // was generated for a file and we can update the spinner + case component := <-fileSBOMConcurrency.ProgressChan: + builder.spinner.Updatef("Creating component file SBOMs (%d of %d): %s", i, len(componentSBOMs), component) + } } // Include the compare tool if there are any image SBOMs OR component SBOMs diff --git a/src/pkg/message/message.go b/src/pkg/message/message.go index 590de7d318..3428a1d814 100644 --- a/src/pkg/message/message.go +++ b/src/pkg/message/message.go @@ -55,6 +55,12 @@ var useLogFile bool // DebugWriter represents a writer interface that writes to message.Debug type DebugWriter struct{} +// ErrorWithMessage is a simple struct of an error and a string primarily intended for use with channels +type ErrorWithMessage struct { + Error error + Message string +} + func (d *DebugWriter) Write(raw []byte) (int, error) { Debug(string(raw)) return len(raw), nil diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go new file mode 100644 index 0000000000..d7ed508735 --- /dev/null +++ b/src/pkg/utils/concurrency.go @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2021-Present The Zarf Authors +// forked from https://www.socketloop.com/tutorials/golang-byte-format-example + +// Package utils provides generic helper functions. +package utils + +import ( + "context" + "sync" + + "github.com/defenseunicorns/zarf/src/pkg/message" +) + +type ConcurrencyTools struct { + ProgressChan chan string + ErrorChan chan message.ErrorWithMessage + Context context.Context + Cancel context.CancelFunc + WaitGroup *sync.WaitGroup +} + +func NewConcurrencyTools(length int) *ConcurrencyTools { + ctx, cancel := context.WithCancel(context.Background()) + + progressChan := make(chan string, length) + + errorChan := make(chan message.ErrorWithMessage, length) + + waitGroup := sync.WaitGroup{} + + waitGroup.Add(length) + + concurrencyTools := ConcurrencyTools{ + ProgressChan: progressChan, + ErrorChan: errorChan, + Context: ctx, + Cancel: cancel, + WaitGroup: &waitGroup, + } + + return &concurrencyTools +} + + +func ContextDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} From a26a3d649c346bb16801abc0c98ecd0765fafe4d Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 07:37:40 +0000 Subject: [PATCH 05/18] bugfix and generics --- src/internal/packager/sbom/catalog.go | 6 +++--- src/pkg/utils/concurrency.go | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 36b9b2c833..7bf583d50c 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -72,7 +72,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t // Generate SBOM for all images at once using goroutines // Use the ConcurrencyTools part of the utils package to help with concurrency - imageSBOMConcurrency := utils.NewConcurrencyTools(len(imgList)) + imageSBOMConcurrency := utils.NewConcurrencyTools[string](len(imgList)) // Make sure cancel is always called defer imageSBOMConcurrency.Cancel() @@ -148,7 +148,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) // Use the ConcurrencyTools part of the utils package to help with concurrency - fileSBOMConcurrency := utils.NewConcurrencyTools(len(imgList)) + fileSBOMConcurrency := utils.NewConcurrencyTools[string](len(imgList)) for component := range componentSBOMs { currentComponent := component @@ -158,7 +158,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t // Check if component requires SBOM generation if componentSBOMs[currentComponent] == nil { - message.Debugf("Component %s has invalid SBOM, skipping", component) + message.Debugf("Component %s has invalid SBOM, skipping", currentComponent) return } diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index d7ed508735..1477ad6d8e 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -12,18 +12,18 @@ import ( "github.com/defenseunicorns/zarf/src/pkg/message" ) -type ConcurrencyTools struct { - ProgressChan chan string +type ConcurrencyTools[T any] struct { + ProgressChan chan T ErrorChan chan message.ErrorWithMessage Context context.Context Cancel context.CancelFunc WaitGroup *sync.WaitGroup } -func NewConcurrencyTools(length int) *ConcurrencyTools { +func NewConcurrencyTools[T any](length int) *ConcurrencyTools[T] { ctx, cancel := context.WithCancel(context.Background()) - progressChan := make(chan string, length) + progressChan := make(chan T, length) errorChan := make(chan message.ErrorWithMessage, length) @@ -31,7 +31,7 @@ func NewConcurrencyTools(length int) *ConcurrencyTools { waitGroup.Add(length) - concurrencyTools := ConcurrencyTools{ + concurrencyTools := ConcurrencyTools[T]{ ProgressChan: progressChan, ErrorChan: errorChan, Context: ctx, From 099b28d35c9e0779178b27626e770754044c7310 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 10:23:51 +0000 Subject: [PATCH 06/18] missed a copypasta modification --- src/internal/packager/sbom/catalog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 7bf583d50c..756567ab0d 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -148,7 +148,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) // Use the ConcurrencyTools part of the utils package to help with concurrency - fileSBOMConcurrency := utils.NewConcurrencyTools[string](len(imgList)) + fileSBOMConcurrency := utils.NewConcurrencyTools[string](len(componentSBOMs)) for component := range componentSBOMs { currentComponent := component From 420dc0abf3d5cc3918a6081a6dc07e002b9fe070 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 10:59:09 +0000 Subject: [PATCH 07/18] better genericism --- src/internal/packager/sbom/catalog.go | 4 ++-- src/pkg/utils/concurrency.go | 17 +++++++---------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index 756567ab0d..a471d4f34d 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -72,7 +72,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t // Generate SBOM for all images at once using goroutines // Use the ConcurrencyTools part of the utils package to help with concurrency - imageSBOMConcurrency := utils.NewConcurrencyTools[string](len(imgList)) + imageSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(imgList)) // Make sure cancel is always called defer imageSBOMConcurrency.Cancel() @@ -148,7 +148,7 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) // Use the ConcurrencyTools part of the utils package to help with concurrency - fileSBOMConcurrency := utils.NewConcurrencyTools[string](len(componentSBOMs)) + fileSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(componentSBOMs)) for component := range componentSBOMs { currentComponent := component diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index 1477ad6d8e..d499ebce40 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -8,30 +8,28 @@ package utils import ( "context" "sync" - - "github.com/defenseunicorns/zarf/src/pkg/message" ) -type ConcurrencyTools[T any] struct { - ProgressChan chan T - ErrorChan chan message.ErrorWithMessage +type ConcurrencyTools[P any, E any] struct { + ProgressChan chan P + ErrorChan chan E Context context.Context Cancel context.CancelFunc WaitGroup *sync.WaitGroup } -func NewConcurrencyTools[T any](length int) *ConcurrencyTools[T] { +func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { ctx, cancel := context.WithCancel(context.Background()) - progressChan := make(chan T, length) + progressChan := make(chan P, length) - errorChan := make(chan message.ErrorWithMessage, length) + errorChan := make(chan E, length) waitGroup := sync.WaitGroup{} waitGroup.Add(length) - concurrencyTools := ConcurrencyTools[T]{ + concurrencyTools := ConcurrencyTools[P, E]{ ProgressChan: progressChan, ErrorChan: errorChan, Context: ctx, @@ -42,7 +40,6 @@ func NewConcurrencyTools[T any](length int) *ConcurrencyTools[T] { return &concurrencyTools } - func ContextDone(ctx context.Context) bool { select { case <-ctx.Done(): From 80c4a2649fcad53f49cfedac668ad460ebe91a03 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 12:29:01 +0000 Subject: [PATCH 08/18] prettier concurrency --- src/internal/packager/sbom/catalog.go | 254 +++++++++++++------------- src/pkg/utils/concurrency.go | 22 +++ 2 files changed, 145 insertions(+), 131 deletions(-) diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index a471d4f34d..6a4e9caf14 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -70,149 +70,141 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t } builder.jsonList = json - // Generate SBOM for all images at once using goroutines - // Use the ConcurrencyTools part of the utils package to help with concurrency - imageSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(imgList)) - - // Make sure cancel is always called - defer imageSBOMConcurrency.Cancel() - - // Call a goroutine for each image - for _, tag := range imgList { - currentTag := tag - go func() { - // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer imageSBOMConcurrency.WaitGroup.Done() - // Get the image that we are creating an SBOM for - img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) - if err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: "Unable to load the image to generate an SBOM"} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Generate the SBOM JSON for the given image - jsonData, err := builder.createImageSBOM(img, currentTag) - if err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Create the SBOM viewer HTML for the given image - if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Call the progress channel to let us know that the SBOM generation is done for this image - imageSBOMConcurrency.ProgressChan <- currentTag - }() - } + if len(imgList) > 0 { + // Generate SBOM for all images at once using goroutines + // Use the ConcurrencyTools part of the utils package to help with concurrency + imageSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(imgList)) + + // Make sure cancel is always called + defer imageSBOMConcurrency.Cancel() + + // Call a goroutine for each image + for _, tag := range imgList { + currentTag := tag + go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer imageSBOMConcurrency.WaitGroup.Done() + // Get the image that we are creating an SBOM for + img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) + if err != nil { + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: "Unable to load the image to generate an SBOM"} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { + return + } + + // Generate the SBOM JSON for the given image + jsonData, err := builder.createImageSBOM(img, currentTag) + if err != nil { + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { + return + } + + // Create the SBOM viewer HTML for the given image + if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { + imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(imageSBOMConcurrency.Context) { + return + } + + // Call the progress channel to let us know that the SBOM generation is done for this image + imageSBOMConcurrency.ProgressChan <- currentTag + }() + } - // Wait for all images to be done generating SBOMs - for i := 0; i < len(imgList); i++ { - select { - // If there was an error generating an SBOM - case erroredImage := <-imageSBOMConcurrency.ErrorChan: - // Write the error to the spinner + imageSBOMErrorFunc := func(erroredImage message.ErrorWithMessage) error { builder.spinner.Errorf(erroredImage.Error, erroredImage.Message) - // Cancel the context to stop the goroutines - imageSBOMConcurrency.Cancel() - // Wait for all goroutines to finish - imageSBOMConcurrency.WaitGroup.Wait() - // Return the error return erroredImage.Error - // If there is a string in the imageConcurrency.ProgressChan channel we know that an SBOM - // was generated for an image and we can update the spinner - case tag := <-imageSBOMConcurrency.ProgressChan: + } + + imageSBOMProgressFunc := func(tag string, i int) { builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", i, len(imgList), tag) } - } - // Generate SBOM for all images at once using goroutines - - builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) - - // Use the ConcurrencyTools part of the utils package to help with concurrency - fileSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(componentSBOMs)) - - for component := range componentSBOMs { - currentComponent := component - go func() { - // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer fileSBOMConcurrency.WaitGroup.Done() - - // Check if component requires SBOM generation - if componentSBOMs[currentComponent] == nil { - message.Debugf("Component %s has invalid SBOM, skipping", currentComponent) - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } - - // Generate the SBOM JSON for the given component - jsonData, err := builder.createFileSBOM(*componentSBOMs[currentComponent], currentComponent) - if err != nil { - fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for component %s", currentComponent)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } - - // Create the SBOM viewer HTML for the given component - if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, currentComponent), jsonData); err != nil { - fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for component %s", currentComponent)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } - - // Call the progress channel to let us know that the SBOM generation is done for this component - fileSBOMConcurrency.ProgressChan <- currentComponent - }() + err = utils.WaitForConcurrencyTools(imageSBOMConcurrency, imageSBOMProgressFunc, imageSBOMErrorFunc) + if err != nil { + return err + } } - // Wait for all components to be done generating SBOMs - for i := 0; i < len(componentSBOMs); i++ { - select { - // If there was an error generating an SBOM - case erroredComponent := <-fileSBOMConcurrency.ErrorChan: - // Write the error to the spinner + // Generate SBOM for all components' files/dataInjections at once using goroutines + + if len(componentSBOMs) > 0 { + builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) + + // Use the ConcurrencyTools part of the utils package to help with concurrency + fileSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(componentSBOMs)) + + for component := range componentSBOMs { + currentComponent := component + go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer fileSBOMConcurrency.WaitGroup.Done() + + // Check if component requires SBOM generation + if componentSBOMs[currentComponent] == nil { + message.Debugf("Component %s has invalid SBOM, skipping", currentComponent) + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } + + // Generate the SBOM JSON for the given component + jsonData, err := builder.createFileSBOM(*componentSBOMs[currentComponent], currentComponent) + if err != nil { + fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for component %s", currentComponent)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } + + // Create the SBOM viewer HTML for the given component + if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, currentComponent), jsonData); err != nil { + fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for component %s", currentComponent)} + return + } + + // If the context has been cancelled end the goroutine + if utils.ContextDone(fileSBOMConcurrency.Context) { + return + } + + // Call the progress channel to let us know that the SBOM generation is done for this component + fileSBOMConcurrency.ProgressChan <- currentComponent + }() + } + + fileSBOMErrorFunc := func(erroredComponent message.ErrorWithMessage) error { builder.spinner.Errorf(erroredComponent.Error, erroredComponent.Message) - // Cancel the context to stop the goroutines - fileSBOMConcurrency.Cancel() - // Wait for all goroutines to finish - fileSBOMConcurrency.WaitGroup.Wait() - // Return the error return erroredComponent.Error - // If there is a string in the fileConcurrency.ProgressChan channel we know that an SBOM - // was generated for a file and we can update the spinner - case component := <-fileSBOMConcurrency.ProgressChan: + } + + fileSBOMProgressFunc := func(component string, i int) { builder.spinner.Updatef("Creating component file SBOMs (%d of %d): %s", i, len(componentSBOMs), component) } + + err = utils.WaitForConcurrencyTools(fileSBOMConcurrency, fileSBOMProgressFunc, fileSBOMErrorFunc) + if err != nil { + return err + } } // Include the compare tool if there are any image SBOMs OR component SBOMs diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index d499ebce40..6d3e56c5bb 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -16,6 +16,7 @@ type ConcurrencyTools[P any, E any] struct { Context context.Context Cancel context.CancelFunc WaitGroup *sync.WaitGroup + RoutineCount int } func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { @@ -35,6 +36,7 @@ func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { Context: ctx, Cancel: cancel, WaitGroup: &waitGroup, + RoutineCount: length, } return &concurrencyTools @@ -48,3 +50,23 @@ func ContextDone(ctx context.Context) bool { return false } } + +func ReturnError(err error) error { + return err +} + +func WaitForConcurrencyTools[P any, E any, PF func(P, int), EF func(E) error](concurrencyTools *ConcurrencyTools[P, E], progressFunc PF, errorFunc EF) error { + for i := 0; i < concurrencyTools.RoutineCount; i++ { + select { + case err := <-concurrencyTools.ErrorChan: + concurrencyTools.Cancel() + errResult := errorFunc(err) + concurrencyTools.WaitGroup.Done() + return errResult + case progress := <-concurrencyTools.ProgressChan: + progressFunc(progress, i) + } + } + concurrencyTools.WaitGroup.Wait() + return nil +} From 56bdadc1ce6e791f13da638631a745dc56a59daa Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Tue, 4 Jul 2023 12:35:05 +0000 Subject: [PATCH 09/18] image pull concurrency! --- src/internal/packager/images/pull.go | 144 ++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 25 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 6d57733529..3ebc8bd8e6 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -24,6 +24,9 @@ import ( v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/cache" "github.com/google/go-containerregistry/pkg/v1/daemon" + "github.com/google/go-containerregistry/pkg/v1/empty" + "github.com/google/go-containerregistry/pkg/v1/layout" + "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/moby/moby/client" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pterm/pterm" @@ -52,31 +55,61 @@ func (i *ImgConfig) PullAll() error { logs.Warn.SetOutput(&message.DebugWriter{}) logs.Progress.SetOutput(&message.DebugWriter{}) - for idx, src := range i.ImgList { - spinner.Updatef("Fetching image metadata (%d of %d): %s", idx+1, imgCount, src) + type srcAndImg struct { + src string + img v1.Image + } - srcParsed, err := transform.ParseImageRef(src) - if err != nil { - return fmt.Errorf("failed to parse image ref %s: %w", src, err) - } + metadataImageConcurrency := utils.NewConcurrencyTools[srcAndImg, error](imgCount) + + defer metadataImageConcurrency.Cancel() - actualSrc := src - if overrideHost, present := i.RegistryOverrides[srcParsed.Host]; present { - actualSrc, err = transform.ImageTransformHostWithoutChecksum(overrideHost, src) + spinner.Updatef("Fetching image metadata (0 of %d)", imgCount) + + // Spawn a goroutine for each image to load its metadata + for _, src := range i.ImgList { + // Create a closure so that we can pass the src into the goroutine + src := src + go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer metadataImageConcurrency.WaitGroup.Done() + + srcParsed, err := transform.ParseImageRef(src) if err != nil { - return fmt.Errorf("failed to swap override host %s for %s: %w", overrideHost, src, err) + metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to parse image ref %s: %w", src, err) + return } - } - img, err := i.PullImage(actualSrc, spinner) - if err != nil { - return fmt.Errorf("failed to pull image %s: %w", actualSrc, err) - } - imageMap[src] = img + actualSrc := src + if overrideHost, present := i.RegistryOverrides[srcParsed.Host]; present { + actualSrc, err = transform.ImageTransformHostWithoutChecksum(overrideHost, src) + if err != nil { + metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to swap override host %s for %s: %w", overrideHost, src, err) + return + } + } + + img, err := i.PullImage(actualSrc, spinner) + if err != nil { + metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to pull image %s: %w", actualSrc, err) + return + } + metadataImageConcurrency.ProgressChan <- srcAndImg{src: src, img: img} + }() + } + + progressFunc := func(finishedImage srcAndImg, iteration int) { + spinner.Updatef("Fetching image metadata (%d of %d): %s", iteration+1, imgCount, finishedImage.src) + imageMap[finishedImage.src] = finishedImage.img + } + + err := utils.WaitForConcurrencyTools(metadataImageConcurrency, progressFunc, utils.ReturnError) + if err != nil { + return err } // Create the ImagePath directory - err := os.Mkdir(i.ImagesPath, 0755) + err = os.Mkdir(i.ImagesPath, 0755) if err != nil && !errors.Is(err, os.ErrExist) { return fmt.Errorf("failed to create image path %s: %w", i.ImagesPath, err) } @@ -121,22 +154,83 @@ func (i *ImgConfig) PullAll() error { wg.Add(1) go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) - for tag, img := range tagToImage { - // Save the image - err := crane.SaveOCI(img, i.ImagesPath) + type digestAndTag struct { + digest string + tag string + } + + // Create special sauce crane Path object + + // If it already exists use it + cranePath, err := layout.FromPath(i.ImagesPath) + // Use crane pattern for creating OCI layout if it doesn't exist + if err != nil { + // If it doesn't exist create it + cranePath, err = layout.Write(i.ImagesPath, empty.Index) if err != nil { - // Check if the cache has been invalidated, and warn the user if so - if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { - message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) + return err + } + } + + imageSavingConcurrency := utils.NewConcurrencyTools[digestAndTag, error](len(tagToImage)) + + defer imageSavingConcurrency.Cancel() + + // Spawn a goroutine for each image to write it's layers/manifests/etc to disk using crane + for tag, img := range tagToImage { + // Create a closure so that we can pass the tag and img into the goroutine + tag, img := tag, img + go func() { + // Make sure to call Done() on the WaitGroup when the goroutine finishes + defer imageSavingConcurrency.WaitGroup.Done() + // Save the image via crane + err := cranePath.WriteImage(img) + if err != nil { + // Check if the cache has been invalidated, and warn the user if so + if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { + message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) + } + imageSavingConcurrency.ErrorChan <- fmt.Errorf("error when trying to save the img (%s): %w", tag.Name(), err) + return } - return fmt.Errorf("error when trying to save the img (%s): %w", tag.Name(), err) + + // Get the image digest so we can set an annotation in the image.json later + imgDigest, err := img.Digest() + if err != nil { + imageSavingConcurrency.ErrorChan <- err + return + } + imageSavingConcurrency.ProgressChan <- digestAndTag{digest: imgDigest.String(), tag: tag.String()} + }() + } + + imageProgressFunc := func(finishedImage digestAndTag, iteration int) { + tagToDigest[finishedImage.tag] = finishedImage.digest + } + + err = utils.WaitForConcurrencyTools(imageSavingConcurrency, imageProgressFunc, utils.ReturnError) + if err != nil { + return err + } + + // for every image sequentially append OCI descriptor + + for tag, img := range tagToImage { + desc, err := partial.Descriptor(img) + if err != nil { + return err + } + + cranePath.AppendDescriptor(*desc) + if err != nil { + return err } - // Get the image digest so we can set an annotation in the image.json later imgDigest, err := img.Digest() if err != nil { return err } + tagToDigest[tag.String()] = imgDigest.String() } From 8d7057eef640a9fc689b580ea917378579cc193e Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Wed, 5 Jul 2023 17:27:50 +0000 Subject: [PATCH 10/18] wrong length ref --- src/internal/packager/images/pull.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 3ebc8bd8e6..2ade646269 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -60,11 +60,11 @@ func (i *ImgConfig) PullAll() error { img v1.Image } - metadataImageConcurrency := utils.NewConcurrencyTools[srcAndImg, error](imgCount) + metadataImageConcurrency := utils.NewConcurrencyTools[srcAndImg, error](len(i.ImgList)) defer metadataImageConcurrency.Cancel() - spinner.Updatef("Fetching image metadata (0 of %d)", imgCount) + spinner.Updatef("Fetching image metadata (0 of %d)", len(i.ImgList)) // Spawn a goroutine for each image to load its metadata for _, src := range i.ImgList { @@ -99,7 +99,7 @@ func (i *ImgConfig) PullAll() error { } progressFunc := func(finishedImage srcAndImg, iteration int) { - spinner.Updatef("Fetching image metadata (%d of %d): %s", iteration+1, imgCount, finishedImage.src) + spinner.Updatef("Fetching image metadata (%d of %d): %s", iteration+1, len(i.ImgList), finishedImage.src) imageMap[finishedImage.src] = finishedImage.img } From f28e8d5994df50ac0a444fb8a7c085b4475d94fa Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 17 Aug 2023 01:29:43 +0000 Subject: [PATCH 11/18] remove sbom concurrency and fix image concurrency --- src/internal/packager/images/pull.go | 145 ++++++++++++++++++++++++- src/internal/packager/sbom/catalog.go | 150 ++++++-------------------- 2 files changed, 172 insertions(+), 123 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 0bdde91f45..cbce1381cd 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "io" "os" "path/filepath" "strings" @@ -26,6 +27,7 @@ import ( "github.com/google/go-containerregistry/pkg/v1/empty" "github.com/google/go-containerregistry/pkg/v1/layout" "github.com/google/go-containerregistry/pkg/v1/partial" + "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/moby/moby/client" "github.com/pterm/pterm" ) @@ -54,8 +56,8 @@ func (i *ImgConfig) PullAll() error { logs.Progress.SetOutput(&message.DebugWriter{}) type srcAndImg struct { - src string - img v1.Image + src string + img v1.Image } metadataImageConcurrency := utils.NewConcurrencyTools[srcAndImg, error](len(i.ImgList)) @@ -74,7 +76,7 @@ func (i *ImgConfig) PullAll() error { srcParsed, err := transform.ParseImageRef(src) if err != nil { - metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to parse image ref %s: %w", src, err) + metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to parse image ref %s: %w", src, err) return } @@ -144,7 +146,6 @@ func (i *ImgConfig) PullAll() error { } } spinner.Updatef("Preparing image sources and cache for image pulling") - spinner.Success() // Create a thread to update a progress bar as we save the image files to disk doneSaving := make(chan int) @@ -170,11 +171,145 @@ func (i *ImgConfig) PullAll() error { } } + dedupedLayers := make(map[string]v1.Layer) + + for tag, img := range tagToImage { + imgDigest, err := img.Digest() + if err != nil { + return fmt.Errorf("unable to get digest for image %s: %w", tag, err) + } + tagToDigest[tag.String()] = imgDigest.String() + + layers, err := img.Layers() + if err != nil { + return fmt.Errorf("unable to get layers for image %s: %w", tag, err) + } + for _, layer := range layers { + hash, err := layer.Digest() + if err != nil { + return fmt.Errorf("unable to get digest for image layer: %w", err) + } + dedupedLayers[hash.Hex] = layer + } + } + + spinner.Success() + + // Spawn a goroutine for each layer to write it to disk using crane + + layerWritingConcurrency := utils.NewConcurrencyTools[bool, error](len(dedupedLayers)) + + defer layerWritingConcurrency.Cancel() + + for _, layer := range dedupedLayers { + layer := layer + // Function is a combination of https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L270-L305 + // and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262 + // with modifications. This allows us to dedupe layers for all images and write them concurrently. + go func() { + defer layerWritingConcurrency.WaitGroup.Done() + digest, err := layer.Digest() + if errors.Is(err, stream.ErrNotComputed) { + // Allow digest errors, since streams may not have calculated the hash + // yet. Instead, use an empty value, which will be transformed into a + // random file name with `os.CreateTemp` and the final digest will be + // calculated after writing to a temp file and before renaming to the + // final path. + digest = v1.Hash{Algorithm: "sha256", Hex: ""} + } else if err != nil { + layerWritingConcurrency.ErrorChan <- err + } + + size, err := layer.Size() + if errors.Is(err, stream.ErrNotComputed) { + // Allow size errors, since streams may not have calculated the size + // yet. Instead, use zero as a sentinel value meaning that no size + // comparison can be done and any sized blob file should be considered + // valid and not overwritten. + // + // TODO: Provide an option to always overwrite blobs. + size = -1 + } else if err != nil { + layerWritingConcurrency.ErrorChan <- err + } + + readCloser, err := layer.Compressed() + if err != nil { + layerWritingConcurrency.ErrorChan <- err + } + + // Get the file path for the cranePath + completePath := []string{string(cranePath)} + + // Create the directory for the blob if it doesn't exist + dir := filepath.Join(append(completePath, "blobs", digest.Algorithm)...) + if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { + layerWritingConcurrency.ErrorChan <- err + } + + // Check if blob already exists and is the correct size + file := filepath.Join(dir, digest.Hex) + if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) { + return + } + + // Write to a temporary file + w, err := os.CreateTemp(dir, digest.Hex) + if err != nil { + layerWritingConcurrency.ErrorChan <- err + } + // Delete temp file if an error is encountered before renaming + defer func() { + if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) { + logs.Warn.Printf("error removing temporary file after encountering an error while writing blob: %v", err) + } + }() + + defer w.Close() + + // Write to file rename + if n, err := io.Copy(w, readCloser); err != nil { + layerWritingConcurrency.ErrorChan <- err + } else if size != -1 && n != size { + layerWritingConcurrency.ErrorChan <- fmt.Errorf("expected blob size %d, but only wrote %d", size, n) + } + + // Always close reader before renaming, since Close computes the digest in + // the case of streaming layers. If Close is not called explicitly, it will + // occur in a goroutine that is not guaranteed to succeed before renamer is + // called. When renamer is the layer's Digest method, it can return + // ErrNotComputed. + if err := readCloser.Close(); err != nil { + layerWritingConcurrency.ErrorChan <- err + } + + // Always close file before renaming + if err := w.Close(); err != nil { + layerWritingConcurrency.ErrorChan <- err + } + + // Rename file based on the final hash + renamePath := filepath.Join(append(completePath, "blobs", digest.Algorithm, digest.Hex)...) + os.Rename(w.Name(), renamePath) + + layerWritingConcurrency.ProgressChan <- true + }() + } + + err = utils.WaitForConcurrencyTools(layerWritingConcurrency, func(b bool, i int) {}, utils.ReturnError) + if err != nil { + if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { + message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) + } + return err + } + imageSavingConcurrency := utils.NewConcurrencyTools[digestAndTag, error](len(tagToImage)) defer imageSavingConcurrency.Cancel() - // Spawn a goroutine for each image to write it's layers/manifests/etc to disk using crane + // Spawn a goroutine for each image to write it's config and manifest to disk using crane + // All layers should already be in place so this should be extremely fast for tag, img := range tagToImage { // Create a closure so that we can pass the tag and img into the goroutine tag, img := tag, img diff --git a/src/internal/packager/sbom/catalog.go b/src/internal/packager/sbom/catalog.go index bf2f24514b..e0fee676f6 100755 --- a/src/internal/packager/sbom/catalog.go +++ b/src/internal/packager/sbom/catalog.go @@ -70,141 +70,55 @@ func Catalog(componentSBOMs map[string]*types.ComponentSBOM, imgList []string, t } builder.jsonList = json - if len(imgList) > 0 { - // Generate SBOM for all images at once using goroutines - // Use the ConcurrencyTools part of the utils package to help with concurrency - imageSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(imgList)) - - // Make sure cancel is always called - defer imageSBOMConcurrency.Cancel() - - // Call a goroutine for each image - for _, tag := range imgList { - currentTag := tag - go func() { - // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer imageSBOMConcurrency.WaitGroup.Done() - // Get the image that we are creating an SBOM for - img, err := utils.LoadOCIImage(tmpPaths.Images, currentTag) - if err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: "Unable to load the image to generate an SBOM"} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Generate the SBOM JSON for the given image - jsonData, err := builder.createImageSBOM(img, currentTag) - if err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for image %s", currentTag)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Create the SBOM viewer HTML for the given image - if err = builder.createSBOMViewerAsset(currentTag, jsonData); err != nil { - imageSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for image %s", currentTag)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(imageSBOMConcurrency.Context) { - return - } - - // Call the progress channel to let us know that the SBOM generation is done for this image - imageSBOMConcurrency.ProgressChan <- currentTag - }() - } - - imageSBOMErrorFunc := func(erroredImage message.ErrorWithMessage) error { - builder.spinner.Errorf(erroredImage.Error, erroredImage.Message) - return erroredImage.Error - } + // Generate SBOM for each image + currImage := 1 + for _, tag := range imgList { + builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", currImage, imageCount, tag) - imageSBOMProgressFunc := func(tag string, i int) { - builder.spinner.Updatef("Creating image SBOMs (%d of %d): %s", i, len(imgList), tag) + // Get the image that we are creating an SBOM for + img, err := utils.LoadOCIImage(tmpPaths.Images, tag) + if err != nil { + builder.spinner.Errorf(err, "Unable to load the image to generate an SBOM") + return err } - err = utils.WaitForConcurrencyTools(imageSBOMConcurrency, imageSBOMProgressFunc, imageSBOMErrorFunc) + jsonData, err := builder.createImageSBOM(img, tag) if err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM for image %s", tag) return err } - } - - // Generate SBOM for all components' files/dataInjections at once using goroutines - - if len(componentSBOMs) > 0 { - builder.spinner.Updatef("Creating component file SBOMs (0 of %d)", len(componentSBOMs)) - // Use the ConcurrencyTools part of the utils package to help with concurrency - fileSBOMConcurrency := utils.NewConcurrencyTools[string, message.ErrorWithMessage](len(componentSBOMs)) - - for component := range componentSBOMs { - currentComponent := component - go func() { - // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer fileSBOMConcurrency.WaitGroup.Done() - - // Check if component requires SBOM generation - if componentSBOMs[currentComponent] == nil { - message.Debugf("Component %s has invalid SBOM, skipping", currentComponent) - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } - - // Generate the SBOM JSON for the given component - jsonData, err := builder.createFileSBOM(*componentSBOMs[currentComponent], currentComponent) - if err != nil { - fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM for component %s", currentComponent)} - return - } - - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } + if err = builder.createSBOMViewerAsset(tag, jsonData); err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM viewer for image %s", tag) + return err + } - // Create the SBOM viewer HTML for the given component - if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, currentComponent), jsonData); err != nil { - fileSBOMConcurrency.ErrorChan <- message.ErrorWithMessage{Error: err, Message: fmt.Sprintf("Unable to create SBOM viewer for component %s", currentComponent)} - return - } + currImage++ + } - // If the context has been cancelled end the goroutine - if utils.ContextDone(fileSBOMConcurrency.Context) { - return - } + currComponent := 1 - // Call the progress channel to let us know that the SBOM generation is done for this component - fileSBOMConcurrency.ProgressChan <- currentComponent - }() - } + // Generate SBOM for each component + for component := range componentSBOMs { + builder.spinner.Updatef("Creating component file SBOMs (%d of %d): %s", currComponent, componentCount, component) - fileSBOMErrorFunc := func(erroredComponent message.ErrorWithMessage) error { - builder.spinner.Errorf(erroredComponent.Error, erroredComponent.Message) - return erroredComponent.Error + if componentSBOMs[component] == nil { + message.Debugf("Component %s has invalid SBOM, skipping", component) + continue } - fileSBOMProgressFunc := func(component string, i int) { - builder.spinner.Updatef("Creating component file SBOMs (%d of %d): %s", i, len(componentSBOMs), component) + jsonData, err := builder.createFileSBOM(*componentSBOMs[component], component) + if err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM for component %s", component) + return err } - err = utils.WaitForConcurrencyTools(fileSBOMConcurrency, fileSBOMProgressFunc, fileSBOMErrorFunc) - if err != nil { + if err = builder.createSBOMViewerAsset(fmt.Sprintf("%s%s", componentPrefix, component), jsonData); err != nil { + builder.spinner.Errorf(err, "Unable to create SBOM viewer for component %s", component) return err } + + currComponent++ } // Include the compare tool if there are any image SBOMs OR component SBOMs From a11d8127d8c90a323e13b5250f0973bc1e3d18b2 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 17 Aug 2023 01:47:11 +0000 Subject: [PATCH 12/18] cleanup image pulling --- src/internal/packager/images/pull.go | 36 +++++++++------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index cbce1381cd..6fadf65fbb 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -115,7 +115,7 @@ func (i *ImgConfig) PullAll() error { } totalBytes := int64(0) - processedLayers := make(map[string]bool) + processedLayers := make(map[string]v1.Layer) for src, img := range imageMap { tag, err := name.NewTag(src, name.WeakValidation) if err != nil { @@ -134,25 +134,19 @@ func (i *ImgConfig) PullAll() error { } // Only calculate this layer size if we haven't already looked at it - if !processedLayers[layerDigest.Hex] { + if processedLayers[layerDigest.Hex] == nil { size, err := layer.Size() if err != nil { return fmt.Errorf("unable to get size of layer: %w", err) } totalBytes += size - processedLayers[layerDigest.Hex] = true + processedLayers[layerDigest.Hex] = layer } } } spinner.Updatef("Preparing image sources and cache for image pulling") - // Create a thread to update a progress bar as we save the image files to disk - doneSaving := make(chan int) - var wg sync.WaitGroup - wg.Add(1) - go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) - type digestAndTag struct { digest string tag string @@ -171,37 +165,29 @@ func (i *ImgConfig) PullAll() error { } } - dedupedLayers := make(map[string]v1.Layer) - for tag, img := range tagToImage { imgDigest, err := img.Digest() if err != nil { return fmt.Errorf("unable to get digest for image %s: %w", tag, err) } tagToDigest[tag.String()] = imgDigest.String() - - layers, err := img.Layers() - if err != nil { - return fmt.Errorf("unable to get layers for image %s: %w", tag, err) - } - for _, layer := range layers { - hash, err := layer.Digest() - if err != nil { - return fmt.Errorf("unable to get digest for image layer: %w", err) - } - dedupedLayers[hash.Hex] = layer - } } spinner.Success() + // Create a thread to update a progress bar as we save the image files to disk + doneSaving := make(chan int) + var wg sync.WaitGroup + wg.Add(1) + go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) + // Spawn a goroutine for each layer to write it to disk using crane - layerWritingConcurrency := utils.NewConcurrencyTools[bool, error](len(dedupedLayers)) + layerWritingConcurrency := utils.NewConcurrencyTools[bool, error](len(processedLayers)) defer layerWritingConcurrency.Cancel() - for _, layer := range dedupedLayers { + for _, layer := range processedLayers { layer := layer // Function is a combination of https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L270-L305 // and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262 From 67c14e73e8602feeaa92ac055fc6cf836c09e36a Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 17 Aug 2023 02:59:27 +0000 Subject: [PATCH 13/18] bugfix and cleanliness --- src/internal/packager/images/pull.go | 79 +++++++++++++++++++++++++++- src/pkg/utils/concurrency.go | 1 - 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 6fadf65fbb..bad4498491 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -80,6 +80,10 @@ func (i *ImgConfig) PullAll() error { return } + if utils.ContextDone(metadataImageConcurrency.Context) { + return + } + actualSrc := src if overrideHost, present := i.RegistryOverrides[srcParsed.Host]; present { actualSrc, err = transform.ImageTransformHostWithoutChecksum(overrideHost, src) @@ -89,11 +93,20 @@ func (i *ImgConfig) PullAll() error { } } + if utils.ContextDone(metadataImageConcurrency.Context) { + return + } + img, err := i.PullImage(actualSrc, spinner) if err != nil { metadataImageConcurrency.ErrorChan <- fmt.Errorf("failed to pull image %s: %w", actualSrc, err) return } + + if utils.ContextDone(metadataImageConcurrency.Context) { + return + } + metadataImageConcurrency.ProgressChan <- srcAndImg{src: src, img: img} }() } @@ -105,6 +118,7 @@ func (i *ImgConfig) PullAll() error { err := utils.WaitForConcurrencyTools(metadataImageConcurrency, progressFunc, utils.ReturnError) if err != nil { + spinner.Warnf("Failed to load metadata for all images. This may be due to a network error or an invalid image reference.") return err } @@ -179,7 +193,7 @@ func (i *ImgConfig) PullAll() error { doneSaving := make(chan int) var wg sync.WaitGroup wg.Add(1) - go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) + go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) // Send a signal to the progress bar that we're done and ait for the thread to finish // Spawn a goroutine for each layer to write it to disk using crane @@ -204,6 +218,7 @@ func (i *ImgConfig) PullAll() error { digest = v1.Hash{Algorithm: "sha256", Hex: ""} } else if err != nil { layerWritingConcurrency.ErrorChan <- err + return } size, err := layer.Size() @@ -217,11 +232,17 @@ func (i *ImgConfig) PullAll() error { size = -1 } else if err != nil { layerWritingConcurrency.ErrorChan <- err + return + } + + if utils.ContextDone(layerWritingConcurrency.Context) { + return } readCloser, err := layer.Compressed() if err != nil { layerWritingConcurrency.ErrorChan <- err + return } // Get the file path for the cranePath @@ -231,11 +252,21 @@ func (i *ImgConfig) PullAll() error { dir := filepath.Join(append(completePath, "blobs", digest.Algorithm)...) if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { layerWritingConcurrency.ErrorChan <- err + return + } + + if utils.ContextDone(layerWritingConcurrency.Context) { + return } // Check if blob already exists and is the correct size file := filepath.Join(dir, digest.Hex) if s, err := os.Stat(file); err == nil && !s.IsDir() && (s.Size() == size || size == -1) { + layerWritingConcurrency.ProgressChan <- true + return + } + + if utils.ContextDone(layerWritingConcurrency.Context) { return } @@ -243,6 +274,7 @@ func (i *ImgConfig) PullAll() error { w, err := os.CreateTemp(dir, digest.Hex) if err != nil { layerWritingConcurrency.ErrorChan <- err + return } // Delete temp file if an error is encountered before renaming defer func() { @@ -253,11 +285,21 @@ func (i *ImgConfig) PullAll() error { defer w.Close() + if utils.ContextDone(layerWritingConcurrency.Context) { + return + } + // Write to file rename if n, err := io.Copy(w, readCloser); err != nil { layerWritingConcurrency.ErrorChan <- err + return } else if size != -1 && n != size { layerWritingConcurrency.ErrorChan <- fmt.Errorf("expected blob size %d, but only wrote %d", size, n) + return + } + + if utils.ContextDone(layerWritingConcurrency.Context) { + return } // Always close reader before renaming, since Close computes the digest in @@ -267,23 +309,33 @@ func (i *ImgConfig) PullAll() error { // ErrNotComputed. if err := readCloser.Close(); err != nil { layerWritingConcurrency.ErrorChan <- err + return } // Always close file before renaming if err := w.Close(); err != nil { layerWritingConcurrency.ErrorChan <- err + return } // Rename file based on the final hash renamePath := filepath.Join(append(completePath, "blobs", digest.Algorithm, digest.Hex)...) os.Rename(w.Name(), renamePath) + if utils.ContextDone(layerWritingConcurrency.Context) { + return + } + layerWritingConcurrency.ProgressChan <- true }() } err = utils.WaitForConcurrencyTools(layerWritingConcurrency, func(b bool, i int) {}, utils.ReturnError) if err != nil { + // Send a signal to the progress bar that we're done and wait for the thread to finish + doneSaving <- 1 + wg.Wait() + message.WarnErr(err, "Failed to download and write layers, trying again up to 3 times...") if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) } @@ -302,8 +354,18 @@ func (i *ImgConfig) PullAll() error { go func() { // Make sure to call Done() on the WaitGroup when the goroutine finishes defer imageSavingConcurrency.WaitGroup.Done() + + if utils.ContextDone(imageSavingConcurrency.Context) { + return + } + // Save the image via crane err := cranePath.WriteImage(img) + + if utils.ContextDone(imageSavingConcurrency.Context) { + return + } + if err != nil { // Check if the cache has been invalidated, and warn the user if so if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { @@ -313,12 +375,21 @@ func (i *ImgConfig) PullAll() error { return } + if utils.ContextDone(imageSavingConcurrency.Context) { + return + } + // Get the image digest so we can set an annotation in the image.json later imgDigest, err := img.Digest() if err != nil { imageSavingConcurrency.ErrorChan <- err return } + + if utils.ContextDone(imageSavingConcurrency.Context) { + return + } + imageSavingConcurrency.ProgressChan <- digestAndTag{digest: imgDigest.String(), tag: tag.String()} }() } @@ -329,6 +400,10 @@ func (i *ImgConfig) PullAll() error { err = utils.WaitForConcurrencyTools(imageSavingConcurrency, imageProgressFunc, utils.ReturnError) if err != nil { + // Send a signal to the progress bar that we're done and wait for the thread to finish + doneSaving <- 1 + wg.Wait() + message.WarnErr(err, "Failed to write image config or manifest, trying again up to 3 times...") return err } @@ -357,7 +432,7 @@ func (i *ImgConfig) PullAll() error { return fmt.Errorf("unable to format OCI layout: %w", err) } - // Send a signal to the progress bar that we're done and ait for the thread to finish + // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 wg.Wait() diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index 6d3e56c5bb..492f67f298 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -61,7 +61,6 @@ func WaitForConcurrencyTools[P any, E any, PF func(P, int), EF func(E) error](co case err := <-concurrencyTools.ErrorChan: concurrencyTools.Cancel() errResult := errorFunc(err) - concurrencyTools.WaitGroup.Done() return errResult case progress := <-concurrencyTools.ProgressChan: progressFunc(progress, i) From cdb100246a2468b58d1d6ed2637b6da2212a3900 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 7 Sep 2023 01:23:02 +0000 Subject: [PATCH 14/18] small bugfixes and cleanup --- src/internal/packager/images/pull.go | 34 +++++++++++----------------- src/pkg/message/message.go | 6 ----- src/pkg/utils/concurrency.go | 4 ++++ 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index bad4498491..877b73a42d 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -148,7 +148,7 @@ func (i *ImgConfig) PullAll() error { } // Only calculate this layer size if we haven't already looked at it - if processedLayers[layerDigest.Hex] == nil { + if _, ok := processedLayers[layerDigest.Hex]; !ok { size, err := layer.Size() if err != nil { return fmt.Errorf("unable to get size of layer: %w", err) @@ -167,7 +167,6 @@ func (i *ImgConfig) PullAll() error { } // Create special sauce crane Path object - // If it already exists use it cranePath, err := layout.FromPath(i.ImagesPath) // Use crane pattern for creating OCI layout if it doesn't exist @@ -191,9 +190,9 @@ func (i *ImgConfig) PullAll() error { // Create a thread to update a progress bar as we save the image files to disk doneSaving := make(chan int) - var wg sync.WaitGroup - wg.Add(1) - go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &wg, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) // Send a signal to the progress bar that we're done and ait for the thread to finish + var progressBarWaitGroup sync.WaitGroup + progressBarWaitGroup.Add(1) + go utils.RenderProgressBarForLocalDirWrite(i.ImagesPath, totalBytes, &progressBarWaitGroup, doneSaving, fmt.Sprintf("Pulling %d images", imgCount)) // Spawn a goroutine for each layer to write it to disk using crane @@ -224,7 +223,7 @@ func (i *ImgConfig) PullAll() error { size, err := layer.Size() if errors.Is(err, stream.ErrNotComputed) { // Allow size errors, since streams may not have calculated the size - // yet. Instead, use zero as a sentinel value meaning that no size + // yet. Instead, use -1 as a sentinel value meaning that no size // comparison can be done and any sized blob file should be considered // valid and not overwritten. // @@ -245,11 +244,8 @@ func (i *ImgConfig) PullAll() error { return } - // Get the file path for the cranePath - completePath := []string{string(cranePath)} - // Create the directory for the blob if it doesn't exist - dir := filepath.Join(append(completePath, "blobs", digest.Algorithm)...) + dir := filepath.Join(string(cranePath), "blobs", digest.Algorithm) if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { layerWritingConcurrency.ErrorChan <- err return @@ -279,7 +275,7 @@ func (i *ImgConfig) PullAll() error { // Delete temp file if an error is encountered before renaming defer func() { if err := os.Remove(w.Name()); err != nil && !errors.Is(err, os.ErrNotExist) { - logs.Warn.Printf("error removing temporary file after encountering an error while writing blob: %v", err) + message.Warnf("error removing temporary file after encountering an error while writing blob: %v", err) } }() @@ -319,7 +315,7 @@ func (i *ImgConfig) PullAll() error { } // Rename file based on the final hash - renamePath := filepath.Join(append(completePath, "blobs", digest.Algorithm, digest.Hex)...) + renamePath := filepath.Join(string(cranePath), "blobs", digest.Algorithm, digest.Hex) os.Rename(w.Name(), renamePath) if utils.ContextDone(layerWritingConcurrency.Context) { @@ -334,9 +330,9 @@ func (i *ImgConfig) PullAll() error { if err != nil { // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 - wg.Wait() - message.WarnErr(err, "Failed to download and write layers, trying again up to 3 times...") - if strings.HasPrefix(err.Error(), "error writing layer: expected blob size") { + progressBarWaitGroup.Wait() + message.WarnErr(err, "Failed to write image layers, trying again up to 3 times...") + if strings.HasPrefix(err.Error(), "expected blob size") { message.Warnf("Potential image cache corruption: %s - try clearing cache with \"zarf tools clear-cache\"", err.Error()) } return err @@ -355,10 +351,6 @@ func (i *ImgConfig) PullAll() error { // Make sure to call Done() on the WaitGroup when the goroutine finishes defer imageSavingConcurrency.WaitGroup.Done() - if utils.ContextDone(imageSavingConcurrency.Context) { - return - } - // Save the image via crane err := cranePath.WriteImage(img) @@ -402,7 +394,7 @@ func (i *ImgConfig) PullAll() error { if err != nil { // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 - wg.Wait() + progressBarWaitGroup.Wait() message.WarnErr(err, "Failed to write image config or manifest, trying again up to 3 times...") return err } @@ -434,7 +426,7 @@ func (i *ImgConfig) PullAll() error { // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 - wg.Wait() + progressBarWaitGroup.Wait() return err } diff --git a/src/pkg/message/message.go b/src/pkg/message/message.go index b8038dffef..32151f7a68 100644 --- a/src/pkg/message/message.go +++ b/src/pkg/message/message.go @@ -57,12 +57,6 @@ var useLogFile bool // DebugWriter represents a writer interface that writes to message.Debug type DebugWriter struct{} -// ErrorWithMessage is a simple struct of an error and a string primarily intended for use with channels -type ErrorWithMessage struct { - Error error - Message string -} - func (d *DebugWriter) Write(raw []byte) (int, error) { debugPrinter(2, string(raw)) return len(raw), nil diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index 492f67f298..e05d266079 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -19,6 +19,7 @@ type ConcurrencyTools[P any, E any] struct { RoutineCount int } +// NewConcurrencyTools returns a ConcurrencyTools struct that has the given length set for concurrency iterations func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { ctx, cancel := context.WithCancel(context.Background()) @@ -42,6 +43,7 @@ func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { return &concurrencyTools } +// ContextDone returns true if the context has been marked as done func ContextDone(ctx context.Context) bool { select { case <-ctx.Done(): @@ -51,10 +53,12 @@ func ContextDone(ctx context.Context) bool { } } +// ReturnError returns the error passed in func ReturnError(err error) error { return err } +// WaitForConcurrencyTools waits for the concurrencyTools passed in to finish or returns the first error it encounters, it calls the errorFunc if an error is encountered and the progressFunc if a progress update is received func WaitForConcurrencyTools[P any, E any, PF func(P, int), EF func(E) error](concurrencyTools *ConcurrencyTools[P, E], progressFunc PF, errorFunc EF) error { for i := 0; i < concurrencyTools.RoutineCount; i++ { select { From 09d5790996a4dc08d30ce732e268a8ad47a5e1fa Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 7 Sep 2023 01:25:32 +0000 Subject: [PATCH 15/18] forgot a comment --- src/pkg/utils/concurrency.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index e05d266079..a02bfedc25 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -10,6 +10,7 @@ import ( "sync" ) +// ConcurrencyTools is a struct that facilitates easier concurrency by providing a context, cancel function, wait group, progress channel, and error channel that is compatible with the WaitForConcurrencyTools function type ConcurrencyTools[P any, E any] struct { ProgressChan chan P ErrorChan chan E From 90a70f267f52b3b2f79a9581ea783428c7d2627c Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 7 Sep 2023 02:50:40 +0000 Subject: [PATCH 16/18] concurrencyTools refactor --- src/internal/packager/images/pull.go | 58 +++++++++++++----------- src/pkg/utils/concurrency.go | 66 ++++++++++++++++++---------- 2 files changed, 77 insertions(+), 47 deletions(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 877b73a42d..1fc61800fa 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -72,7 +72,7 @@ func (i *ImgConfig) PullAll() error { src := src go func() { // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer metadataImageConcurrency.WaitGroup.Done() + defer metadataImageConcurrency.WaitGroupDone() srcParsed, err := transform.ParseImageRef(src) if err != nil { @@ -80,7 +80,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(metadataImageConcurrency.Context) { + if metadataImageConcurrency.IsDone() { return } @@ -93,7 +93,7 @@ func (i *ImgConfig) PullAll() error { } } - if utils.ContextDone(metadataImageConcurrency.Context) { + if metadataImageConcurrency.IsDone() { return } @@ -103,7 +103,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(metadataImageConcurrency.Context) { + if metadataImageConcurrency.IsDone() { return } @@ -111,19 +111,21 @@ func (i *ImgConfig) PullAll() error { }() } - progressFunc := func(finishedImage srcAndImg, iteration int) { + onMetadataProgress := func(finishedImage srcAndImg, iteration int) { spinner.Updatef("Fetching image metadata (%d of %d): %s", iteration+1, len(i.ImgList), finishedImage.src) imageMap[finishedImage.src] = finishedImage.img } - err := utils.WaitForConcurrencyTools(metadataImageConcurrency, progressFunc, utils.ReturnError) - if err != nil { - spinner.Warnf("Failed to load metadata for all images. This may be due to a network error or an invalid image reference.") + onMetadataError := func(err error) error { + return fmt.Errorf("Failed to load metadata for all images. This may be due to a network error or an invalid image reference: %w", err) + } + + if err := metadataImageConcurrency.WaitWithProgress(onMetadataProgress, onMetadataError); err != nil { return err } // Create the ImagePath directory - err = os.Mkdir(i.ImagesPath, 0755) + err := os.Mkdir(i.ImagesPath, 0755) if err != nil && !errors.Is(err, os.ErrExist) { return fmt.Errorf("failed to create image path %s: %w", i.ImagesPath, err) } @@ -206,7 +208,7 @@ func (i *ImgConfig) PullAll() error { // and https://github.com/google/go-containerregistry/blob/v0.15.2/pkg/v1/layout/write.go#L198-L262 // with modifications. This allows us to dedupe layers for all images and write them concurrently. go func() { - defer layerWritingConcurrency.WaitGroup.Done() + defer layerWritingConcurrency.WaitGroupDone() digest, err := layer.Digest() if errors.Is(err, stream.ErrNotComputed) { // Allow digest errors, since streams may not have calculated the hash @@ -234,7 +236,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -251,7 +253,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -262,7 +264,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -281,7 +283,7 @@ func (i *ImgConfig) PullAll() error { defer w.Close() - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -294,7 +296,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -318,7 +320,7 @@ func (i *ImgConfig) PullAll() error { renamePath := filepath.Join(string(cranePath), "blobs", digest.Algorithm, digest.Hex) os.Rename(w.Name(), renamePath) - if utils.ContextDone(layerWritingConcurrency.Context) { + if layerWritingConcurrency.IsDone() { return } @@ -326,8 +328,7 @@ func (i *ImgConfig) PullAll() error { }() } - err = utils.WaitForConcurrencyTools(layerWritingConcurrency, func(b bool, i int) {}, utils.ReturnError) - if err != nil { + onLayerWritingError := func(err error) error { // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 progressBarWaitGroup.Wait() @@ -338,6 +339,10 @@ func (i *ImgConfig) PullAll() error { return err } + if err := layerWritingConcurrency.WaitWithoutProgress(onLayerWritingError); err != nil { + return err + } + imageSavingConcurrency := utils.NewConcurrencyTools[digestAndTag, error](len(tagToImage)) defer imageSavingConcurrency.Cancel() @@ -349,12 +354,12 @@ func (i *ImgConfig) PullAll() error { tag, img := tag, img go func() { // Make sure to call Done() on the WaitGroup when the goroutine finishes - defer imageSavingConcurrency.WaitGroup.Done() + defer imageSavingConcurrency.WaitGroupDone() // Save the image via crane err := cranePath.WriteImage(img) - if utils.ContextDone(imageSavingConcurrency.Context) { + if imageSavingConcurrency.IsDone() { return } @@ -367,7 +372,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(imageSavingConcurrency.Context) { + if imageSavingConcurrency.IsDone() { return } @@ -378,7 +383,7 @@ func (i *ImgConfig) PullAll() error { return } - if utils.ContextDone(imageSavingConcurrency.Context) { + if imageSavingConcurrency.IsDone() { return } @@ -386,12 +391,11 @@ func (i *ImgConfig) PullAll() error { }() } - imageProgressFunc := func(finishedImage digestAndTag, iteration int) { + onImageSavingProgress := func(finishedImage digestAndTag, iteration int) { tagToDigest[finishedImage.tag] = finishedImage.digest } - err = utils.WaitForConcurrencyTools(imageSavingConcurrency, imageProgressFunc, utils.ReturnError) - if err != nil { + onImageSavingError := func(err error) error { // Send a signal to the progress bar that we're done and wait for the thread to finish doneSaving <- 1 progressBarWaitGroup.Wait() @@ -399,6 +403,10 @@ func (i *ImgConfig) PullAll() error { return err } + if err := imageSavingConcurrency.WaitWithProgress(onImageSavingProgress, onImageSavingError); err != nil { + return err + } + // for every image sequentially append OCI descriptor for tag, img := range tagToImage { diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index a02bfedc25..9b103319d1 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -10,19 +10,19 @@ import ( "sync" ) -// ConcurrencyTools is a struct that facilitates easier concurrency by providing a context, cancel function, wait group, progress channel, and error channel that is compatible with the WaitForConcurrencyTools function +// ConcurrencyTools is a struct that contains channels and a context for use in concurrent routines type ConcurrencyTools[P any, E any] struct { ProgressChan chan P ErrorChan chan E - Context context.Context + context context.Context Cancel context.CancelFunc - WaitGroup *sync.WaitGroup - RoutineCount int + waitGroup *sync.WaitGroup + routineCount int } // NewConcurrencyTools returns a ConcurrencyTools struct that has the given length set for concurrency iterations func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) progressChan := make(chan P, length) @@ -35,17 +35,18 @@ func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { concurrencyTools := ConcurrencyTools[P, E]{ ProgressChan: progressChan, ErrorChan: errorChan, - Context: ctx, + context: ctx, Cancel: cancel, - WaitGroup: &waitGroup, - RoutineCount: length, + waitGroup: &waitGroup, + routineCount: length, } return &concurrencyTools } -// ContextDone returns true if the context has been marked as done -func ContextDone(ctx context.Context) bool { +// IsDone returns true if the context is done. +func (ct *ConcurrencyTools[P, E]) IsDone() bool { + ctx := ct.context select { case <-ctx.Done(): return true @@ -54,23 +55,44 @@ func ContextDone(ctx context.Context) bool { } } -// ReturnError returns the error passed in -func ReturnError(err error) error { - return err +// WaitGroupDone decrements the internal WaitGroup counter by one. +func (ct *ConcurrencyTools[P, E]) WaitGroupDone() { + ct.waitGroup.Done() } -// WaitForConcurrencyTools waits for the concurrencyTools passed in to finish or returns the first error it encounters, it calls the errorFunc if an error is encountered and the progressFunc if a progress update is received -func WaitForConcurrencyTools[P any, E any, PF func(P, int), EF func(E) error](concurrencyTools *ConcurrencyTools[P, E], progressFunc PF, errorFunc EF) error { - for i := 0; i < concurrencyTools.RoutineCount; i++ { +// WaitWithProgress waits for all routines to finish +// +// onProgress is a callback function that is called when a routine sends a progress update +// +// onError is a callback function that is called when a routine sends an error +func (ct *ConcurrencyTools[P, E]) WaitWithProgress(onProgress func(P, int), onError func(E) error) error { + for i := 0; i < ct.routineCount; i++ { select { - case err := <-concurrencyTools.ErrorChan: - concurrencyTools.Cancel() - errResult := errorFunc(err) + case err := <-ct.ErrorChan: + ct.Cancel() + errResult := onError(err) return errResult - case progress := <-concurrencyTools.ProgressChan: - progressFunc(progress, i) + case progress := <-ct.ProgressChan: + onProgress(progress, i) } } - concurrencyTools.WaitGroup.Wait() + ct.waitGroup.Wait() + return nil +} + +// WaitWithoutProgress waits for all routines to finish without a progress callback +// +// onError is a callback function that is called when a routine sends an error +func (ct *ConcurrencyTools[P, E]) WaitWithoutProgress(onError func(E) error) error { + for i := 0; i < ct.routineCount; i++ { + select { + case err := <-ct.ErrorChan: + ct.Cancel() + errResult := onError(err) + return errResult + case <-ct.ProgressChan: + } + } + ct.waitGroup.Wait() return nil } From 461d889afc5ebe7d46016012ceba24ea53034f26 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 7 Sep 2023 02:52:05 +0000 Subject: [PATCH 17/18] small lint addition --- src/pkg/utils/concurrency.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pkg/utils/concurrency.go b/src/pkg/utils/concurrency.go index 9b103319d1..7fc80e7004 100644 --- a/src/pkg/utils/concurrency.go +++ b/src/pkg/utils/concurrency.go @@ -20,7 +20,9 @@ type ConcurrencyTools[P any, E any] struct { routineCount int } -// NewConcurrencyTools returns a ConcurrencyTools struct that has the given length set for concurrency iterations +// NewConcurrencyTools creates a new ConcurrencyTools struct +// +// Length is the number of iterations that will be performed concurrently func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] { ctx, cancel := context.WithCancel(context.TODO()) From 093bd637f558c14768dc91bae1a408102c80da06 Mon Sep 17 00:00:00 2001 From: Jordan McClintock Date: Thu, 21 Sep 2023 20:58:55 +0000 Subject: [PATCH 18/18] change to builtin util --- src/internal/packager/images/pull.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/packager/images/pull.go b/src/internal/packager/images/pull.go index 1fc61800fa..ce13f134f0 100644 --- a/src/internal/packager/images/pull.go +++ b/src/internal/packager/images/pull.go @@ -248,7 +248,7 @@ func (i *ImgConfig) PullAll() error { // Create the directory for the blob if it doesn't exist dir := filepath.Join(string(cranePath), "blobs", digest.Algorithm) - if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) { + if err := utils.CreateDirectory(dir, os.ModePerm); err != nil { layerWritingConcurrency.ErrorChan <- err return }