Skip to content

Commit

Permalink
change OCI pusher API to get rid of pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
mandelsoft committed Aug 24, 2022
1 parent 5267fee commit df4670c
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 302 deletions.
15 changes: 8 additions & 7 deletions pkg/contexts/oci/repositories/ocireg/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-component-model/ocm/pkg/common/accessio"
"github.com/open-component-model/ocm/pkg/contexts/oci/attrs/cacheattr"
"github.com/open-component-model/ocm/pkg/contexts/oci/cpi"
"github.com/open-component-model/ocm/pkg/docker/resolve"
"github.com/open-component-model/ocm/pkg/errors"
)

Expand All @@ -34,20 +35,20 @@ type BlobContainer interface {

type blobContainer struct {
accessio.StaticAllocatable
fetcher remotes.Fetcher
pusher remotes.Pusher
fetcher resolve.Fetcher
pusher resolve.Pusher
mime string
}

type BlobContainers struct {
lock sync.Mutex
cache accessio.BlobCache
fetcher remotes.Fetcher
pusher remotes.Pusher
fetcher resolve.Fetcher
pusher resolve.Pusher
mimes map[string]BlobContainer
}

func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher remotes.Pusher) *BlobContainers {
func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher resolve.Pusher) *BlobContainers {
return &BlobContainers{
cache: cacheattr.Get(ctx),
fetcher: fetcher,
Expand Down Expand Up @@ -78,15 +79,15 @@ func (c *BlobContainers) Release() error {
return list.Result()
}

func newBlobContainer(mime string, fetcher remotes.Fetcher, pusher remotes.Pusher) *blobContainer {
func newBlobContainer(mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) *blobContainer {
return &blobContainer{
mime: mime,
fetcher: fetcher,
pusher: pusher,
}
}

func NewBlobContainer(cache accessio.BlobCache, mime string, fetcher remotes.Fetcher, pusher remotes.Pusher) BlobContainer {
func NewBlobContainer(cache accessio.BlobCache, mime string, fetcher resolve.Fetcher, pusher resolve.Pusher) BlobContainer {
c := newBlobContainer(mime, fetcher, pusher)

if cache == nil {
Expand Down
15 changes: 7 additions & 8 deletions pkg/contexts/oci/repositories/ocireg/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"fmt"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/opencontainers/go-digest"

"github.com/open-component-model/ocm/pkg/common/accessio"
"github.com/open-component-model/ocm/pkg/common/accessobj"
"github.com/open-component-model/ocm/pkg/contexts/oci/artdesc"
"github.com/open-component-model/ocm/pkg/contexts/oci/cpi"
"github.com/open-component-model/ocm/pkg/docker"
"github.com/open-component-model/ocm/pkg/docker/resolve"
"github.com/open-component-model/ocm/pkg/errors"
)

Expand All @@ -37,10 +36,10 @@ type Namespace struct {
type NamespaceContainer struct {
repo *Repository
namespace string
resolver remotes.Resolver
lister docker.Lister
fetcher remotes.Fetcher
pusher remotes.Pusher
resolver resolve.Resolver
lister resolve.Lister
fetcher resolve.Fetcher
pusher resolve.Pusher
blobs *BlobContainers
}

Expand All @@ -61,7 +60,7 @@ func NewNamespace(repo *Repository, name string) (*Namespace, error) {
if err != nil {
return nil, err
}
lister, err := resolver.(docker.Resolver).Lister(context.Background(), ref)
lister, err := resolver.Lister(context.Background(), ref)
if err != nil {
return nil, err
}
Expand All @@ -83,7 +82,7 @@ func (n *NamespaceContainer) Close() error {
return n.blobs.Release()
}

func (n *NamespaceContainer) getPusher(vers string) (remotes.Pusher, error) {
func (n *NamespaceContainer) getPusher(vers string) (resolve.Pusher, error) {
ref := n.repo.getRef(n.namespace, vers)
fmt.Printf("pusher for %s\n", ref)
resolver := n.resolver
Expand Down
4 changes: 2 additions & 2 deletions pkg/contexts/oci/repositories/ocireg/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"strings"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker/config"

"github.com/open-component-model/ocm/pkg/contexts/credentials"
"github.com/open-component-model/ocm/pkg/contexts/oci/artdesc"
"github.com/open-component-model/ocm/pkg/contexts/oci/cpi"
"github.com/open-component-model/ocm/pkg/contexts/oci/identity"
"github.com/open-component-model/ocm/pkg/docker"
"github.com/open-component-model/ocm/pkg/docker/resolve"
"github.com/open-component-model/ocm/pkg/errors"
)

Expand Down Expand Up @@ -113,7 +113,7 @@ func (r *Repository) getCreds(comp string) (credentials.Credentials, error) {
return creds, nil
}

func (r *Repository) getResolver(comp string) (remotes.Resolver, error) {
func (r *Repository) getResolver(comp string) (resolve.Resolver, error) {
creds, err := r.getCreds(comp)
if err != nil {
if !errors.IsErrUnknownKind(err, credentials.KIND_CONSUMER) {
Expand Down
18 changes: 5 additions & 13 deletions pkg/contexts/oci/repositories/ocireg/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/open-component-model/ocm/pkg/common/accessio"
"github.com/open-component-model/ocm/pkg/contexts/oci/artdesc"
"github.com/open-component-model/ocm/pkg/contexts/oci/cpi"
"github.com/open-component-model/ocm/pkg/docker/resolve"
)

// TODO: add cache
Expand Down Expand Up @@ -102,35 +103,26 @@ func readAll(reader io.ReadCloser, err error) ([]byte, error) {
return data, nil
}

func push(ctx context.Context, p remotes.Pusher, blob accessio.BlobAccess) error {
func push(ctx context.Context, p resolve.Pusher, blob accessio.BlobAccess) error {
desc := *artdesc.DefaultBlobDescriptor(blob)
return pushData(ctx, p, desc, blob)
}

func pushData(ctx context.Context, p remotes.Pusher, desc artdesc.Descriptor, data accessio.DataAccess) error {
func pushData(ctx context.Context, p resolve.Pusher, desc artdesc.Descriptor, data accessio.DataAccess) error {
key := remotes.MakeRefKey(ctx, desc)
if desc.Size == 0 {
desc.Size = -1
}
fmt.Printf("*** push %s %s: %s\n", desc.MediaType, desc.Digest, key)
write, err := p.Push(ctx, desc)
req, err := p.Push(ctx, desc, data)
if err != nil {
if errdefs.IsAlreadyExists(err) {
fmt.Printf("*** %s %s: already exists\n", desc.MediaType, desc.Digest)
return nil
}
return err
}
read, err := data.Reader()
if err != nil {
return err
}
defer read.Close()
_, err = io.Copy(write, read)
if err != nil {
return err
}
return write.Commit(ctx, desc.Size, desc.Digest)
return req.Commit(ctx, desc.Size, desc.Digest)
}

var dummyContext = nologger()
Expand Down
4 changes: 3 additions & 1 deletion pkg/docker/lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"

"github.com/open-component-model/ocm/pkg/docker/resolve"
)

var ErrObjectNotRequired = errors.New("object not required")
Expand All @@ -36,7 +38,7 @@ type dockerLister struct {
dockerBase *dockerBase
}

func (r *dockerResolver) Lister(ctx context.Context, ref string) (Lister, error) {
func (r *dockerResolver) Lister(ctx context.Context, ref string) (resolve.Lister, error) {
base, err := r.resolveDockerBase(ref)
if err != nil {
return nil, err
Expand Down
12 changes: 0 additions & 12 deletions pkg/docker/orig.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,9 @@
package docker

import (
"context"

"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
)

type Resolver interface {
remotes.Resolver
Lister(ctx context.Context, ref string) (Lister, error)
}

type Lister interface {
List(context.Context) ([]string, error)
}

var ContextWithRepositoryScope = docker.ContextWithRepositoryScope
var ContextWithAppendPullRepositoryScope = docker.ContextWithAppendPullRepositoryScope
var NewInMemoryTracker = docker.NewInMemoryTracker
Expand Down
Loading

0 comments on commit df4670c

Please sign in to comment.