From 2cb31876cb74501608f9333bad830ad976848b17 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Fri, 20 Dec 2024 14:03:09 +0100 Subject: [PATCH] feat: support splitting blobs when stored as OCI layer (#1140) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What this PR does / why we need it There are OCI repositories with a layer size limitation. Because OCM potentially maps any external artifact to a single blob stored as layer on OCI registries, this could lead to problems. An obvious problematic scenario is the transport of a multi-platform OCI image. Its blob format is an archive containing all images and all layers of those images. This PR introduces the possibility to specify blob limits for OCI registries. The OCM-to-OCI mapping then splits larger blobs into multiple layers. The `localBlob` access method then uses a comma-separated list of blob layer-blob digest to remember the sequence of layers. The access then combines the layerblobs again to a single stream. #### Which issue(s) this PR fixes Fixes: https://github.com/open-component-model/ocm-project/issues/12 A follow-up issue https://github.com/open-component-model/ocm-project/issues/338 describes the provisioning of appropriate defaults for common public registries. --------- Co-authored-by: Jakob Möller --- api/credentials/identity/hostpath/identity.go | 11 ++ .../repositories/ocireg/repository.go | 1 + api/ocm/cpi/repocpi/bridge_r.go | 18 +++ .../genericocireg/accessmethod_localblob.go | 136 +++++++++++++++- .../repositories/genericocireg/bloblimits.go | 53 ++++++ .../genericocireg/componentversion.go | 118 +++++++++++--- .../repositories/genericocireg/config/type.go | 110 +++++++++++++ .../repositories/genericocireg/config_test.go | 62 +++++++ .../repositories/genericocireg/repo_test.go | 151 ++++++++++++++++++ .../repositories/genericocireg/repository.go | 58 ++++++- .../repositories/genericocireg/type.go | 26 ++- docs/reference/ocm_configfile.md | 22 +++ 12 files changed, 732 insertions(+), 34 deletions(-) create mode 100644 api/ocm/extensions/repositories/genericocireg/bloblimits.go create mode 100644 api/ocm/extensions/repositories/genericocireg/config/type.go create mode 100644 api/ocm/extensions/repositories/genericocireg/config_test.go diff --git a/api/credentials/identity/hostpath/identity.go b/api/credentials/identity/hostpath/identity.go index baa2aad8a..968412dda 100644 --- a/api/credentials/identity/hostpath/identity.go +++ b/api/credentials/identity/hostpath/identity.go @@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string { } return strings.TrimPrefix(id[ID_PATHPREFIX], "/") } + +func HostPort(id cpi.ConsumerIdentity) string { + if id == nil { + return "" + } + host := id[ID_HOSTNAME] + if port, ok := id[ID_PORT]; ok { + return host + ":" + port + } + return host +} diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index 12fe7276d..cc2c845ac 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -70,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo) spec: spec, info: info, } + i.logger.Debug("created repository") return cpi.NewRepository(i), nil } diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index f7eea176f..53ed151f5 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -34,6 +34,24 @@ type RepositoryImpl interface { io.Closer } +// Chunked is an optional interface, which +// may be implemented to accept a blob limit for mapping +// local blobs to an external storage system. +type Chunked interface { + // SetBlobLimit sets the blob limit if possible. + // It returns true, if this was successful. + SetBlobLimit(s int64) bool +} + +// SetBlobLimit tries to set a blob limit for a repository +// implementation. It returns true, if this was possible. +func SetBlobLimit(i RepositoryImpl, s int64) bool { + if c, ok := i.(Chunked); ok { + return c.SetBlobLimit(s) + } + return false +} + type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository] type repositoryBridge struct { diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go index 94a0a8596..c661c9f29 100644 --- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go +++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go @@ -1,10 +1,14 @@ package genericocireg import ( + "bytes" "io" + "os" + "strings" "sync" "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/finalizer" "github.com/opencontainers/go-digest" "ocm.software/ocm/api/oci" @@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { return nil, errors.ErrNotImplemented("artifact blob synthesis") } } - _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) - if err != nil { - return nil, err + refs := strings.Split(m.spec.LocalReference, ",") + + var ( + data blobaccess.DataAccess + err error + ) + if len(refs) < 2 { + _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + if err != nil { + return nil, err + } + } else { + data = &composedBlock{m, refs} } m.data = data return m.data, err @@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) { func (m *localBlobAccessMethod) MimeType() string { return m.spec.MediaType } + +//////////////////////////////////////////////////////////////////////////////// + +type composedBlock struct { + m *localBlobAccessMethod + refs []string +} + +var _ blobaccess.DataAccess = (*composedBlock)(nil) + +func (c *composedBlock) Get() ([]byte, error) { + buf := bytes.NewBuffer(nil) + for _, ref := range c.refs { + var finalize finalizer.Finalizer + + _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return nil, err + } + finalize.Close(data) + r, err := data.Reader() + if err != nil { + return nil, err + } + finalize.Close(r) + _, err = io.Copy(buf, r) + if err != nil { + return nil, err + } + err = finalize.Finalize() + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func (c *composedBlock) Reader() (io.ReadCloser, error) { + return &composedReader{ + m: c.m, + refs: c.refs, + }, nil +} + +func (c *composedBlock) Close() error { + return nil +} + +type composedReader struct { + lock sync.Mutex + m *localBlobAccessMethod + refs []string + reader io.ReadCloser + data blobaccess.DataAccess +} + +func (c *composedReader) Read(p []byte) (n int, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + for { + if c.reader != nil { + n, err := c.reader.Read(p) + + if err == io.EOF { + c.reader.Close() + c.data.Close() + c.refs = c.refs[1:] + c.reader = nil + c.data = nil + // start new layer and return partial (>0) read before next layer is started + err = nil + } + // return partial read (even a zero read if layer is not yet finished) or error + if c.reader != nil || err != nil || n > 0 { + return n, err + } + // otherwise, we can use the given buffer for the next layer + + // now, we have to check for a next succeeding layer. + // This means to finish with the actual reader and continue + // with the next one. + } + + // If no more layers are available, report EOF. + if len(c.refs) == 0 { + return 0, io.EOF + } + + ref := strings.TrimSpace(c.refs[0]) + _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return 0, err + } + c.reader, err = c.data.Reader() + if err != nil { + return 0, err + } + } +} + +func (c *composedReader) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.reader == nil && c.refs == nil { + return os.ErrClosed + } + if c.reader != nil { + c.reader.Close() + c.data.Close() + c.reader = nil + c.refs = nil + } + return nil +} diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go new file mode 100644 index 000000000..c4ecfc799 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go @@ -0,0 +1,53 @@ +package genericocireg + +import ( + "sync" + + configctx "ocm.software/ocm/api/config" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" +) + +var ( + defaultBlobLimits config.BlobLimits + lock sync.Mutex +) + +const ( + KB = int64(1000) + MB = 1000 * KB + GB = 1000 * MB +) + +func init() { + defaultBlobLimits = config.BlobLimits{} + + // Add limits for known OCI repositories, here, + // or provide init functions in specialized packages + // by calling AddDefaultBlobLimit. + AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429 +} + +// AddDefaultBlobLimit can be used to set default blob limits +// for known repositories. +// Those limits will be overwritten, by blob limits +// given by a configuration object and the repository +// specification. +func AddDefaultBlobLimit(name string, limit int64) { + lock.Lock() + defer lock.Unlock() + + defaultBlobLimits[name] = limit +} + +func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) { + if target != nil { + lock.Lock() + defer lock.Unlock() + + target.ConfigureBlobLimits(defaultBlobLimits) + + if ctx != nil { + ctx.ConfigContext().ApplyTo(0, target) + } + } +} diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index f603fbe50..d332ca252 100644 --- a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -2,13 +2,16 @@ package genericocireg import ( "fmt" + "io" "path" "strings" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/goutils/set" + "github.com/mandelsoft/vfs/pkg/vfs" "github.com/opencontainers/go-digest" + "ocm.software/ocm/api/datacontext/attrs/vfsattr" "ocm.software/ocm/api/oci" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/extensions/repositories/artifactset" @@ -25,7 +28,9 @@ import ( ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/accessobj" + "ocm.software/ocm/api/utils/blobaccess" "ocm.software/ocm/api/utils/errkind" + "ocm.software/ocm/api/utils/mime" common "ocm.software/ocm/api/utils/misc" "ocm.software/ocm/api/utils/refmgmt" "ocm.software/ocm/api/utils/runtime" @@ -183,11 +188,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) { layers.Add(i) } for i, r := range desc.Resources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed resource layer evaluation: %w", err) } - if l > 0 { + for _, l := range list { layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ Kind: ARTKIND_RESOURCE, Identity: r.GetIdentity(desc.Resources), @@ -199,11 +204,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) { } } for i, r := range desc.Sources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed source layer evaluation: %w", err) } - if l > 0 { + for _, l := range list { layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ Kind: ARTKIND_SOURCE, Identity: r.GetIdentity(desc.Sources), @@ -259,32 +264,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) { return false, nil } -func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) { - var d *artdesc.Descriptor +func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) { + var ( + d *artdesc.Descriptor + layernums []int + ) spec, err := c.GetContext().AccessSpecForSpec(s) if err != nil { - return s, 0, err + return s, nil, err } if a, ok := spec.(*localblob.AccessSpec); ok { if ok, _ := artdesc.IsDigest(a.LocalReference); !ok { - return s, 0, errors.ErrInvalid("digest", a.LocalReference) + return s, nil, errors.ErrInvalid("digest", a.LocalReference) } - d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()} - } - if d != nil { - // find layer - layers := c.manifest.GetDescriptor().Layers - maxLen := len(layers) - 1 - for i := range layers { - l := layers[len(layers)-1-i] - if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { - return s, len(layers) - 1 - i, nil + refs := strings.Split(a.LocalReference, ",") + media := a.GetMimeType() + if len(refs) > 1 { + media = mime.MIME_OCTET + } + for _, ref := range refs { + d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media} + // find layer + layers := c.manifest.GetDescriptor().Layers + maxLen := len(layers) - 1 + found := false + for i := maxLen; i > 0; i-- { // layer 0 is the component descriptor + l := layers[i] + if l.Digest == d.Digest { + layernums = append(layernums, i) + found = true + break + } + } + if !found { + return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } } - return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } - return s, 0, nil + return s, layernums, nil } func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor { @@ -299,20 +317,74 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest) } +func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) { + f, err := blobaccess.NewTempFile("", "chunk-*", fs) + if err != nil { + return nil, true, err + } + written, err := io.CopyN(f.Writer(), r, limit) + if err != nil && !errors.Is(err, io.EOF) { + f.Close() + return nil, false, err + } + if written <= 0 { + f.Close() + return nil, false, nil + } + return f.AsBlob(blob.MimeType()), written == limit, nil +} + func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if blob == nil { return nil, errors.New("a resource has to be defined") } + fs := vfsattr.Get(c.GetContext()) + size := blob.Size() + limit := c.comp.repo.blobLimit + var refs []string + if limit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > limit { + reader, err := blob.Reader() + if err != nil { + return nil, err + } + defer reader.Close() + + var b blobaccess.BlobAccess + cont := true + for cont { + b, cont, err = blobAccessForChunk(blob, fs, reader, limit) + if err != nil { + return nil, err + } + if b != nil { + err = c.addLayer(b, &refs) + b.Close() + if err != nil { + return nil, err + } + } + } + } else { + err := c.addLayer(blob, &refs) + if err != nil { + return nil, err + } + } + return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil +} + +func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error { err := c.manifest.AddBlob(blob) if err != nil { - return nil, err + return err } err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob) if err != nil { - return nil, err + return err } - return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil + *refs = append(*refs, blob.Digest().String()) + return nil } // AssureGlobalRef provides a global manifest for a local OCI Artifact. diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go new file mode 100644 index 000000000..137c1d268 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config/type.go @@ -0,0 +1,110 @@ +package config + +import ( + "net" + "strings" + + "ocm.software/ocm/api/config" + cfgcpi "ocm.software/ocm/api/config/cpi" + "ocm.software/ocm/api/utils/runtime" +) + +const ( + ConfigType = "blobLimits.ocireg.ocm" + cfgcpi.OCM_CONFIG_TYPE_SUFFIX + ConfigTypeV1 = ConfigType + runtime.VersionSeparator + "v1" +) + +func init() { + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigType, usage)) + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigTypeV1)) +} + +// Config describes a memory based config interface +// for configuring blob limits for underlying OCI manifest layers. +type Config struct { + runtime.ObjectVersionedType `json:",inline"` + // BlobLimits describe the limit setting for host:port + // entries. As a special case (for testing) it is possible + // to configure limits for CTF, also, by using "@"+filepath. + BlobLimits BlobLimits `json:"blobLimits"` +} + +type BlobLimits map[string]int64 + +func (b BlobLimits) GetLimit(hostport string) int64 { + if b == nil { + return -1 + } + l, ok := b[hostport] + if ok { + return l + } + + if !strings.HasPrefix(hostport, "@") { + host, _, err := net.SplitHostPort(hostport) + if err == nil { + l, ok = b[host] + if ok { + return l + } + } + } + return -1 +} + +type Configurable interface { + ConfigureBlobLimits(limits BlobLimits) +} + +// New creates a blob limit ConfigSpec. +func New() *Config { + return &Config{ + ObjectVersionedType: runtime.NewVersionedTypedObject(ConfigType), + } +} + +func (a *Config) GetType() string { + return ConfigType +} + +func (a *Config) AddLimit(hostport string, limit int64) { + if a.BlobLimits == nil { + a.BlobLimits = BlobLimits{} + } + a.BlobLimits[hostport] = limit +} + +func (a *Config) ApplyTo(ctx config.Context, target interface{}) error { + t, ok := target.(Configurable) + if !ok { + return config.ErrNoContext(ConfigType) + } + if a.BlobLimits != nil { + t.ConfigureBlobLimits(a.BlobLimits) + } + return nil +} + +const usage = ` +The config type ` + ConfigType + ` can be used to set some +blob layer limits for particular OCI registries used to host OCM repositories. +The blobLimits field maps a OCI registry address to the blob limit to use: + +
+    type: ` + ConfigType + `
+    blobLimits:
+        dummy.io: 65564
+        dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit
+
+ +If blob limits apply to a registry, local blobs with a size larger than +the configured limit will be split into several layers with a maximum +size of the given value. + +These settings can be overwritten by explicit settings in an OCM +repository specification for those repositories. + +The most specific entry will be used. If a registry with a dedicated +port is requested, but no explicit configuration is found, the +setting for the sole hostname is used (if configured). +` diff --git a/api/ocm/extensions/repositories/genericocireg/config_test.go b/api/ocm/extensions/repositories/genericocireg/config_test.go new file mode 100644 index 000000000..2435497a8 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config_test.go @@ -0,0 +1,62 @@ +package genericocireg_test + +import ( + "reflect" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "ocm.software/ocm/api/datacontext" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" + + "github.com/mandelsoft/goutils/finalizer" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/oci" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/ocm" + "ocm.software/ocm/api/ocm/cpi/repocpi" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/accessobj" +) + +var _ = Describe("component repository mapping", func() { + var tempfs vfs.FileSystem + + var ocispec oci.RepositorySpec + var spec *genericocireg.RepositorySpec + + BeforeEach(func() { + t, err := osfs.NewTempFileSystem() + Expect(err).To(Succeed()) + tempfs = t + + // ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, accessio.ALLOC_REALM)) + + ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) + Expect(err).To(Succeed()) + spec = genericocireg.NewRepositorySpec(ocispec, nil) + }) + + AfterEach(func() { + vfs.Cleanup(tempfs) + }) + + It("creates a dummy component with configured chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + ctx := ocm.New(datacontext.MODE_EXTENDED) + + cfg := config.New() + cfg.AddLimit("@test", 5) + ctx.ConfigContext().ApplyConfig(cfg, "direct") + + repo := finalizer.ClosingWith(&finalize, Must(ctx.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + + Expect(impl.(*genericocireg.RepositoryImpl).GetBlobLimit()).To(Equal(int64(5))) + }) +}) diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69..a892bf0f9 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -2,6 +2,7 @@ package genericocireg_test import ( "fmt" + "io" "path" "reflect" @@ -123,6 +124,156 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7" + const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d" + const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA + + DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, int64(limit)) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + // fmt.Printf("localref: %s\n", local.LocalReference) + Expect(local.LocalReference).To(Equal(ref)) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + f(rsc) + + MustBeSuccessful(finalize.Finalize()) + }, + Entry("get blob", 5, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob", 5, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + + Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + + Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + ) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f..8773350ea 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -11,12 +11,16 @@ import ( "github.com/mandelsoft/goutils/general" "ocm.software/ocm/api/credentials" + "ocm.software/ocm/api/credentials/identity/hostpath" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/oci" ocicpi "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/oci/extensions/repositories/ocireg" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/componentmapping" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" ) type OCIBasedRepository interface { @@ -44,23 +48,69 @@ type RepositoryImpl struct { nonref cpi.Repository ocirepo oci.Repository readonly bool + // blobLimit is the size limit for layers maintained for the storage of localBlobs. + // The value -1 means an unconfigured value (a default from the blob limit configuration is used), + // a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), + // a value > 0 enabled the usage of the specified size. + blobLimit int64 } var ( _ repocpi.RepositoryImpl = (*RepositoryImpl)(nil) _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) + _ config.Configurable = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +// NewRepository creates a new OCM repository based on any OCI abstraction from +// the OCI context type. +// The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. +// The value -1 means an unconfigured value (a default from the blob limit configuration is used), +// a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), +// a value > 0 enabled the usage of the specified size. +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) + impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: *DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), + } + if impl.blobLimit < 0 { + ConfigureBlobLimits(ctxp.OCMContext(), impl) } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { + if len(limits) == 0 { + return + } + if spec, ok := r.ocirepo.GetSpecification().(*ocireg.RepositorySpec); ok { + id := spec.GetConsumerId() + hp := hostpath.HostPort(id) + l := limits.GetLimit(hp) + if l >= 0 { + r.blobLimit = l + } + } + if spec, ok := r.ocirepo.GetSpecification().(*ctf.RepositorySpec); ok { + l := limits.GetLimit("@" + spec.FilePath) + if l >= 0 { + r.blobLimit = l + } + } +} + +func (r *RepositoryImpl) SetBlobLimit(s int64) bool { + r.blobLimit = s + return true +} + +func (r *RepositoryImpl) GetBlobLimit() int64 { + return r.blobLimit +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d59814..b3c4b460e 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit *int64 } var ( @@ -127,19 +128,28 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit *int64 `json:"blobLimit,omitempty"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + if m.BlobLimit != nil { + u.BlobLimit = m.BlobLimit + } normalizers.Normalize(u) return nil @@ -154,7 +164,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,6 +181,9 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } + if s.BlobLimit != nil { + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, *s.BlobLimit), nil + } return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil } diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index ae9e51920..4f5282064 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -24,6 +24,28 @@ The following configuration types are supported: <name>: <yaml defining the attribute> ... +- blobLimits.ocireg.ocm.config.ocm.software + The config type blobLimits.ocireg.ocm.config.ocm.software can be used to set some + blob layer limits for particular OCI registries used to host OCM repositories. + The blobLimits field maps a OCI registry address to the blob limit to use: + +
+      type: blobLimits.ocireg.ocm.config.ocm.software
+      blobLimits:
+          dummy.io: 65564
+          dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit
+  
+ + If blob limits apply to a registry, local blobs with a size larger than + the configured limit will be split into several layers with a maximum + size of the given value. + + These settings can be overwritten by explicit settings in an OCM + repository specification for those repositories. + + The most specific entry will be used. If a registry with a dedicated + port is requested, but no explicit configuration is found, the + setting for the sole hostname is used (if configured). - cli.ocm.config.ocm.software The config type cli.ocm.config.ocm.software is used to handle the main configuration flags of the OCM command line tool.