Skip to content

Commit

Permalink
fix: sync registry contents via ssh (#4106)
Browse files Browse the repository at this point in the history
Signed-off-by: fengxsong <[email protected]>
  • Loading branch information
fengxsong authored and [email protected] committed Oct 17, 2023
1 parent 33aee73 commit 655e558
Showing 1 changed file with 79 additions and 99 deletions.
178 changes: 79 additions & 99 deletions pkg/filesystem/registry/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package registry
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
stdsync "sync"
"time"

"github.com/containers/common/pkg/auth"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/types"
"golang.org/x/sync/errgroup"
Expand All @@ -48,6 +45,11 @@ const (
defaultTemporaryPort = "5050"
)

const (
httpMode int = iota
sshMode
)

type impl struct {
pathResolver constants.PathResolver
execer exec.Interface
Expand Down Expand Up @@ -77,50 +79,58 @@ func (s *impl) Sync(ctx context.Context, hosts ...string) error {
logger.Debug("running temporary registry on host %s", host)
if err := s.execer.CmdAsyncWithContext(ctx, host, getRegistryServeCommand(s.pathResolver, defaultTemporaryPort)); err != nil {
// ignore expected signal killed error when context cancel
if !strings.Contains(err.Error(), "signal: killed") {
if !strings.Contains(err.Error(), "signal: killed") && !strings.Contains(err.Error(), "context canceled") {
logger.Error(err)
}
}
}(cmdCtx, hosts[i])
}

var endpoints []string
probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
eg, _ := errgroup.WithContext(probeCtx)
mutex := &stdsync.Mutex{}
for i := range hosts {
host := hosts[i]
eg.Go(func() error {
ep := sync.ParseRegistryAddress(trimPortStr(host), defaultTemporaryPort)
if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil {
return err
}
mutex.Lock()
endpoints = append(endpoints, ep)
mutex.Unlock()
return nil
})
}
var syncFn func(context.Context, string) error
if err := eg.Wait(); err != nil {
logger.Warn("cannot connect to remote temporary registry: %v, fallback using ssh mode instead", err)
syncFn = syncViaSSH(s, hosts)
} else {
syncFn = syncViaHTTP(endpoints)
type syncOption struct {
target string
typ int
}

outerEg, ctx := errgroup.WithContext(ctx)
for i := range s.mounts {
registryDir := filepath.Join(s.mounts[i].MountPoint, constants.RegistryDirName)
if !file.IsDir(registryDir) {
continue
syncOptionChan := make(chan *syncOption, len(hosts))
go func() {
for i := range hosts {
go func(target string) {
probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
ep := sync.ParseRegistryAddress(trimPortStr(target), defaultTemporaryPort)
if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil {
logger.Warn("cannot connect to remote temporary registry %s: %v, fallback using ssh mode instead", ep, err)
syncOptionChan <- &syncOption{target: target, typ: sshMode}
} else {
syncOptionChan <- &syncOption{target: ep, typ: httpMode}
}
}(hosts[i])
}
}()

eg, _ := errgroup.WithContext(ctx)
for i := 0; i < len(hosts); i++ {
opt, ok := <-syncOptionChan
if !ok {
break
}
for j := range s.mounts {
registryDir := filepath.Join(s.mounts[j].MountPoint, constants.RegistryDirName)
if !file.IsDir(registryDir) {
continue
}
eg.Go(func() (err error) {
switch opt.typ {
case httpMode:
err = syncViaHTTP(ctx, opt.target, registryDir)
case sshMode:
err = syncViaSSH(ctx, s, opt.target, registryDir)
}
return
})
}
outerEg.Go(func() error {
return syncFn(ctx, registryDir)
})
}
return outerEg.Wait()
return eg.Wait()
}

func trimPortStr(s string) string {
Expand All @@ -136,76 +146,46 @@ func getRegistryServeCommand(pathResolver constants.PathResolver, port string) s
)
}

//lint:ignore U1000 Ignore unused function temporarily for debugging
func loginRegistry(ctx context.Context, sys *types.SystemContext, username, password, registry string) error {
return auth.Login(ctx, sys, &auth.LoginOptions{
Username: username,
Password: password,
Stdout: io.Discard,
}, []string{registry})
func syncViaSSH(_ context.Context, s *impl, target string, localDir string) error {
return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), nil)
}

func syncViaSSH(s *impl, targets []string) func(context.Context, string) error {
return func(ctx context.Context, localDir string) error {
eg, _ := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), constants.IsRegistryDir)
})
}
return eg.Wait()
}
}

func syncViaHTTP(targets []string) func(context.Context, string) error {
func syncViaHTTP(ctx context.Context, target string, localDir string) error {
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
return func(ctx context.Context, localDir string) error {
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
err = eg.Wait()
go func() {
// for notifying shutdown http Server
errCh <- err
}()
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)
defer func() {
// for notifying shutdown http Server
errCh <- nil
}()

src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(ctx, opts); err != nil && !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
}

func New(pathResolver constants.PathResolver, execer exec.Interface, mounts []v2.MountImage) filesystem.RegistrySyncer {
Expand Down

0 comments on commit 655e558

Please sign in to comment.