Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Image push concurrency #477

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions cmd/mindthegap/create/imagebundle/image_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func NewCommand(out output.Output) *cobra.Command {

out.StartOperationWithProgress(pullGauge)

for idx := range regNames {
registryName := regNames[idx]
for registryIdx := range regNames {
registryName := regNames[registryIdx]

registryConfig := cfg[registryName]

Expand Down Expand Up @@ -183,8 +183,8 @@ func NewCommand(out output.Output) *cobra.Command {

wg := new(sync.WaitGroup)

for i := range imageNames {
imageName := imageNames[i]
for imageIdx := range imageNames {
imageName := imageNames[imageIdx]
imageTags := registryConfig.Images[imageName]

wg.Add(len(imageTags))
Expand All @@ -194,7 +194,12 @@ func NewCommand(out output.Output) *cobra.Command {
eg.Go(func() error {
defer wg.Done()

srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)
srcImageName := fmt.Sprintf(
"%s/%s:%s",
registryName,
imageName,
imageTag,
)

imageIndex, err := images.ManifestListForImage(
srcImageName,
Expand All @@ -205,7 +210,12 @@ func NewCommand(out output.Output) *cobra.Command {
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
destImageName := fmt.Sprintf(
"%s/%s:%s",
reg.Address(),
imageName,
imageTag,
)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
return err
Expand Down
146 changes: 98 additions & 48 deletions cmd/mindthegap/push/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"strings"
"sync"

"github.com/containers/image/v5/types"
"github.com/google/go-containerregistry/pkg/authn"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/spf13/cobra"
"github.com/thediveo/enumflag/v2"
"golang.org/x/sync/errgroup"

"github.com/mesosphere/dkp-cli-runtime/core/output"

Expand Down Expand Up @@ -56,6 +58,7 @@ func NewCommand(out output.Output, bundleCmdName string) *cobra.Command {
destRegistryPassword string
ecrLifecyclePolicy string
onExistingTag = Overwrite
imagePushConcurrency int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -203,6 +206,7 @@ func NewCommand(out output.Output, bundleCmdName string) *cobra.Command {
destRegistryURI.Path(),
destRemoteOpts,
onExistingTag,
imagePushConcurrency,
out,
prePushFuncs...,
)
Expand Down Expand Up @@ -271,6 +275,8 @@ func NewCommand(out output.Output, bundleCmdName string) *cobra.Command {
"on-existing-tag",
`how to handle existing tags: one of "overwrite", "error", or "skip"`,
)
cmd.Flags().
IntVar(&imagePushConcurrency, "image-push-concurrency", 1, "Image push concurrency")

return cmd
}
Expand All @@ -282,6 +288,7 @@ func pushImages(
sourceRegistry name.Registry, sourceRemoteOpts []remote.Option,
destRegistry name.Registry, destRegistryPath string, destRemoteOpts []remote.Option,
onExistingTag onExistingTagMode,
imagePushConcurrency int,
out output.Output,
prePushFuncs ...prePushFunc,
) error {
Expand All @@ -293,79 +300,122 @@ func pushImages(
// Sort registries for deterministic ordering.
regNames := cfg.SortedRegistryNames()

for _, registryName := range regNames {
eg, egCtx := errgroup.WithContext(context.Background())
eg.SetLimit(imagePushConcurrency)

sourceRemoteOpts = append(sourceRemoteOpts, remote.WithContext(egCtx))
destRemoteOpts = append(destRemoteOpts, remote.WithContext(egCtx))

pushGauge := &output.ProgressGauge{}
pushGauge.SetCapacity(cfg.TotalImages())
pushGauge.SetStatus("Pushing bundled images")

out.StartOperationWithProgress(pushGauge)

for registryIdx := range regNames {
registryName := regNames[registryIdx]

registryConfig := cfg[registryName]

// Sort images for deterministic ordering.
imageNames := registryConfig.SortedImageNames()

for _, imageName := range imageNames {
for imageIdx := range imageNames {
imageName := imageNames[imageIdx]

srcRepository := sourceRegistry.Repo(imageName)
destRepository := destRegistry.Repo(strings.TrimLeft(destRegistryPath, "/"), imageName)

imageTags := registryConfig.Images[imageName]

for _, prePush := range prePushFuncs {
if err := prePush(destRepository, imageTags...); err != nil {
return fmt.Errorf("pre-push func failed: %w", err)
}
}

existingImageTags, err := getExistingImages(
context.Background(),
onExistingTag,
puller,
destRepository,
var (
imageTagPrePushSync sync.Once
imageTagPrePushErr error
existingImageTags map[string]struct{}
)
if err != nil {
return err
}

for _, imageTag := range imageTags {
srcImage := srcRepository.Tag(imageTag)
destImage := destRepository.Tag(imageTag)

out.StartOperation(
fmt.Sprintf("Copying %s (from bundle) to %s",
srcImage.Name(),
destImage.Name(),
),
)
for tagIdx := range imageTags {
imageTag := imageTags[tagIdx]

eg.Go(func() error {
imageTagPrePushSync.Do(func() {
for _, prePush := range prePushFuncs {
if err := prePush(destRepository, imageTags...); err != nil {
imageTagPrePushErr = fmt.Errorf("pre-push func failed: %w", err)
}
}

existingImageTags, imageTagPrePushErr = getExistingImages(
context.Background(),
onExistingTag,
puller,
destRepository,
)
})

switch onExistingTag {
case Overwrite:
// Do nothing, just attempt to overwrite
case Skip:
if _, exists := existingImageTags[imageTag]; exists {
out.EndOperationWithStatus(output.Skipped())
continue
if imageTagPrePushErr != nil {
return imageTagPrePushErr
}
case Error:
if _, exists := existingImageTags[imageTag]; exists {
out.EndOperationWithStatus(output.Failure())
return fmt.Errorf("image tag already exists in destination registry")

srcImage := srcRepository.Tag(imageTag)
destImage := destRepository.Tag(imageTag)

pushFn := pushTag

switch onExistingTag {
case Overwrite:
// Do nothing, just attempt to overwrite
case Skip:
// If tag exists already then do nothing.
if _, exists := existingImageTags[imageTag]; exists {
pushFn = func(_ name.Reference, _ []remote.Option, _ name.Reference, _ []remote.Option) error {
return nil
}
}
case Error:
if _, exists := existingImageTags[imageTag]; exists {
return fmt.Errorf(
"image tag already exists in destination registry",
)
}
}
}

idx, err := remote.Index(srcImage, sourceRemoteOpts...)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}
if err := pushFn(srcImage, sourceRemoteOpts, destImage, destRemoteOpts); err != nil {
return err
}

if err := remote.WriteIndex(destImage, idx, destRemoteOpts...); err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}
pushGauge.Inc()

out.EndOperationWithStatus(output.Success())
return nil
})
}
}
}

if err := eg.Wait(); err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

out.EndOperationWithStatus(output.Success())

return nil
}

func pushTag(
srcImage name.Reference,
sourceRemoteOpts []remote.Option,
destImage name.Reference,
destRemoteOpts []remote.Option,
) error {
idx, err := remote.Index(srcImage, sourceRemoteOpts...)
if err != nil {
return err
}

return remote.WriteIndex(destImage, idx, destRemoteOpts...)
}

func pushOCIArtifacts(
cfg config.HelmChartsConfig,
sourceRegistry name.Registry, sourceRegistryPath string, sourceRemoteOpts []remote.Option,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819
github.com/google/go-containerregistry v0.15.2
github.com/hashicorp/go-getter v1.7.2
github.com/mesosphere/dkp-cli-runtime/core v0.7.2
github.com/mesosphere/dkp-cli-runtime/core v0.7.3
github.com/mholt/archiver/v3 v3.5.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,8 @@ github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2 h1:dWzl9mdIS14DV7GQbTgVy8EyjFTr5re4V4UUzdqNqzA=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mesosphere/dkp-cli-runtime/core v0.7.3 h1:oHRPvWdZgNOJxXCPFPRIqKWppB2cXx+HZeV8RVtwAsg=
github.com/mesosphere/dkp-cli-runtime/core v0.7.3/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mholt/archiver/v3 v3.5.1 h1:rDjOBX9JSF5BvoJGvjqK479aL70qh9DIpZCl+k7Clwo=
github.com/mholt/archiver/v3 v3.5.1/go.mod h1:e3dqJ7H78uzsRSEACH1joayhuSyhnonssnDhppzS1L4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down
3 changes: 3 additions & 0 deletions make/go.mk
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ E2E_FLAKE_ATTEMPTS ?= 1
.PHONY: e2e-test
e2e-test: ## Runs e2e tests
$(info $(M) running e2e tests$(if $(E2E_LABEL), labelled "$(E2E_LABEL)")$(if $(E2E_FOCUS), matching "$(E2E_FOCUS)"))
ifndef E2E_SKIP_BUILD
$(MAKE) GORELEASER_FLAGS=$$'--config=<(env GOOS=$(shell go env GOOS) GOARCH=$(shell go env GOARCH) gojq --yaml-input --yaml-output \'del(.builds[0].goarch) | del(.builds[0].goos) | .builds[0].targets|=(["linux_amd64","linux_arm64",env.GOOS+"_"+env.GOARCH] | unique | map(. | sub("_amd64";"_amd64_v1")))\' .goreleaser.yml) --clean --skip-validate --skip-publish' release
endif
ginkgo run \
--r \
--race \
Expand Down Expand Up @@ -120,6 +122,7 @@ endif
.PHONY: lint.%
lint.%: ## Runs golangci-lint for a specific module
lint.%: ; $(info $(M) linting $* module)
$(if $(filter-out root,$*),cd $* && )golines -w $$(go list ./... | sed "s|^$$(go list -m)|.|")
$(if $(filter-out root,$*),cd $* && )golangci-lint run --fix --config=$(GOLANGCI_CONFIG_FILE)
$(if $(filter-out root,$*),cd $* && )golines -w $$(go list ./... | sed "s|^$$(go list -m)|.|")
$(if $(filter-out root,$*),cd $* && )go fix ./...
Expand Down
16 changes: 7 additions & 9 deletions test/e2e/imagebundle/push_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,31 +288,30 @@ var _ = Describe("Push Bundle", func() {
"--image-bundle", bundleFile,
"--to-registry", registryAddress,
"--to-registry-insecure-skip-tls-verify",
"--on-existing-tag=skip",
"--image-push-concurrency=4",
}

cmd.SetArgs(args)

Expect(cmd.Execute()).To(Succeed())

Expect(outputBuf.String()).To(ContainSubstring("✓"))
Expect(outputBuf.String()).ToNot(ContainSubstring("∅"))
Expect(outputBuf.String()).ToNot(ContainSubstring("✗"))
})

It("Successful push without on-existing-tag flag (default to overwrite)", func() {
args := []string{
"--image-bundle", bundleFile,
"--to-registry", registryAddress,
"--to-registry-insecure-skip-tls-verify",
"--image-push-concurrency=4",
}

cmd.SetArgs(args)

Expect(cmd.Execute()).To(Succeed())

Expect(outputBuf.String()).To(ContainSubstring("✓"))
Expect(outputBuf.String()).ToNot(ContainSubstring("∅"))
Expect(outputBuf.String()).ToNot(ContainSubstring("✗"))
})

It("Successful push with explicit --on-existing-tag=overwrite", func() {
Expand All @@ -321,15 +320,14 @@ var _ = Describe("Push Bundle", func() {
"--to-registry", registryAddress,
"--to-registry-insecure-skip-tls-verify",
"--on-existing-tag=overwrite",
"--image-push-concurrency=4",
}

cmd.SetArgs(args)

Expect(cmd.Execute()).To(Succeed())

Expect(outputBuf.String()).To(ContainSubstring("✓"))
Expect(outputBuf.String()).ToNot(ContainSubstring("∅"))
Expect(outputBuf.String()).ToNot(ContainSubstring("✗"))
})

It("Successful push with explicit --on-existing-tag=skip", func() {
Expand All @@ -338,14 +336,14 @@ var _ = Describe("Push Bundle", func() {
"--to-registry", registryAddress,
"--to-registry-insecure-skip-tls-verify",
"--on-existing-tag=skip",
"--image-push-concurrency=4",
}

cmd.SetArgs(args)

Expect(cmd.Execute()).To(Succeed())

Expect(outputBuf.String()).To(ContainSubstring("∅"))
Expect(outputBuf.String()).ToNot(ContainSubstring("✗"))
Expect(outputBuf.String()).To(ContainSubstring("✓"))
})

It("Failed push with explicit --on-existing-tag=error", func() {
Expand All @@ -354,14 +352,14 @@ var _ = Describe("Push Bundle", func() {
"--to-registry", registryAddress,
"--to-registry-insecure-skip-tls-verify",
"--on-existing-tag=error",
"--image-push-concurrency=4",
}

cmd.SetArgs(args)

Expect(cmd.Execute()).To(HaveOccurred())

Expect(outputBuf.String()).To(ContainSubstring("✗"))
Expect(outputBuf.String()).ToNot(ContainSubstring("∅"))
})
})
})