diff --git a/pkg/filesystem/registry/sync.go b/pkg/filesystem/registry/sync.go index a3704d73325..0c442bb8814 100644 --- a/pkg/filesystem/registry/sync.go +++ b/pkg/filesystem/registry/sync.go @@ -19,13 +19,10 @@ package registry import ( "context" "fmt" - "io" "path/filepath" "strings" - stdsync "sync" "time" - "github.com/containers/common/pkg/auth" "github.com/containers/image/v5/copy" "github.com/containers/image/v5/types" "golang.org/x/sync/errgroup" @@ -48,6 +45,11 @@ const ( defaultTemporaryPort = "5050" ) +const ( + httpMode int = iota + sshMode +) + type impl struct { pathResolver constants.PathResolver execer exec.Interface @@ -77,50 +79,58 @@ func (s *impl) Sync(ctx context.Context, hosts ...string) error { logger.Debug("running temporary registry on host %s", host) if err := s.execer.CmdAsyncWithContext(ctx, host, getRegistryServeCommand(s.pathResolver, defaultTemporaryPort)); err != nil { // ignore expected signal killed error when context cancel - if !strings.Contains(err.Error(), "signal: killed") { + if !strings.Contains(err.Error(), "signal: killed") && !strings.Contains(err.Error(), "context canceled") { logger.Error(err) } } }(cmdCtx, hosts[i]) } - var endpoints []string - probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - eg, _ := errgroup.WithContext(probeCtx) - mutex := &stdsync.Mutex{} - for i := range hosts { - host := hosts[i] - eg.Go(func() error { - ep := sync.ParseRegistryAddress(trimPortStr(host), defaultTemporaryPort) - if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil { - return err - } - mutex.Lock() - endpoints = append(endpoints, ep) - mutex.Unlock() - return nil - }) - } - var syncFn func(context.Context, string) error - if err := eg.Wait(); err != nil { - logger.Warn("cannot connect to remote temporary registry: %v, fallback using ssh mode instead", err) - syncFn = syncViaSSH(s, hosts) - } else { - syncFn = syncViaHTTP(endpoints) + type syncOption struct { + target string + typ int } - outerEg, ctx := errgroup.WithContext(ctx) - for i := range s.mounts { - registryDir := filepath.Join(s.mounts[i].MountPoint, constants.RegistryDirName) - if !file.IsDir(registryDir) { - continue + syncOptionChan := make(chan *syncOption, len(hosts)) + go func() { + for i := range hosts { + go func(target string) { + probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + ep := sync.ParseRegistryAddress(trimPortStr(target), defaultTemporaryPort) + if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil { + logger.Warn("cannot connect to remote temporary registry %s: %v, fallback using ssh mode instead", ep, err) + syncOptionChan <- &syncOption{target: target, typ: sshMode} + } else { + syncOptionChan <- &syncOption{target: ep, typ: httpMode} + } + }(hosts[i]) + } + }() + + eg, _ := errgroup.WithContext(ctx) + for i := 0; i < len(hosts); i++ { + opt, ok := <-syncOptionChan + if !ok { + break + } + for j := range s.mounts { + registryDir := filepath.Join(s.mounts[j].MountPoint, constants.RegistryDirName) + if !file.IsDir(registryDir) { + continue + } + eg.Go(func() (err error) { + switch opt.typ { + case httpMode: + err = syncViaHTTP(ctx, opt.target, registryDir) + case sshMode: + err = syncViaSSH(ctx, s, opt.target, registryDir) + } + return + }) } - outerEg.Go(func() error { - return syncFn(ctx, registryDir) - }) } - return outerEg.Wait() + return eg.Wait() } func trimPortStr(s string) string { @@ -136,76 +146,46 @@ func getRegistryServeCommand(pathResolver constants.PathResolver, port string) s ) } -//lint:ignore U1000 Ignore unused function temporarily for debugging -func loginRegistry(ctx context.Context, sys *types.SystemContext, username, password, registry string) error { - return auth.Login(ctx, sys, &auth.LoginOptions{ - Username: username, - Password: password, - Stdout: io.Discard, - }, []string{registry}) +func syncViaSSH(_ context.Context, s *impl, target string, localDir string) error { + return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), nil) } -func syncViaSSH(s *impl, targets []string) func(context.Context, string) error { - return func(ctx context.Context, localDir string) error { - eg, _ := errgroup.WithContext(ctx) - for i := range targets { - target := targets[i] - eg.Go(func() error { - return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), constants.IsRegistryDir) - }) - } - return eg.Wait() - } -} - -func syncViaHTTP(targets []string) func(context.Context, string) error { +func syncViaHTTP(ctx context.Context, target string, localDir string) error { sys := &types.SystemContext{ DockerInsecureSkipTLSVerify: types.OptionalBoolTrue, } - return func(ctx context.Context, localDir string) error { - config, err := handler.NewConfig(localDir, 0) - if err != nil { - return err - } - config.Log.AccessLog.Disabled = true - errCh := handler.Run(ctx, config) - - eg, inner := errgroup.WithContext(ctx) - for i := range targets { - target := targets[i] - eg.Go(func() error { - src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr) - probeCtx, cancel := context.WithTimeout(inner, time.Second*3) - defer cancel() - if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil { - return err - } - opts := &sync.Options{ - SystemContext: sys, - Source: src, - Target: target, - SelectionOptions: []copy.ImageListSelection{ - copy.CopyAllImages, copy.CopySystemImage, - }, - OmitError: true, - } - if err = sync.ToRegistry(inner, opts); err == nil { - return nil - } - if !strings.Contains(err.Error(), "manifest unknown") { - return err - } - return nil - }) - } - err = eg.Wait() - go func() { - // for notifying shutdown http Server - errCh <- err - }() + config, err := handler.NewConfig(localDir, 0) + if err != nil { + return err + } + config.Log.AccessLog.Disabled = true + errCh := handler.Run(ctx, config) + defer func() { + // for notifying shutdown http Server + errCh <- nil + }() + + src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr) + probeCtx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() + if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil { + return err + } + opts := &sync.Options{ + SystemContext: sys, + Source: src, + Target: target, + SelectionOptions: []copy.ImageListSelection{ + copy.CopyAllImages, copy.CopySystemImage, + }, + OmitError: true, + } + + if err = sync.ToRegistry(ctx, opts); err != nil && !strings.Contains(err.Error(), "manifest unknown") { return err } + return nil } func New(pathResolver constants.PathResolver, execer exec.Interface, mounts []v2.MountImage) filesystem.RegistrySyncer {