From c76a2ed6ff3402451bf5a741ca8e1676e8381ab2 Mon Sep 17 00:00:00 2001 From: Gergely Brautigam <182850+Skarlso@users.noreply.github.com> Date: Thu, 24 Oct 2024 12:28:16 +0200 Subject: [PATCH] Revert "feat: replace docker with oras (#904)" (#1005) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 3cfa228726daebb061989b27b620698d24c555d2. #### What this PR does / why we need it #### Which issue(s) this PR fixes --------- Co-authored-by: Jakob Möller --- .../extensions/repositories/ocireg/blobs.go | 109 +++ .../repositories/ocireg/namespace.go | 178 ++--- .../repositories/ocireg/repository.go | 107 ++- .../extensions/repositories/ocireg/utils.go | 60 +- .../pubsub/providers/ocireg/provider.go | 14 +- api/tech/docker/README.md | 4 + api/tech/docker/errors/errors.go | 58 ++ api/tech/docker/fetcher.go | 202 ++++++ api/tech/docker/handler.go | 136 ++++ api/tech/docker/httpreadseeker.go | 157 +++++ api/tech/docker/lister.go | 130 ++++ api/tech/docker/orig.go | 44 ++ api/tech/docker/pusher.go | 433 ++++++++++++ api/tech/docker/registry.go | 234 +++++++ api/tech/docker/resolve/interface.go | 75 ++ api/tech/docker/resolver.go | 656 ++++++++++++++++++ go.mod | 4 +- go.sum | 2 - 18 files changed, 2414 insertions(+), 189 deletions(-) create mode 100644 api/oci/extensions/repositories/ocireg/blobs.go create mode 100644 api/tech/docker/README.md create mode 100644 api/tech/docker/errors/errors.go create mode 100644 api/tech/docker/fetcher.go create mode 100644 api/tech/docker/handler.go create mode 100644 api/tech/docker/httpreadseeker.go create mode 100644 api/tech/docker/lister.go create mode 100644 api/tech/docker/orig.go create mode 100644 api/tech/docker/pusher.go create mode 100644 api/tech/docker/registry.go create mode 100644 api/tech/docker/resolve/interface.go create mode 100644 api/tech/docker/resolver.go diff --git a/api/oci/extensions/repositories/ocireg/blobs.go b/api/oci/extensions/repositories/ocireg/blobs.go new file mode 100644 index 000000000..0ecf1b299 --- /dev/null +++ b/api/oci/extensions/repositories/ocireg/blobs.go @@ -0,0 +1,109 @@ +package ocireg + +import ( + "sync" + + "github.com/containerd/containerd/remotes" + "github.com/mandelsoft/goutils/errors" + "github.com/opencontainers/go-digest" + "github.com/sirupsen/logrus" + + "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/attrs/cacheattr" + "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/blobaccess/blobaccess" +) + +type BlobContainer interface { + GetBlobData(digest digest.Digest) (int64, cpi.DataAccess, error) + AddBlob(blob cpi.BlobAccess) (int64, digest.Digest, error) + Unref() error +} + +type blobContainer struct { + accessio.StaticAllocatable + fetcher resolve.Fetcher + pusher resolve.Pusher + mime string +} + +type BlobContainers struct { + lock sync.Mutex + cache accessio.BlobCache + fetcher resolve.Fetcher + pusher resolve.Pusher + mimes map[string]BlobContainer +} + +func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher resolve.Pusher) *BlobContainers { + return &BlobContainers{ + cache: cacheattr.Get(ctx), + fetcher: fetcher, + pusher: pusher, + mimes: map[string]BlobContainer{}, + } +} + +func (c *BlobContainers) Get(mime string) (BlobContainer, error) { + c.lock.Lock() + defer c.lock.Unlock() + + found := c.mimes[mime] + if found == nil { + container, err := NewBlobContainer(c.cache, mime, c.fetcher, c.pusher) + if err != nil { + return nil, err + } + c.mimes[mime] = container + + return container, nil + } + + return found, nil +} + +func (c *BlobContainers) Release() error { + c.lock.Lock() + defer c.lock.Unlock() + list := errors.ErrListf("releasing mime block caches") + for _, b := range c.mimes { + list.Add(b.Unref()) + } + return list.Result() +} + +func newBlobContainer(mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) *blobContainer { + return &blobContainer{ + mime: mime, + fetcher: fetcher, + pusher: pusher, + } +} + +func NewBlobContainer(cache accessio.BlobCache, mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) (BlobContainer, error) { + c := newBlobContainer(mime, fetcher, pusher) + + if cache == nil { + return c, nil + } + r, err := accessio.CachedAccess(c, c, cache) + if err != nil { + return nil, err + } + return r, nil +} + +func (n *blobContainer) GetBlobData(digest digest.Digest) (int64, cpi.DataAccess, error) { + logrus.Debugf("orig get %s %s\n", n.mime, digest) + acc, err := NewDataAccess(n.fetcher, digest, n.mime, false) + return blobaccess.BLOB_UNKNOWN_SIZE, acc, err +} + +func (n *blobContainer) AddBlob(blob cpi.BlobAccess) (int64, digest.Digest, error) { + err := push(dummyContext, n.pusher, blob) + if err != nil { + return blobaccess.BLOB_UNKNOWN_SIZE, blobaccess.BLOB_UNKNOWN_DIGEST, err + } + return blob.Size(), blob.Digest(), err +} diff --git a/api/oci/extensions/repositories/ocireg/namespace.go b/api/oci/extensions/repositories/ocireg/namespace.go index cd46662e7..9ef823997 100644 --- a/api/oci/extensions/repositories/ocireg/namespace.go +++ b/api/oci/extensions/repositories/ocireg/namespace.go @@ -4,15 +4,15 @@ import ( "context" "fmt" + "github.com/containerd/errdefs" "github.com/mandelsoft/goutils/errors" "github.com/opencontainers/go-digest" - "oras.land/oras-go/v2/errdef" - "oras.land/oras-go/v2/registry" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/cpi" "ocm.software/ocm/api/oci/cpi/support" "ocm.software/ocm/api/oci/extensions/actions/oci-repository-prepare" + "ocm.software/ocm/api/tech/docker/resolve" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/blobaccess/blobaccess" "ocm.software/ocm/api/utils/logging" @@ -20,108 +20,153 @@ import ( ) type NamespaceContainer struct { - impl support.NamespaceAccessImpl - repo *RepositoryImpl - checked bool - ociRepo registry.Repository + impl support.NamespaceAccessImpl + repo *RepositoryImpl + resolver resolve.Resolver + lister resolve.Lister + fetcher resolve.Fetcher + pusher resolve.Pusher + blobs *BlobContainers + checked bool } var _ support.NamespaceContainer = (*NamespaceContainer)(nil) func NewNamespace(repo *RepositoryImpl, name string) (cpi.NamespaceAccess, error) { ref := repo.GetRef(name, "") - ociRepo, err := repo.getResolver(ref, name) + resolver, err := repo.getResolver(name) + if err != nil { + return nil, err + } + fetcher, err := resolver.Fetcher(context.Background(), ref) + if err != nil { + return nil, err + } + pusher, err := resolver.Pusher(context.Background(), ref) + if err != nil { + return nil, err + } + lister, err := resolver.Lister(context.Background(), ref) if err != nil { return nil, err } - c := &NamespaceContainer{ - repo: repo, - ociRepo: ociRepo, + repo: repo, + resolver: resolver, + lister: lister, + fetcher: fetcher, + pusher: pusher, + blobs: NewBlobContainers(repo.GetContext(), fetcher, pusher), } return support.NewNamespaceAccess(name, c, repo) } func (n *NamespaceContainer) Close() error { - return n.repo.Close() + return n.blobs.Release() } func (n *NamespaceContainer) SetImplementation(impl support.NamespaceAccessImpl) { n.impl = impl } +func (n *NamespaceContainer) getPusher(vers string) (resolve.Pusher, error) { + err := n.assureCreated() + if err != nil { + return nil, err + } + + ref := n.repo.GetRef(n.impl.GetNamespace(), vers) + resolver := n.resolver + + n.repo.GetContext().Logger().Trace("get pusher", "ref", ref) + if ok, _ := artdesc.IsDigest(vers); !ok { + var err error + + resolver, err = n.repo.getResolver(n.impl.GetNamespace()) + if err != nil { + return nil, fmt.Errorf("unable get resolver: %w", err) + } + } + + return resolver.Pusher(dummyContext, ref) +} + +func (n *NamespaceContainer) push(vers string, blob cpi.BlobAccess) error { + p, err := n.getPusher(vers) + if err != nil { + return fmt.Errorf("unable to get pusher: %w", err) + } + n.repo.GetContext().Logger().Trace("pushing", "version", vers) + return push(dummyContext, p, blob) +} + func (n *NamespaceContainer) IsReadOnly() bool { return n.repo.IsReadOnly() } func (n *NamespaceContainer) GetBlobData(digest digest.Digest) (int64, cpi.DataAccess, error) { n.repo.GetContext().Logger().Debug("getting blob", "digest", digest) - - acc, err := NewDataAccess(n.ociRepo, digest, false) + blob, err := n.blobs.Get("") if err != nil { - return -1, nil, fmt.Errorf("failed to construct data access: %w", err) + return -1, nil, fmt.Errorf("failed to retrieve blob data: %w", err) } - - n.repo.GetContext().Logger().Debug("getting blob done", "digest", digest, "size", blobaccess.BLOB_UNKNOWN_SIZE, "error", logging.ErrorMessage(err)) - return blobaccess.BLOB_UNKNOWN_SIZE, acc, err + size, acc, err := blob.GetBlobData(digest) + n.repo.GetContext().Logger().Debug("getting blob done", "digest", digest, "size", size, "error", logging.ErrorMessage(err)) + return size, acc, err } func (n *NamespaceContainer) AddBlob(blob cpi.BlobAccess) error { log := n.repo.GetContext().Logger() log.Debug("adding blob", "digest", blob.Digest()) - - if err := n.assureCreated(); err != nil { - return err + blobData, err := n.blobs.Get("") + if err != nil { + return fmt.Errorf("failed to retrieve blob data: %w", err) } - - if err := push(dummyContext, n.ociRepo, blob); err != nil { + err = n.assureCreated() + if err != nil { return err } - + if _, _, err := blobData.AddBlob(blob); err != nil { + log.Debug("adding blob failed", "digest", blob.Digest(), "error", err.Error()) + return fmt.Errorf("unable to add blob (OCI repository %s): %w", n.impl.GetNamespace(), err) + } log.Debug("adding blob done", "digest", blob.Digest()) return nil } func (n *NamespaceContainer) ListTags() ([]string, error) { - var result []string - if err := n.ociRepo.Tags(dummyContext, "", func(tags []string) error { - result = append(result, tags...) - - return nil - }); err != nil { - return nil, err - } - - return result, nil + return n.lister.List(dummyContext) } func (n *NamespaceContainer) GetArtifact(i support.NamespaceAccessImpl, vers string) (cpi.ArtifactAccess, error) { ref := n.repo.GetRef(n.impl.GetNamespace(), vers) n.repo.GetContext().Logger().Debug("get artifact", "ref", ref) - desc, err := n.ociRepo.Resolve(context.Background(), ref) + _, desc, err := n.resolver.Resolve(context.Background(), ref) n.repo.GetContext().Logger().Debug("done", "digest", desc.Digest, "size", desc.Size, "mimetype", desc.MediaType, "error", logging.ErrorMessage(err)) if err != nil { - if errors.Is(err, errdef.ErrNotFound) { + if errdefs.IsNotFound(err) { return nil, errors.ErrNotFound(cpi.KIND_OCIARTIFACT, ref, n.impl.GetNamespace()) } return nil, err } - - acc, err := NewDataAccess(n.ociRepo, desc.Digest, false) + blobData, err := n.blobs.Get(desc.MediaType) if err != nil { - return nil, fmt.Errorf("failed to construct data access: %w", err) + return nil, fmt.Errorf("failed to retrieve blob data, blob data was empty: %w", err) + } + _, acc, err := blobData.GetBlobData(desc.Digest) + if err != nil { + return nil, err } - return support.NewArtifactForBlob(i, blobaccess.ForDataAccess(desc.Digest, desc.Size, desc.MediaType, acc)) } func (n *NamespaceContainer) HasArtifact(vers string) (bool, error) { ref := n.repo.GetRef(n.impl.GetNamespace(), vers) n.repo.GetContext().Logger().Debug("check artifact", "ref", ref) - desc, err := n.ociRepo.Resolve(context.Background(), ref) + _, desc, err := n.resolver.Resolve(context.Background(), ref) n.repo.GetContext().Logger().Debug("done", "digest", desc.Digest, "size", desc.Size, "mimetype", desc.MediaType, "error", logging.ErrorMessage(err)) if err != nil { - if errors.Is(err, errdef.ErrNotFound) { + if errdefs.IsNotFound(err) { return false, nil } return false, err @@ -159,15 +204,20 @@ func (n *NamespaceContainer) AddArtifact(artifact cpi.Artifact, tags ...string) } n.repo.GetContext().Logger().Debug("adding artifact", "digest", blob.Digest(), "mimetype", blob.MimeType()) + blobData, err := n.blobs.Get(blob.MimeType()) + if err != nil { + return nil, fmt.Errorf("failed to retrieve blob data: %w", err) + } - if err := n.assureCreated(); err != nil { + _, _, err = blobData.AddBlob(blob) + if err != nil { return nil, err } if len(tags) > 0 { for _, tag := range tags { - if err := n.pushTag(blob, tag); err != nil { - return nil, fmt.Errorf("failed to push tag %s: %w", tag, err) + if err := n.push(tag, blob); err != nil { + return nil, err } } } @@ -175,52 +225,22 @@ func (n *NamespaceContainer) AddArtifact(artifact cpi.Artifact, tags ...string) return blob, err } -func (n *NamespaceContainer) pushTag(blob blobaccess.BlobAccess, tag string) error { - reader, err := blob.Reader() - if err != nil { - return err - } - - expectedDescriptor := *artdesc.DefaultBlobDescriptor(blob) - // If the descriptor exists, we are adding the blob to the descriptor as is. - if err := n.ociRepo.PushReference(context.Background(), expectedDescriptor, reader, tag); err != nil { - // If the manifest is unknown to the registry, which can occur with Docker, - // we might be able to push the entire blob instead of a reference. - // We can't assert the error because docker returns Manifest Unknown, - // while ghcr.io or quay work with PushReference out of the box. - // Meanwhile, we need to get the reader again, because PushReference exhausted it. - reader, err = blob.Reader() - if err != nil { - return err - } - - // If any other error arises, pushing the blob would also fail. - return n.ociRepo.Blobs().Push(context.Background(), expectedDescriptor, reader) - } - - return nil -} - func (n *NamespaceContainer) AddTags(digest digest.Digest, tags ...string) error { - ref := n.repo.GetRef(n.impl.GetNamespace(), digest.String()) - desc, err := n.ociRepo.Resolve(context.Background(), ref) + _, desc, err := n.resolver.Resolve(context.Background(), n.repo.GetRef(n.impl.GetNamespace(), digest.String())) if err != nil { return fmt.Errorf("unable to resolve: %w", err) } - acc, err := NewDataAccess(n.ociRepo, desc.Digest, false) + acc, err := NewDataAccess(n.fetcher, desc.Digest, desc.MediaType, false) if err != nil { return fmt.Errorf("error creating new data access: %w", err) } - if err := n.assureCreated(); err != nil { - return err - } - blob := blobaccess.ForDataAccess(desc.Digest, desc.Size, desc.MediaType, acc) for _, tag := range tags { - if err := n.pushTag(blob, tag); err != nil { - return fmt.Errorf("failed to push tag %s: %w", tag, err) + err := n.push(tag, blob) + if err != nil { + return fmt.Errorf("unable to push: %w", err) } } diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index 061879a0d..1bae127a7 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -4,23 +4,20 @@ import ( "context" "crypto/tls" "crypto/x509" - "fmt" - "net/http" "path" "strings" + "github.com/containerd/containerd/remotes/docker/config" + "github.com/containerd/errdefs" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/logging" - "oras.land/oras-go/v2/errdef" - "oras.land/oras-go/v2/registry" - "oras.land/oras-go/v2/registry/remote" - "oras.land/oras-go/v2/registry/remote/auth" - "oras.land/oras-go/v2/registry/remote/retry" "ocm.software/ocm/api/credentials" "ocm.software/ocm/api/datacontext/attrs/rootcertsattr" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/tech/docker" + "ocm.software/ocm/api/tech/docker/resolve" "ocm.software/ocm/api/tech/oci/identity" "ocm.software/ocm/api/utils" ocmlog "ocm.software/ocm/api/utils/logging" @@ -115,7 +112,7 @@ func (r *RepositoryImpl) getCreds(comp string) (credentials.Credentials, error) return identity.GetCredentials(r.GetContext(), r.info.Locator, comp) } -func (r *RepositoryImpl) getResolver(ref string, comp string) (registry.Repository, error) { +func (r *RepositoryImpl) getResolver(comp string) (resolve.Resolver, error) { creds, err := r.getCreds(comp) if err != nil { if !errors.IsErrUnknownKind(err, credentials.KIND_CONSUMER) { @@ -126,59 +123,53 @@ func (r *RepositoryImpl) getResolver(ref string, comp string) (registry.Reposito if creds == nil { logger.Trace("no credentials") } - repo, err := remote.NewRepository(ref) - if err != nil { - return nil, fmt.Errorf("error creating oci repository: %w", err) - } - - authCreds := auth.Credential{} - if creds != nil { - pass := creds.GetProperty(credentials.ATTR_IDENTITY_TOKEN) - if pass == "" { - pass = creds.GetProperty(credentials.ATTR_PASSWORD) - } - authCreds.Username = creds.GetProperty(credentials.ATTR_USERNAME) - authCreds.Password = pass - } - client := http.DefaultClient - if r.info.Scheme == "https" { - // set up TLS - //nolint:gosec // used like the default, there are OCI servers (quay.io) not working with min version. - conf := &tls.Config{ - // MinVersion: tls.VersionTLS13, - RootCAs: func() *x509.CertPool { - var rootCAs *x509.CertPool + opts := docker.ResolverOptions{ + Hosts: docker.ConvertHosts(config.ConfigureHosts(context.Background(), config.HostOptions{ + Credentials: func(host string) (string, string, error) { if creds != nil { - c := creds.GetProperty(credentials.ATTR_CERTIFICATE_AUTHORITY) - if c != "" { - rootCAs = x509.NewCertPool() - rootCAs.AppendCertsFromPEM([]byte(c)) + p := creds.GetProperty(credentials.ATTR_IDENTITY_TOKEN) + if p == "" { + p = creds.GetProperty(credentials.ATTR_PASSWORD) } + pw := "" + if p != "" { + pw = "***" + } + logger.Trace("query credentials", ocmlog.ATTR_USER, creds.GetProperty(credentials.ATTR_USERNAME), "pass", pw) + return creds.GetProperty(credentials.ATTR_USERNAME), p, nil + } + logger.Trace("no credentials") + return "", "", nil + }, + DefaultScheme: r.info.Scheme, + //nolint:gosec // used like the default, there are OCI servers (quay.io) not working with min version. + DefaultTLS: func() *tls.Config { + if r.info.Scheme == "http" { + return nil } - if rootCAs == nil { - rootCAs = rootcertsattr.Get(r.GetContext()).GetRootCertPool(true) + return &tls.Config{ + // MinVersion: tls.VersionTLS13, + RootCAs: func() *x509.CertPool { + var rootCAs *x509.CertPool + if creds != nil { + c := creds.GetProperty(credentials.ATTR_CERTIFICATE_AUTHORITY) + if c != "" { + rootCAs = x509.NewCertPool() + rootCAs.AppendCertsFromPEM([]byte(c)) + } + } + if rootCAs == nil { + rootCAs = rootcertsattr.Get(r.GetContext()).GetRootCertPool(true) + } + return rootCAs + }(), } - return rootCAs }(), - } - - client = &http.Client{ - Transport: retry.NewTransport(&http.Transport{ - TLSClientConfig: conf, - }), - } - } else { - repo.PlainHTTP = true - } - - repo.Client = &auth.Client{ - Client: client, - Cache: auth.NewCache(), - Credential: auth.StaticCredential(r.info.HostPort(), authCreds), + })), } - return repo, nil + return docker.NewResolver(opts), nil } func (r *RepositoryImpl) GetRef(comp, vers string) string { @@ -197,14 +188,14 @@ func (r *RepositoryImpl) GetBaseURL() string { } func (r *RepositoryImpl) ExistsArtifact(name string, version string) (bool, error) { - ref := r.GetRef(name, version) - res, err := r.getResolver(ref, name) + res, err := r.getResolver(name) if err != nil { return false, err } - - if _, err = res.Resolve(context.Background(), ref); err != nil { - if errors.Is(err, errdef.ErrNotFound) { + ref := r.GetRef(name, version) + _, _, err = res.Resolve(context.Background(), ref) + if err != nil { + if errdefs.IsNotFound(err) { return false, nil } return false, err diff --git a/api/oci/extensions/repositories/ocireg/utils.go b/api/oci/extensions/repositories/ocireg/utils.go index 828582f06..17a96f040 100644 --- a/api/oci/extensions/repositories/ocireg/utils.go +++ b/api/oci/extensions/repositories/ocireg/utils.go @@ -2,21 +2,19 @@ package ocireg import ( "context" - "errors" "fmt" "io" "sync" "github.com/containerd/containerd/remotes" + "github.com/containerd/errdefs" "github.com/containerd/log" "github.com/opencontainers/go-digest" "github.com/sirupsen/logrus" - "oras.land/oras-go/v2/content" - "oras.land/oras-go/v2/errdef" - "oras.land/oras-go/v2/registry" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/tech/docker/resolve" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/blobaccess/blobaccess" "ocm.software/ocm/api/utils/logging" @@ -26,40 +24,32 @@ import ( type dataAccess struct { accessio.NopCloser - lock sync.Mutex - repo registry.Repository - desc artdesc.Descriptor - reader io.ReadCloser + lock sync.Mutex + fetcher remotes.Fetcher + desc artdesc.Descriptor + reader io.ReadCloser } var _ cpi.DataAccess = (*dataAccess)(nil) -func NewDataAccess(repo registry.Repository, digest digest.Digest, delayed bool) (*dataAccess, error) { +func NewDataAccess(fetcher remotes.Fetcher, digest digest.Digest, mimeType string, delayed bool) (*dataAccess, error) { var reader io.ReadCloser - // First, we try to resolve a blob if a blob was already provided, this will work. - desc, err := repo.Blobs().Resolve(dummyContext, digest.String()) - if err != nil { - if errors.Is(err, errdef.ErrNotFound) { - // If the provided digest was that of a manifest, the second try will find - // the manifest, because the first one didn't find the blob. - desc, err = repo.Resolve(dummyContext, digest.String()) - if err != nil { - return nil, err - } - } else { - return nil, fmt.Errorf("failed to resolve descriptor with digest %s: %w", digest.String(), err) - } + var err error + desc := artdesc.Descriptor{ + MediaType: mimeType, + Digest: digest, + Size: blobaccess.BLOB_UNKNOWN_SIZE, } if !delayed { - reader, err = repo.Fetch(dummyContext, desc) + reader, err = fetcher.Fetch(dummyContext, desc) if err != nil { - return nil, fmt.Errorf("failed to fetch descriptor: %w", err) + return nil, err } } return &dataAccess{ - repo: repo, - desc: desc, - reader: reader, + fetcher: fetcher, + desc: desc, + reader: reader, }, nil } @@ -75,7 +65,7 @@ func (d *dataAccess) Reader() (io.ReadCloser, error) { if reader != nil { return reader, nil } - return d.repo.Fetch(dummyContext, d.desc) + return d.fetcher.Fetch(dummyContext, d.desc) } func readAll(reader io.ReadCloser, err error) ([]byte, error) { @@ -91,32 +81,28 @@ func readAll(reader io.ReadCloser, err error) ([]byte, error) { return data, nil } -func push(ctx context.Context, p content.Pusher, blob blobaccess.BlobAccess) error { +func push(ctx context.Context, p resolve.Pusher, blob blobaccess.BlobAccess) error { desc := *artdesc.DefaultBlobDescriptor(blob) return pushData(ctx, p, desc, blob) } -func pushData(ctx context.Context, p content.Pusher, desc artdesc.Descriptor, data blobaccess.DataAccess) error { +func pushData(ctx context.Context, p resolve.Pusher, desc artdesc.Descriptor, data blobaccess.DataAccess) error { key := remotes.MakeRefKey(ctx, desc) if desc.Size == 0 { desc.Size = -1 } logging.Logger().Debug("*** push blob", "mediatype", desc.MediaType, "digest", desc.Digest, "key", key) - reader, err := data.Reader() + req, err := p.Push(ctx, desc, data) if err != nil { - return err - } - - if err := p.Push(ctx, desc, reader); err != nil { - if errors.Is(err, errdef.ErrAlreadyExists) { + if errdefs.IsAlreadyExists(err) { logging.Logger().Debug("blob already exists", "mediatype", desc.MediaType, "digest", desc.Digest) return nil } return fmt.Errorf("failed to push: %w", err) } - return nil + return req.Commit(ctx, desc.Size, desc.Digest) } var dummyContext = nologger() diff --git a/api/ocm/extensions/pubsub/providers/ocireg/provider.go b/api/ocm/extensions/pubsub/providers/ocireg/provider.go index d2e0e99b2..58c5b435b 100644 --- a/api/ocm/extensions/pubsub/providers/ocireg/provider.go +++ b/api/ocm/extensions/pubsub/providers/ocireg/provider.go @@ -5,8 +5,8 @@ import ( "fmt" "path" + containererr "github.com/containerd/containerd/remotes/errors" "github.com/mandelsoft/goutils/errors" - "oras.land/oras-go/v2/registry/remote/errcode" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" @@ -45,18 +45,10 @@ func (p *Provider) GetPubSubSpec(repo repocpi.Repository) (pubsub.PubSubSpec, er ocirepo := path.Join(gen.Meta().SubPath, componentmapping.ComponentDescriptorNamespace) acc, err := gen.OCIRepository().LookupArtifact(ocirepo, META) - - // Dirty workaround until fix is ready for https://github.com/open-component-model/ocm/issues/872 - errCode := errcode.Error{} - if errors.As(err, &errCode) { - if errCode.Code == errcode.ErrorCodeDenied { - return nil, nil - } - } - - if errors.IsErrNotFound(err) || errors.IsErrUnknown(err) { + if errors.IsErrNotFound(err) || errors.IsErrUnknown(err) || errors.IsA(err, containererr.ErrUnexpectedStatus{}) { return nil, nil } + if err != nil { return nil, errors.Wrapf(err, "cannot access meta data manifest version") } diff --git a/api/tech/docker/README.md b/api/tech/docker/README.md new file mode 100644 index 000000000..096a9c1e1 --- /dev/null +++ b/api/tech/docker/README.md @@ -0,0 +1,4 @@ +# containerd + +Taken from github.com/containerd/containerd remotes/docker to add list endpoints +Fix retry of requests with ResendBuffer diff --git a/api/tech/docker/errors/errors.go b/api/tech/docker/errors/errors.go new file mode 100644 index 000000000..a158f75b5 --- /dev/null +++ b/api/tech/docker/errors/errors.go @@ -0,0 +1,58 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package errors + +import ( + "fmt" + "io" + "net/http" +) + +var _ error = ErrUnexpectedStatus{} + +// ErrUnexpectedStatus is returned if a registry API request returned with unexpected HTTP status +type ErrUnexpectedStatus struct { + Status string + StatusCode int + Body []byte + RequestURL, RequestMethod string +} + +func (e ErrUnexpectedStatus) Error() string { + if len(e.Body) > 0 { + return fmt.Sprintf("unexpected status from %s request to %s: %s: %s", e.RequestMethod, e.RequestURL, e.Status, string(e.Body)) + } + return fmt.Sprintf("unexpected status from %s request to %s: %s", e.RequestMethod, e.RequestURL, e.Status) +} + +// NewUnexpectedStatusErr creates an ErrUnexpectedStatus from HTTP response +func NewUnexpectedStatusErr(resp *http.Response) error { + var b []byte + if resp.Body != nil { + b, _ = io.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB + } + err := ErrUnexpectedStatus{ + Body: b, + Status: resp.Status, + StatusCode: resp.StatusCode, + RequestMethod: resp.Request.Method, + } + if resp.Request.URL != nil { + err.RequestURL = resp.Request.URL.String() + } + return err +} diff --git a/api/tech/docker/fetcher.go b/api/tech/docker/fetcher.go new file mode 100644 index 000000000..4a2eec584 --- /dev/null +++ b/api/tech/docker/fetcher.go @@ -0,0 +1,202 @@ +package docker + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + + "ocm.software/ocm/api/utils/accessio" +) + +type dockerFetcher struct { + *dockerBase +} + +func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest)) + + hosts := r.filterHosts(HostCapabilityPull) + if len(hosts) == 0 { + return nil, errors.Wrap(errdefs.ErrNotFound, "no pull hosts") + } + + ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false) + if err != nil { + return nil, err + } + + return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { + // firstly try fetch via external urls + for _, us := range desc.URLs { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", us)) + + u, err := url.Parse(us) + if err != nil { + log.G(ctx).WithError(err).Debug("failed to parse") + continue + } + if u.Scheme != "http" && u.Scheme != "https" { + log.G(ctx).Debug("non-http(s) alternative url is unsupported") + continue + } + log.G(ctx).Debug("trying alternative url") + + // Try this first, parse it + host := RegistryHost{ + Client: http.DefaultClient, + Host: u.Host, + Scheme: u.Scheme, + Path: u.Path, + Capabilities: HostCapabilityPull, + } + req := r.request(host, http.MethodGet) + // Strip namespace from base + req.path = u.Path + if u.RawQuery != "" { + req.path = req.path + "?" + u.RawQuery + } + + rc, err := r.open(ctx, req, desc.MediaType, offset) + if err != nil { + if errdefs.IsNotFound(err) { + continue // try one of the other urls. + } + + return nil, err + } + + return rc, nil + } + + // Try manifests endpoints for manifests types + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + images.MediaTypeDockerSchema1Manifest, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + + var firstErr error + for _, host := range r.hosts { + req := r.request(host, http.MethodGet, "manifests", desc.Digest.String()) + if err := req.addNamespace(r.refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := r.open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + return nil, firstErr + } + + // Finally use blobs endpoints + var firstErr error + for _, host := range r.hosts { + req := r.request(host, http.MethodGet, "blobs", desc.Digest.String()) + if err := req.addNamespace(r.refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := r.open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + if errdefs.IsNotFound(firstErr) { + firstErr = errors.Wrapf(errdefs.ErrNotFound, + "could not fetch content descriptor %v (%v) from remote", + desc.Digest, desc.MediaType) + } + + return nil, firstErr + }) +} + +func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) { + mt := "*/*" + if mediatype != "" { + mt = mediatype + ", " + mt + } + req.header.Set("Accept", mt) + + if offset > 0 { + // Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints + // will return the header without supporting the range. The content + // range must always be checked. + req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := req.doWithRetries(ctx, nil) + if err != nil { + return nil, accessio.RetriableError(err) + } + defer func() { + if retErr != nil { + resp.Body.Close() + } + }() + + if resp.StatusCode > 299 { + // TODO(stevvooe): When doing a offset specific request, we should + // really distinguish between a 206 and a 200. In the case of 200, we + // can discard the bytes, hiding the seek behavior from the + // implementation. + + if resp.StatusCode == http.StatusNotFound { + return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", req.String()) + } + var registryErr Errors + if err := json.NewDecoder(resp.Body).Decode(®istryErr); err != nil || registryErr.Len() < 1 { + return nil, errors.Errorf("unexpected status code %v: %v", req.String(), resp.Status) + } + return nil, errors.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error()) + } + if offset > 0 { + cr := resp.Header.Get("content-range") + if cr != "" { + if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) { + return nil, errors.Errorf("unhandled content range in response: %v", cr) + } + } else { + // TODO: Should any cases where use of content range + // without the proper header be considered? + // 206 responses? + + // Discard up to offset + // Could use buffer pool here but this case should be rare + n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset)) + if err != nil { + return nil, errors.Wrap(err, "failed to discard to offset") + } + if n != offset { + return nil, errors.Errorf("unable to discard to offset") + } + } + } + + return resp.Body, nil +} diff --git a/api/tech/docker/handler.go b/api/tech/docker/handler.go new file mode 100644 index 000000000..0ff9959ad --- /dev/null +++ b/api/tech/docker/handler.go @@ -0,0 +1,136 @@ +package docker + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/labels" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// labelDistributionSource describes the source blob comes from. +var labelDistributionSource = "containerd.io/distribution.source" + +// AppendDistributionSourceLabel updates the label of blob with distribution source. +func AppendDistributionSourceLabel(manager content.Manager, ref string) (images.HandlerFunc, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return nil, err + } + + u, err := url.Parse("dummy://" + refspec.Locator) + if err != nil { + return nil, err + } + + source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/") + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + info, err := manager.Info(ctx, desc.Digest) + if err != nil { + return nil, err + } + + key := distributionSourceLabelKey(source) + + originLabel := "" + if info.Labels != nil { + originLabel = info.Labels[key] + } + value := appendDistributionSourceLabel(originLabel, repo) + + // The repo name has been limited under 256 and the distribution + // label might hit the limitation of label size, when blob data + // is used as the very, very common layer. + if err := labels.Validate(key, value); err != nil { + log.G(ctx).Warnf("skip to append distribution label: %s", err) + return nil, nil + } + + info = content.Info{ + Digest: desc.Digest, + Labels: map[string]string{ + key: value, + }, + } + _, err = manager.Update(ctx, info, fmt.Sprintf("labels.%s", key)) + return nil, err + }, nil +} + +func appendDistributionSourceLabel(originLabel, repo string) string { + repos := []string{} + if originLabel != "" { + repos = strings.Split(originLabel, ",") + } + repos = append(repos, repo) + + // use empty string to present duplicate items + for i := 1; i < len(repos); i++ { + tmp, j := repos[i], i-1 + for ; j >= 0 && repos[j] >= tmp; j-- { + if repos[j] == tmp { + tmp = "" + } + repos[j+1] = repos[j] + } + repos[j+1] = tmp + } + + i := 0 + for ; i < len(repos) && repos[i] == ""; i++ { + } + + return strings.Join(repos[i:], ",") +} + +func distributionSourceLabelKey(source string) string { + return fmt.Sprintf("%s.%s", labelDistributionSource, source) +} + +// selectRepositoryMountCandidate will select the repo which has longest +// common prefix components as the candidate. +func selectRepositoryMountCandidate(refspec reference.Spec, sources map[string]string) string { + u, err := url.Parse("dummy://" + refspec.Locator) + if err != nil { + // NOTE: basically, it won't be error here + return "" + } + + source, target := u.Hostname(), strings.TrimPrefix(u.Path, "/") + repoLabel, ok := sources[distributionSourceLabelKey(source)] + if !ok || repoLabel == "" { + return "" + } + + n, match := 0, "" + components := strings.Split(target, "/") + for _, repo := range strings.Split(repoLabel, ",") { + // the target repo is not a candidate + if repo == target { + continue + } + + if l := commonPrefixComponents(components, repo); l >= n { + n, match = l, repo + } + } + return match +} + +func commonPrefixComponents(components []string, target string) int { + targetComponents := strings.Split(target, "/") + + i := 0 + for ; i < len(components) && i < len(targetComponents); i++ { + if components[i] != targetComponents[i] { + break + } + } + return i +} diff --git a/api/tech/docker/httpreadseeker.go b/api/tech/docker/httpreadseeker.go new file mode 100644 index 000000000..c6b803810 --- /dev/null +++ b/api/tech/docker/httpreadseeker.go @@ -0,0 +1,157 @@ +package docker + +import ( + "bytes" + "io" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" +) + +const maxRetry = 3 + +type httpReadSeeker struct { + size int64 + offset int64 + rc io.ReadCloser + open func(offset int64) (io.ReadCloser, error) + closed bool + + errsWithNoProgress int +} + +func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) { + return &httpReadSeeker{ + size: size, + open: open, + }, nil +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.closed { + return 0, io.EOF + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + if n > 0 || err == nil { + hrs.errsWithNoProgress = 0 + } + + if !errors.Is(err, io.ErrUnexpectedEOF) { + return + } + // connection closed unexpectedly. try reconnecting. + if n == 0 { + hrs.errsWithNoProgress++ + if hrs.errsWithNoProgress > maxRetry { + return // too many retries for this offset with no progress + } + } + + if hrs.rc != nil { + if clsErr := hrs.rc.Close(); clsErr != nil { + log.L.WithError(clsErr).Errorf("httpReadSeeker: failed to close ReadCloser") + } + hrs.rc = nil + } + + if _, err2 := hrs.reader(); err2 == nil { + return n, nil + } + + return n, err +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.closed { + return nil + } + hrs.closed = true + if hrs.rc != nil { + return hrs.rc.Close() + } + + return nil +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.closed { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: closed") + } + + abs := hrs.offset + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + abs += offset + case io.SeekEnd: + if hrs.size == -1 { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: unknown size, cannot seek from end") + } + abs = hrs.size + offset + default: + return 0, errors.Wrap(errdefs.ErrInvalidArgument, "Fetcher.Seek: invalid whence") + } + + if abs < 0 { + return 0, errors.Wrapf(errdefs.ErrInvalidArgument, "Fetcher.Seek: negative offset") + } + + if abs != hrs.offset { + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("Fetcher.Seek: failed to close ReadCloser") + } + + hrs.rc = nil + } + + hrs.offset = abs + } + + return hrs.offset, nil +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.rc != nil { + return hrs.rc, nil + } + + if hrs.size == -1 || hrs.offset < hrs.size { + // only try to reopen the body request if we are seeking to a value + // less than the actual size. + if hrs.open == nil { + return nil, errors.Wrapf(errdefs.ErrNotImplemented, "cannot open") + } + + rc, err := hrs.open(hrs.offset) + if err != nil { + return nil, errors.Wrapf(err, "httpReadSeeker: failed open") + } + + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("httpReadSeeker: failed to close ReadCloser") + } + } + hrs.rc = rc + } else { + // There is an edge case here where offset == size of the content. If + // we seek, we will probably get an error for content that cannot be + // sought (?). In that case, we should err on committing the content, + // as the length is already satisfied but we just return the empty + // reader instead. + + hrs.rc = io.NopCloser(bytes.NewReader([]byte{})) + } + + return hrs.rc, nil +} diff --git a/api/tech/docker/lister.go b/api/tech/docker/lister.go new file mode 100644 index 000000000..efd3b8e1e --- /dev/null +++ b/api/tech/docker/lister.go @@ -0,0 +1,130 @@ +package docker + +import ( + "context" + "encoding/json" + "io" + "net/http" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" + + "ocm.software/ocm/api/tech/docker/resolve" +) + +var ErrObjectNotRequired = errors.New("object not required") + +type TagList struct { + Name string `json:"name"` + Tags []string `json:"tags"` +} + +type dockerLister struct { + dockerBase *dockerBase +} + +func (r *dockerResolver) Lister(ctx context.Context, ref string) (resolve.Lister, error) { + base, err := r.resolveDockerBase(ref) + if err != nil { + return nil, err + } + if base.refspec.Object != "" { + return nil, ErrObjectNotRequired + } + + return &dockerLister{ + dockerBase: base, + }, nil +} + +func (r *dockerLister) List(ctx context.Context) ([]string, error) { + refspec := r.dockerBase.refspec + base := r.dockerBase + var ( + firstErr error + paths [][]string + caps = HostCapabilityPull + ) + + // turns out, we have a valid digest, make a url. + paths = append(paths, []string{"tags/list"}) + caps |= HostCapabilityResolve + + hosts := base.filterHosts(caps) + if len(hosts) == 0 { + return nil, errors.Wrap(errdefs.ErrNotFound, "no list hosts") + } + + ctx, err := ContextWithRepositoryScope(ctx, refspec, false) + if err != nil { + return nil, err + } + + for _, u := range paths { + for _, host := range hosts { + ctxWithLogger := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host)) + + req := base.request(host, http.MethodGet, u...) + if err := req.addNamespace(base.refspec.Hostname()); err != nil { + return nil, err + } + + req.header["Accept"] = []string{"application/json"} + + log.G(ctxWithLogger).Debug("listing") + resp, err := req.doWithRetries(ctxWithLogger, nil) + if err != nil { + if errors.Is(err, ErrInvalidAuthorization) { + err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization") + } + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + log.G(ctxWithLogger).WithError(err).Info("trying next host") + continue // try another host + } + + if resp.StatusCode > 299 { + resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + log.G(ctxWithLogger).Info("trying next host - response was http.StatusNotFound") + continue + } + if resp.StatusCode > 399 { + // Set firstErr when encountering the first non-404 status code. + if firstErr == nil { + firstErr = errors.Errorf("pulling from host %s failed with status code %v: %v", host.Host, u, resp.Status) + } + continue // try another host + } + return nil, errors.Errorf("taglist from host %s failed with unexpected status code %v: %v", host.Host, u, resp.Status) + } + + data, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, err + } + + tags := &TagList{} + + err = json.Unmarshal(data, tags) + if err != nil { + return nil, err + } + return tags.Tags, nil + } + } + + // If above loop terminates without return, then there was an error. + // "firstErr" contains the first non-404 error. That is, "firstErr == nil" + // means that either no registries were given or each registry returned 404. + + if firstErr == nil { + firstErr = errors.Wrap(errdefs.ErrNotFound, base.refspec.Locator) + } + + return nil, firstErr +} diff --git a/api/tech/docker/orig.go b/api/tech/docker/orig.go new file mode 100644 index 000000000..c9b2468fb --- /dev/null +++ b/api/tech/docker/orig.go @@ -0,0 +1,44 @@ +package docker + +import ( + "github.com/containerd/containerd/remotes/docker" +) + +var ( + ContextWithRepositoryScope = docker.ContextWithRepositoryScope + ContextWithAppendPullRepositoryScope = docker.ContextWithAppendPullRepositoryScope + NewInMemoryTracker = docker.NewInMemoryTracker + NewDockerAuthorizer = docker.NewDockerAuthorizer + WithAuthClient = docker.WithAuthClient + WithAuthHeader = docker.WithAuthHeader + WithAuthCreds = docker.WithAuthCreds +) + +type ( + Errors = docker.Errors + StatusTracker = docker.StatusTracker + Status = docker.Status + StatusTrackLocker = docker.StatusTrackLocker +) + +func ConvertHosts(hosts docker.RegistryHosts) RegistryHosts { + return func(host string) ([]RegistryHost, error) { + list, err := hosts(host) + if err != nil { + return nil, err + } + result := make([]RegistryHost, len(list)) + for i, v := range list { + result[i] = RegistryHost{ + Client: v.Client, + Authorizer: v.Authorizer, + Host: v.Host, + Scheme: v.Scheme, + Path: v.Path, + Capabilities: HostCapabilities(v.Capabilities), + Header: v.Header, + } + } + return result, nil + } +} diff --git a/api/tech/docker/pusher.go b/api/tech/docker/pusher.go new file mode 100644 index 000000000..708ad0f34 --- /dev/null +++ b/api/tech/docker/pusher.go @@ -0,0 +1,433 @@ +package docker + +import ( + "context" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/remotes" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + remoteserrors "ocm.software/ocm/api/tech/docker/errors" + "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/utils/accessio" +) + +func init() { + l := logrus.New() + l.Level = logrus.WarnLevel + log.L = logrus.NewEntry(l) +} + +type dockerPusher struct { + *dockerBase + object string + + // TODO: namespace tracker + tracker StatusTracker +} + +func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, src resolve.Source) (resolve.PushRequest, error) { + return p.push(ctx, desc, src, remotes.MakeRefKey(ctx, desc), false) +} + +func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, src resolve.Source, ref string, unavailableOnFail bool) (resolve.PushRequest, error) { + if l, ok := p.tracker.(StatusTrackLocker); ok { + l.Lock(ref) + defer l.Unlock(ref) + } + ctx, err := ContextWithRepositoryScope(ctx, p.refspec, true) + if err != nil { + return nil, err + } + status, err := p.tracker.GetStatus(ref) + if err == nil { + if status.Committed && status.Offset == status.Total { + return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref) + } + if unavailableOnFail { + // Another push of this ref is happening elsewhere. The rest of function + // will continue only when `errdefs.IsNotFound(err) == true` (i.e. there + // is no actively-tracked ref already). + return nil, errors.Wrap(errdefs.ErrUnavailable, "push is on-going") + } + // TODO: Handle incomplete status + } else if !errdefs.IsNotFound(err) { + return nil, errors.Wrap(err, "failed to get status") + } + + hosts := p.filterHosts(HostCapabilityPush) + if len(hosts) == 0 { + return nil, errors.Wrap(errdefs.ErrNotFound, "no push hosts") + } + + var ( + isManifest bool + existCheck []string + host = hosts[0] + ) + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + isManifest = true + existCheck = getManifestPath(p.object, desc.Digest) + default: + existCheck = []string{"blobs", desc.Digest.String()} + } + + req := p.request(host, http.MethodHead, existCheck...) + req.header.Set("Accept", strings.Join([]string{desc.MediaType, `*/*`}, ", ")) + + log.G(ctx).WithField("url", req.String()).Debugf("checking and pushing to") + + headResp, err := req.doWithRetries(ctx, nil) + if err != nil { + if !errors.Is(err, ErrInvalidAuthorization) { + return nil, err + } + log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") + } else { + defer headResp.Body.Close() + + if headResp.StatusCode == http.StatusOK { + var exists bool + if isManifest && existCheck[1] != desc.Digest.String() { + dgstHeader := digest.Digest(headResp.Header.Get("Docker-Content-Digest")) + if dgstHeader == desc.Digest { + exists = true + } + } else { + exists = true + } + + if exists { + p.tracker.SetStatus(ref, Status{ + Committed: true, + Status: content.Status{ + Ref: ref, + Total: desc.Size, + Offset: desc.Size, + // TODO: Set updated time? + }, + }) + + return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest) + } + } else if headResp.StatusCode != http.StatusNotFound { + err := remoteserrors.NewUnexpectedStatusErr(headResp) + + var statusError remoteserrors.ErrUnexpectedStatus + if errors.As(err, &statusError) { + log.G(ctx). + WithField("resp", headResp). + WithField("body", string(statusError.Body)). + Debug("unexpected response") + } + + return nil, accessio.RetriableError(err) + } + } + + if isManifest { + putPath := getManifestPath(p.object, desc.Digest) + req = p.request(host, http.MethodPut, putPath...) + req.header.Add("Content-Type", desc.MediaType) + } else { + // Start upload request + req = p.request(host, http.MethodPost, "blobs", "uploads/") + + var resp *http.Response + if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" { + preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo) + pctx := ContextWithAppendPullRepositoryScope(ctx, fromRepo) + + // NOTE: the fromRepo might be private repo and + // auth service still can grant token without error. + // but the post request will fail because of 401. + // + // for the private repo, we should remove mount-from + // query and send the request again. + resp, err = preq.doWithRetries(pctx, nil) + if err != nil { + return nil, accessio.RetriableError(err) + } + + if resp.StatusCode == http.StatusUnauthorized { + log.G(ctx).Debugf("failed to mount from repository %s", fromRepo) + + resp.Body.Close() + resp = nil + } + } + + if resp == nil { + resp, err = req.doWithRetries(ctx, nil) + if err != nil { + return nil, accessio.RetriableError(err) + } + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK, http.StatusAccepted, http.StatusNoContent: + case http.StatusCreated: + p.tracker.SetStatus(ref, Status{ + Committed: true, + Status: content.Status{ + Ref: ref, + Total: desc.Size, + Offset: desc.Size, + }, + }) + return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest) + default: + err := remoteserrors.NewUnexpectedStatusErr(resp) + + var statusError remoteserrors.ErrUnexpectedStatus + if errors.As(err, &statusError) { + log.G(ctx). + WithField("resp", resp). + WithField("body", string(statusError.Body)). + Debug("unexpected response") + } + + return nil, err + } + + var ( + location = resp.Header.Get("Location") + lurl *url.URL + lhost = host + ) + // Support paths without host in location + if strings.HasPrefix(location, "/") { + lurl, err = url.Parse(lhost.Scheme + "://" + lhost.Host + location) + if err != nil { + return nil, errors.Wrapf(err, "unable to parse location %v", location) + } + } else { + if !strings.Contains(location, "://") { + location = lhost.Scheme + "://" + location + } + lurl, err = url.Parse(location) + if err != nil { + return nil, errors.Wrapf(err, "unable to parse location %v", location) + } + + if lurl.Host != lhost.Host || lhost.Scheme != lurl.Scheme { + lhost.Scheme = lurl.Scheme + lhost.Host = lurl.Host + log.G(ctx).WithField("host", lhost.Host).WithField("scheme", lhost.Scheme).Debug("upload changed destination") + + // Strip authorizer if change to host or scheme + lhost.Authorizer = nil + } + } + q := lurl.Query() + q.Add("digest", desc.Digest.String()) + + req = p.request(lhost, http.MethodPut) + req.header.Set("Content-Type", "application/octet-stream") + req.path = lurl.Path + "?" + q.Encode() + } + p.tracker.SetStatus(ref, Status{ + Status: content.Status{ + Ref: ref, + Total: desc.Size, + Expected: desc.Digest, + StartedAt: time.Now(), + }, + }) + + // TODO: Support chunked upload + + respC := make(chan response, 1) + + preq := &pushRequest{ + base: p.dockerBase, + ref: ref, + responseC: respC, + source: src, + isManifest: isManifest, + expected: desc.Digest, + tracker: p.tracker, + } + + req.body = preq.Reader + req.size = desc.Size + + go func() { + defer close(respC) + resp, err := req.doWithRetries(ctx, nil) + if err != nil { + respC <- response{err: err} + return + } + + switch resp.StatusCode { + case http.StatusOK, http.StatusCreated, http.StatusNoContent: + default: + err := remoteserrors.NewUnexpectedStatusErr(resp) + + var statusError remoteserrors.ErrUnexpectedStatus + if errors.As(err, &statusError) { + log.G(ctx). + WithField("resp", resp). + WithField("body", string(statusError.Body)). + Debug("unexpected response") + } + } + respC <- response{Response: resp} + }() + + return preq, nil +} + +func getManifestPath(object string, dgst digest.Digest) []string { + if i := strings.IndexByte(object, '@'); i >= 0 { + if object[i+1:] != dgst.String() { + // use digest, not tag + object = "" + } else { + // strip @ for registry path to make tag + object = object[:i] + } + } + + if object == "" { + return []string{"manifests", dgst.String()} + } + + return []string{"manifests", object} +} + +type response struct { + *http.Response + err error +} + +type pushRequest struct { + base *dockerBase + ref string + + responseC <-chan response + source resolve.Source + isManifest bool + + expected digest.Digest + tracker StatusTracker +} + +func (pw *pushRequest) Status() (content.Status, error) { + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return content.Status{}, err + } + return status.Status, nil +} + +func (pw *pushRequest) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + // TODO: timeout waiting for response + resp := <-pw.responseC + if resp.err != nil { + return resp.err + } + defer resp.Response.Body.Close() + + // 201 is specified return status, some registries return + // 200, 202 or 204. + switch resp.StatusCode { + case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: + default: + return remoteserrors.NewUnexpectedStatusErr(resp.Response) + } + + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return errors.Wrap(err, "failed to get status") + } + + if size > 0 && size != status.Offset { + return errors.Errorf("unexpected size %d, expected %d", status.Offset, size) + } + + if expected == "" { + expected = status.Expected + } + + actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) + if err != nil { + return errors.Wrap(err, "invalid content digest in response") + } + + if actual != expected { + return errors.Errorf("got digest %s, expected %s", actual, expected) + } + + status.Committed = true + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + + return nil +} + +func (pw *pushRequest) Reader() (io.ReadCloser, error) { + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return nil, err + } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + + r, err := pw.source.Reader() + if err != nil { + return nil, err + } + return &sizeTrackingReader{pw, r}, nil +} + +type sizeTrackingReader struct { + pw *pushRequest + io.ReadCloser +} + +func (t *sizeTrackingReader) Read(in []byte) (int, error) { + // fmt.Printf("reading next...\n") + n, err := t.ReadCloser.Read(in) + if n > 0 { + status, err := t.pw.tracker.GetStatus(t.pw.ref) + // fmt.Printf("read %d[%d] bytes\n", n, status.Offset) + if err != nil { + return n, err + } + status.Offset += int64(n) + status.UpdatedAt = time.Now() + t.pw.tracker.SetStatus(t.pw.ref, status) + } + return n, err +} + +func requestWithMountFrom(req *request, mount, from string) *request { + creq := *req + + sep := "?" + if strings.Contains(creq.path, sep) { + sep = "&" + } + + creq.path = creq.path + sep + "mount=" + mount + "&from=" + from + + return &creq +} diff --git a/api/tech/docker/registry.go b/api/tech/docker/registry.go new file mode 100644 index 000000000..795dd6e24 --- /dev/null +++ b/api/tech/docker/registry.go @@ -0,0 +1,234 @@ +package docker + +import ( + "net" + "net/http" + + "github.com/pkg/errors" +) + +// HostCapabilities represent the capabilities of the registry +// host. This also represents the set of operations for which +// the registry host may be trusted to perform. +// +// For example pushing is a capability which should only be +// performed on an upstream source, not a mirror. +// Resolving (the process of converting a name into a digest) +// must be considered a trusted operation and only done by +// a host which is trusted (or more preferably by secure process +// which can prove the provenance of the mapping). A public +// mirror should never be trusted to do a resolve action. +// +// | Registry Type | Pull | Resolve | Push | +// |------------------|------|---------|------| +// | Public Registry | yes | yes | yes | +// | Private Registry | yes | yes | yes | +// | Public Mirror | yes | no | no | +// | Private Mirror | yes | yes | no |. +type HostCapabilities uint8 + +const ( + // HostCapabilityPull represents the capability to fetch manifests + // and blobs by digest. + HostCapabilityPull HostCapabilities = 1 << iota + + // HostCapabilityResolve represents the capability to fetch manifests + // by name. + HostCapabilityResolve + + // HostCapabilityPush represents the capability to push blobs and + // manifests. + HostCapabilityPush + + // Reserved for future capabilities (i.e. search, catalog, remove). +) + +// Has checks whether the capabilities list has the provide capability. +func (c HostCapabilities) Has(t HostCapabilities) bool { + return c&t == t +} + +// RegistryHost represents a complete configuration for a registry +// host, representing the capabilities, authorizations, connection +// configuration, and location. +type RegistryHost struct { + Client *http.Client + Authorizer Authorizer + Host string + Scheme string + Path string + Capabilities HostCapabilities + Header http.Header +} + +const ( + dockerHostname = "docker.io" + dockerRegistryHostname = "registry-1.docker.io" +) + +func (h RegistryHost) isProxy(refhost string) bool { + if refhost != h.Host { + if refhost != dockerHostname || h.Host != dockerRegistryHostname { + return true + } + } + return false +} + +// RegistryHosts fetches the registry hosts for a given namespace, +// provided by the host component of an distribution image reference. +type RegistryHosts func(string) ([]RegistryHost, error) + +// Registries joins multiple registry configuration functions, using the same +// order as provided within the arguments. When an empty registry configuration +// is returned with a nil error, the next function will be called. +// NOTE: This function will not join configurations, as soon as a non-empty +// configuration is returned from a configuration function, it will be returned +// to the caller. +func Registries(registries ...RegistryHosts) RegistryHosts { + return func(host string) ([]RegistryHost, error) { + for _, registry := range registries { + config, err := registry(host) + if err != nil { + return config, err + } + if len(config) > 0 { + return config, nil + } + } + return nil, nil + } +} + +type registryOpts struct { + authorizer Authorizer + plainHTTP func(string) (bool, error) + host func(string) (string, error) + client *http.Client +} + +// RegistryOpt defines a registry default option. +type RegistryOpt func(*registryOpts) + +// WithPlainHTTP configures registries to use plaintext http scheme +// for the provided host match function. +func WithPlainHTTP(f func(string) (bool, error)) RegistryOpt { + return func(opts *registryOpts) { + opts.plainHTTP = f + } +} + +// WithAuthorizer configures the default authorizer for a registry. +func WithAuthorizer(a Authorizer) RegistryOpt { + return func(opts *registryOpts) { + opts.authorizer = a + } +} + +// WithHostTranslator defines the default translator to use for registry hosts. +func WithHostTranslator(h func(string) (string, error)) RegistryOpt { + return func(opts *registryOpts) { + opts.host = h + } +} + +// WithClient configures the default http client for a registry. +func WithClient(c *http.Client) RegistryOpt { + return func(opts *registryOpts) { + opts.client = c + } +} + +// ConfigureDefaultRegistries is used to create a default configuration for +// registries. For more advanced configurations or per-domain setups, +// the RegistryHosts interface should be used directly. +// NOTE: This function will always return a non-empty value or error. +func ConfigureDefaultRegistries(ropts ...RegistryOpt) RegistryHosts { + var opts registryOpts + for _, opt := range ropts { + opt(&opts) + } + + return func(host string) ([]RegistryHost, error) { + config := RegistryHost{ + Client: opts.client, + Authorizer: opts.authorizer, + Host: host, + Scheme: "https", + Path: "/v2", + Capabilities: HostCapabilityPull | HostCapabilityResolve | HostCapabilityPush, + } + + if config.Client == nil { + config.Client = http.DefaultClient + } + + if opts.plainHTTP != nil { + match, err := opts.plainHTTP(host) + if err != nil { + return nil, err + } + if match { + config.Scheme = "http" + } + } + + if opts.host != nil { + var err error + config.Host, err = opts.host(config.Host) + if err != nil { + return nil, err + } + } else if host == dockerHostname { + config.Host = dockerRegistryHostname + } + + return []RegistryHost{config}, nil + } +} + +// MatchAllHosts is a host match function which is always true. +func MatchAllHosts(string) (bool, error) { + return true, nil +} + +// MatchLocalhost is a host match function which returns true for +// localhost. +// +// Note: this does not handle matching of ip addresses in octal, +// decimal or hex form. +func MatchLocalhost(host string) (bool, error) { + switch { + case host == "::1": + return true, nil + case host == "[::1]": + return true, nil + } + h, p, err := net.SplitHostPort(host) + + // addrError helps distinguish between errors of form + // "no colon in address" and "too many colons in address". + // The former is fine as the host string need not have a + // port. Latter needs to be handled. + addrError := &net.AddrError{ + Err: "missing port in address", + Addr: host, + } + if err != nil { + if err.Error() != addrError.Error() { + return false, err + } + // host string without any port specified + h = host + } else if len(p) == 0 { + return false, errors.New("invalid host name format") + } + + // use ipv4 dotted decimal for further checking + if h == "localhost" { + h = "127.0.0.1" + } + ip := net.ParseIP(h) + + return ip.IsLoopback(), nil +} diff --git a/api/tech/docker/resolve/interface.go b/api/tech/docker/resolve/interface.go new file mode 100644 index 000000000..847600001 --- /dev/null +++ b/api/tech/docker/resolve/interface.go @@ -0,0 +1,75 @@ +package resolve + +import ( + "context" + "io" + + "github.com/containerd/containerd/content" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// all new and modified + +type Source interface { + Reader() (io.ReadCloser, error) +} + +// Resolver provides remotes based on a locator. +type Resolver interface { + // Resolve attempts to resolve the reference into a name and descriptor. + // + // The argument `ref` should be a scheme-less URI representing the remote. + // Structurally, it has a host and path. The "host" can be used to directly + // reference a specific host or be matched against a specific handler. + // + // The returned name should be used to identify the referenced entity. + // Dependending on the remote namespace, this may be immutable or mutable. + // While the name may differ from ref, it should itself be a valid ref. + // + // If the resolution fails, an error will be returned. + Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) + + // Fetcher returns a new fetcher for the provided reference. + // All content fetched from the returned fetcher will be + // from the namespace referred to by ref. + Fetcher(ctx context.Context, ref string) (Fetcher, error) + + // Pusher returns a new pusher for the provided reference + // The returned Pusher should satisfy content.Ingester and concurrent attempts + // to push the same blob using the Ingester API should result in ErrUnavailable. + Pusher(ctx context.Context, ref string) (Pusher, error) + + Lister(ctx context.Context, ref string) (Lister, error) +} + +// Fetcher fetches content. +type Fetcher interface { + // Fetch the resource identified by the descriptor. + Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) +} + +// Pusher pushes content +// don't use write interface of containerd remotes.Pusher. +type Pusher interface { + // Push returns a push request for the given resource identified + // by the descriptor and the given data source. + Push(ctx context.Context, d ocispec.Descriptor, src Source) (PushRequest, error) +} + +type Lister interface { + List(context.Context) ([]string, error) +} + +// PushRequest handles the result of a push request +// replaces containerd content.Writer. +type PushRequest interface { + // Commit commits the blob (but no roll-back is guaranteed on an error). + // size and expected can be zero-value when unknown. + // Commit always closes the writer, even on error. + // ErrAlreadyExists aborts the writer. + Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error + + // Status returns the current state of write + Status() (content.Status, error) +} diff --git a/api/tech/docker/resolver.go b/api/tech/docker/resolver.go new file mode 100644 index 000000000..292df03ae --- /dev/null +++ b/api/tech/docker/resolver.go @@ -0,0 +1,656 @@ +package docker + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes/docker/schema1" + "github.com/containerd/containerd/version" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/net/context/ctxhttp" + + "ocm.software/ocm/api/tech/docker/resolve" + "ocm.software/ocm/api/utils/accessio" +) + +var ( + // ErrInvalidAuthorization is used when credentials are passed to a server but + // those credentials are rejected. + ErrInvalidAuthorization = errors.New("authorization failed") + + // MaxManifestSize represents the largest size accepted from a registry + // during resolution. Larger manifests may be accepted using a + // resolution method other than the registry. + // + // NOTE: The max supported layers by some runtimes is 128 and individual + // layers will not contribute more than 256 bytes, making a + // reasonable limit for a large image manifests of 32K bytes. + // 4M bytes represents a much larger upper bound for images which may + // contain large annotations or be non-images. A proper manifest + // design puts large metadata in subobjects, as is consistent the + // intent of the manifest design. + MaxManifestSize int64 = 4 * 1048 * 1048 +) + +// Authorizer is used to authorize HTTP requests based on 401 HTTP responses. +// An Authorizer is responsible for caching tokens or credentials used by +// requests. +type Authorizer interface { + // Authorize sets the appropriate `Authorization` header on the given + // request. + // + // If no authorization is found for the request, the request remains + // unmodified. It may also add an `Authorization` header as + // "bearer " + // "basic " + Authorize(context.Context, *http.Request) error + + // AddResponses adds a 401 response for the authorizer to consider when + // authorizing requests. The last response should be unauthorized and + // the previous requests are used to consider redirects and retries + // that may have led to the 401. + // + // If response is not handled, returns `ErrNotImplemented` + AddResponses(context.Context, []*http.Response) error +} + +// ResolverOptions are used to configured a new Docker register resolver. +type ResolverOptions struct { + // Hosts returns registry host configurations for a namespace. + Hosts RegistryHosts + + // Headers are the HTTP request header fields sent by the resolver + Headers http.Header + + // Tracker is used to track uploads to the registry. This is used + // since the registry does not have upload tracking and the existing + // mechanism for getting blob upload status is expensive. + Tracker StatusTracker + + // Authorizer is used to authorize registry requests + // Deprecated: use Hosts + Authorizer Authorizer + + // Credentials provides username and secret given a host. + // If username is empty but a secret is given, that secret + // is interpreted as a long lived token. + // Deprecated: use Hosts + Credentials func(string) (string, string, error) + + // Host provides the hostname given a namespace. + // Deprecated: use Hosts + Host func(string) (string, error) + + // PlainHTTP specifies to use plain http and not https + // Deprecated: use Hosts + PlainHTTP bool + + // Client is the http client to used when making registry requests + // Deprecated: use Hosts + Client *http.Client +} + +// DefaultHost is the default host function. +func DefaultHost(ns string) (string, error) { + if ns == "docker.io" { + return "registry-1.docker.io", nil + } + return ns, nil +} + +type dockerResolver struct { + hosts RegistryHosts + header http.Header + resolveHeader http.Header + tracker StatusTracker +} + +// NewResolver returns a new resolver to a Docker registry. +func NewResolver(options ResolverOptions) resolve.Resolver { + if options.Tracker == nil { + options.Tracker = NewInMemoryTracker() + } + + if options.Headers == nil { + options.Headers = make(http.Header) + } + if _, ok := options.Headers["User-Agent"]; !ok { + options.Headers.Set("User-Agent", "containerd/"+version.Version) + } + + resolveHeader := http.Header{} + if _, ok := options.Headers["Accept"]; !ok { + // set headers for all the types we support for resolution. + resolveHeader.Set("Accept", strings.Join([]string{ + images.MediaTypeDockerSchema2Manifest, + images.MediaTypeDockerSchema2ManifestList, + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, "*/*", + }, ", ")) + } else { + resolveHeader["Accept"] = options.Headers["Accept"] + delete(options.Headers, "Accept") + } + + if options.Hosts == nil { + opts := []RegistryOpt{} + if options.Host != nil { + opts = append(opts, WithHostTranslator(options.Host)) + } + + if options.Authorizer == nil { + options.Authorizer = NewDockerAuthorizer( + WithAuthClient(options.Client), + WithAuthHeader(options.Headers), + WithAuthCreds(options.Credentials)) + } + opts = append(opts, WithAuthorizer(options.Authorizer)) + + if options.Client != nil { + opts = append(opts, WithClient(options.Client)) + } + if options.PlainHTTP { + opts = append(opts, WithPlainHTTP(MatchAllHosts)) + } else { + opts = append(opts, WithPlainHTTP(MatchLocalhost)) + } + options.Hosts = ConfigureDefaultRegistries(opts...) + } + return &dockerResolver{ + hosts: options.Hosts, + header: options.Headers, + resolveHeader: resolveHeader, + tracker: options.Tracker, + } +} + +func getManifestMediaType(resp *http.Response) string { + // Strip encoding data (manifests should always be ascii JSON) + contentType := resp.Header.Get("Content-Type") + if sp := strings.IndexByte(contentType, ';'); sp != -1 { + contentType = contentType[0:sp] + } + + // As of Apr 30 2019 the registry.access.redhat.com registry does not specify + // the content type of any data but uses schema1 manifests. + if contentType == "text/plain" { + contentType = images.MediaTypeDockerSchema1Manifest + } + return contentType +} + +type countingReader struct { + reader io.Reader + bytesRead int64 +} + +func (r *countingReader) Read(p []byte) (int, error) { + n, err := r.reader.Read(p) + r.bytesRead += int64(n) + return n, err +} + +var _ resolve.Resolver = &dockerResolver{} + +func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) { + base, err := r.resolveDockerBase(ref) + if err != nil { + return "", ocispec.Descriptor{}, err + } + refspec := base.refspec + if refspec.Object == "" { + return "", ocispec.Descriptor{}, reference.ErrObjectRequired + } + + var ( + firstErr error + paths [][]string + dgst = refspec.Digest() + caps = HostCapabilityPull + ) + + if dgst != "" { + if err := dgst.Validate(); err != nil { + // need to fail here, since we can't actually resolve the invalid + // digest. + return "", ocispec.Descriptor{}, err + } + + // turns out, we have a valid digest, make a url. + paths = append(paths, []string{"manifests", dgst.String()}) + + // fallback to blobs on not found. + paths = append(paths, []string{"blobs", dgst.String()}) + } else { + // Add + paths = append(paths, []string{"manifests", refspec.Object}) + caps |= HostCapabilityResolve + } + + hosts := base.filterHosts(caps) + if len(hosts) == 0 { + return "", ocispec.Descriptor{}, errors.Wrap(errdefs.ErrNotFound, "no resolve hosts") + } + + ctx, err = ContextWithRepositoryScope(ctx, refspec, false) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + for _, u := range paths { + for _, host := range hosts { + ctxWithLogger := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host)) + + req := base.request(host, http.MethodHead, u...) + if err := req.addNamespace(base.refspec.Hostname()); err != nil { + return "", ocispec.Descriptor{}, err + } + + for key, value := range r.resolveHeader { + req.header[key] = append(req.header[key], value...) + } + + log.G(ctxWithLogger).Debug("resolving") + resp, err := req.doWithRetries(ctxWithLogger, nil) + if err != nil { + if errors.Is(err, ErrInvalidAuthorization) { + err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization") + } else { + err = accessio.RetriableError(err) + } + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + log.G(ctxWithLogger).WithError(err).Info("trying next host") + continue // try another host + } + resp.Body.Close() // don't care about body contents. + + if resp.StatusCode > 299 { + if resp.StatusCode == http.StatusNotFound { + // log.G(ctxWithLogger).Info("trying next host - response was http.StatusNotFound") + continue + } + if resp.StatusCode > 399 { + // Set firstErr when encountering the first non-404 status code. + if firstErr == nil { + firstErr = errors.Errorf("pulling from host %s failed with status code %v: %v", host.Host, u, resp.Status) + } + continue // try another host + } + return "", ocispec.Descriptor{}, errors.Errorf("pulling from host %s failed with unexpected status code %v: %v", host.Host, u, resp.Status) + } + size := resp.ContentLength + contentType := getManifestMediaType(resp) + + // if no digest was provided, then only a resolve + // trusted registry was contacted, in this case use + // the digest header (or content from GET) + if dgst == "" { + // this is the only point at which we trust the registry. we use the + // content headers to assemble a descriptor for the name. when this becomes + // more robust, we mostly get this information from a secure trust store. + dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest")) + + if dgstHeader != "" && size != -1 { + if err := dgstHeader.Validate(); err != nil { + return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader) + } + dgst = dgstHeader + } + } + if dgst == "" || size == -1 { + log.G(ctxWithLogger).Debug("no Docker-Content-Digest header, fetching manifest instead") + + req = base.request(host, http.MethodGet, u...) + if err := req.addNamespace(base.refspec.Hostname()); err != nil { + return "", ocispec.Descriptor{}, err + } + + for key, value := range r.resolveHeader { + req.header[key] = append(req.header[key], value...) + } + + resp, err := req.doWithRetries(ctxWithLogger, nil) + if err != nil { + return "", ocispec.Descriptor{}, accessio.RetriableError(err) + } + defer resp.Body.Close() + + bodyReader := countingReader{reader: resp.Body} + + contentType = getManifestMediaType(resp) + if dgst == "" { + if contentType == images.MediaTypeDockerSchema1Manifest { + b, err := schema1.ReadStripSignature(&bodyReader) + if err != nil { + return "", ocispec.Descriptor{}, accessio.RetriableError(err) + } + + dgst = digest.FromBytes(b) + } else { + dgst, err = digest.FromReader(&bodyReader) + if err != nil { + return "", ocispec.Descriptor{}, accessio.RetriableError(err) + } + } + } else if _, err := io.Copy(io.Discard, &bodyReader); err != nil { + return "", ocispec.Descriptor{}, accessio.RetriableError(err) + } + size = bodyReader.bytesRead + } + // Prevent resolving to excessively large manifests + if size > MaxManifestSize { + if firstErr == nil { + firstErr = errors.Wrapf(errdefs.ErrNotFound, "rejecting %d byte manifest for %s", size, ref) + } + continue + } + + desc := ocispec.Descriptor{ + Digest: dgst, + MediaType: contentType, + Size: size, + } + + log.G(ctxWithLogger).WithField("desc.digest", desc.Digest).Debug("resolved") + return ref, desc, nil + } + } + + // If above loop terminates without return, then there was an error. + // "firstErr" contains the first non-404 error. That is, "firstErr == nil" + // means that either no registries were given or each registry returned 404. + + if firstErr == nil { + firstErr = errors.Wrap(errdefs.ErrNotFound, ref) + } + + return "", ocispec.Descriptor{}, firstErr +} + +func (r *dockerResolver) Fetcher(ctx context.Context, ref string) (resolve.Fetcher, error) { + base, err := r.resolveDockerBase(ref) + if err != nil { + return nil, err + } + + return dockerFetcher{ + dockerBase: base, + }, nil +} + +func (r *dockerResolver) Pusher(ctx context.Context, ref string) (resolve.Pusher, error) { + base, err := r.resolveDockerBase(ref) + if err != nil { + return nil, err + } + + return dockerPusher{ + dockerBase: base, + object: base.refspec.Object, + tracker: r.tracker, + }, nil +} + +func (r *dockerResolver) resolveDockerBase(ref string) (*dockerBase, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return nil, err + } + + return r.base(refspec) +} + +type dockerBase struct { + refspec reference.Spec + repository string + hosts []RegistryHost + header http.Header +} + +func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { + host := refspec.Hostname() + hosts, err := r.hosts(host) + if err != nil { + return nil, err + } + return &dockerBase{ + refspec: refspec, + repository: strings.TrimPrefix(refspec.Locator, host+"/"), + hosts: hosts, + header: r.header, + }, nil +} + +func (r *dockerBase) filterHosts(caps HostCapabilities) (hosts []RegistryHost) { + for _, host := range r.hosts { + if host.Capabilities.Has(caps) { + hosts = append(hosts, host) + } + } + return +} + +func (r *dockerBase) request(host RegistryHost, method string, ps ...string) *request { + header := r.header.Clone() + if header == nil { + header = http.Header{} + } + + for key, value := range host.Header { + header[key] = append(header[key], value...) + } + parts := append([]string{"/", host.Path, r.repository}, ps...) + p := path.Join(parts...) + // Join strips trailing slash, re-add ending "/" if included + if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") { + p += "/" + } + return &request{ + method: method, + path: p, + header: header, + host: host, + } +} + +func (r *request) authorize(ctx context.Context, req *http.Request) error { + // Check if has header for host + if r.host.Authorizer != nil { + if err := r.host.Authorizer.Authorize(ctx, req); err != nil { + return err + } + } + + return nil +} + +func (r *request) addNamespace(ns string) (err error) { + if !r.host.isProxy(ns) { + return nil + } + var q url.Values + // Parse query + if i := strings.IndexByte(r.path, '?'); i > 0 { + r.path = r.path[:i+1] + q, err = url.ParseQuery(r.path[i+1:]) + if err != nil { + return + } + } else { + r.path += "?" + q = url.Values{} + } + q.Add("ns", ns) + + r.path += q.Encode() + + return +} + +type request struct { + method string + path string + header http.Header + host RegistryHost + body func() (io.ReadCloser, error) + size int64 +} + +func (r *request) do(ctx context.Context) (*http.Response, error) { + u := r.host.Scheme + "://" + r.host.Host + r.path + req, err := http.NewRequestWithContext(ctx, r.method, u, nil) + if err != nil { + return nil, err + } + req.Header = http.Header{} // headers need to be copied to avoid concurrent map access + for k, v := range r.header { + req.Header[k] = v + } + if r.body != nil { + body, err := r.body() + if err != nil { + return nil, err + } + req.Body = body + req.GetBody = r.body + if r.size > 0 { + req.ContentLength = r.size + } + defer body.Close() + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) + log.G(ctx).WithFields(sanitizedRequestFields(req)).Debug("do request") + if err := r.authorize(ctx, req); err != nil { + return nil, errors.Wrap(err, "failed to authorize") + } + + client := &http.Client{} + if r.host.Client != nil { + *client = *r.host.Client + } + if client.CheckRedirect == nil { + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return errors.New("stopped after 10 redirects") + } + return errors.Wrap(r.authorize(ctx, req), "failed to authorize redirect") + } + } + + resp, err := ctxhttp.Do(ctx, client, req) + if err != nil { + return nil, errors.Wrap(err, "failed to do request") + } + log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received") + return resp, nil +} + +func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) { + resp, err := r.do(ctx) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + retry, err := r.retryRequest(ctx, responses) + if err != nil { + resp.Body.Close() + return nil, err + } + if retry { + resp.Body.Close() + return r.doWithRetries(ctx, responses) + } + return resp, err +} + +func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) { + if len(responses) > 5 { + return false, nil + } + last := responses[len(responses)-1] + switch last.StatusCode { + case http.StatusUnauthorized: + log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") + if r.host.Authorizer != nil { + if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil { + return true, nil + } else if !errdefs.IsNotImplemented(err) { + return false, err + } + } + + return false, nil + case http.StatusMethodNotAllowed: + // Support registries which have not properly implemented the HEAD method for + // manifests endpoint + if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") { + r.method = http.MethodGet + return true, nil + } + case http.StatusRequestTimeout, http.StatusTooManyRequests: + return true, nil + } + + // TODO: Handle 50x errors accounting for attempt history + return false, nil +} + +func (r *request) String() string { + return r.host.Scheme + "://" + r.host.Host + r.path +} + +func sanitizedRequestFields(req *http.Request) logrus.Fields { + fields := map[string]interface{}{ + "request.method": req.Method, + } + for k, vals := range req.Header { + k = strings.ToLower(k) + if k == "authorization" { + continue + } + for i, v := range vals { + field := "request.header." + k + if i > 0 { + field = fmt.Sprintf("%s.%d", field, i) + } + fields[field] = v + } + } + + return logrus.Fields(fields) +} + +func responseFields(resp *http.Response) logrus.Fields { + fields := map[string]interface{}{ + "response.status": resp.Status, + } + for k, vals := range resp.Header { + k = strings.ToLower(k) + for i, v := range vals { + field := "response.header." + k + if i > 0 { + field = fmt.Sprintf("%s.%d", field, i) + } + fields[field] = v + } + } + + return logrus.Fields(fields) +} diff --git a/go.mod b/go.mod index ad60c5afb..795250e95 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.66.0 github.com/cloudflare/cfssl v1.6.5 github.com/containerd/containerd v1.7.23 + github.com/containerd/errdefs v0.3.0 github.com/containerd/log v0.1.0 github.com/containers/image/v5 v5.32.2 github.com/cyberphone/json-canonicalization v0.0.0-20231217050601-ba74d44ecf5f @@ -80,7 +81,6 @@ require ( k8s.io/apimachinery v0.31.1 k8s.io/cli-runtime v0.31.1 k8s.io/client-go v0.31.1 - oras.land/oras-go/v2 v2.5.0 sigs.k8s.io/controller-runtime v0.19.0 sigs.k8s.io/yaml v1.4.0 ) @@ -156,7 +156,6 @@ require ( github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/cloudflare/circl v1.5.0 // indirect github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be // indirect - github.com/containerd/errdefs v0.3.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01 // indirect @@ -276,6 +275,7 @@ require ( github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pborman/uuid v1.2.1 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index 536d7eca5..b4577dbb1 100644 --- a/go.sum +++ b/go.sum @@ -1359,8 +1359,6 @@ k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= oras.land/oras-go v1.2.6 h1:z8cmxQXBU8yZ4mkytWqXfo6tZcamPwjsuxYU81xJ8Lk= oras.land/oras-go v1.2.6/go.mod h1:OVPc1PegSEe/K8YiLfosrlqlqTN9PUyFvOw5Y9gwrT8= -oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c= -oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg= sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q= sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=