Skip to content

Commit

Permalink
add --blobs-to-disk to 'crane registry serve' (#1731)
Browse files Browse the repository at this point in the history
* add --blobs-to-disk to 'crane registry serve'

Signed-off-by: Jason Hall <[email protected]>

* add tests

Signed-off-by: Jason Hall <[email protected]>

* mark flag hidden

Signed-off-by: Jason Hall <[email protected]>

* tasty boilerplate

Signed-off-by: Jason Hall <[email protected]>

* boilerplate mmm

Signed-off-by: Jason Hall <[email protected]>

---------

Signed-off-by: Jason Hall <[email protected]>
  • Loading branch information
imjasonh authored Jun 15, 2023
1 parent 4e4b03a commit 03ad2ac
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 16 deletions.
15 changes: 13 additions & 2 deletions cmd/crane/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func newCmdRegistry() *cobra.Command {
}

func newCmdServe() *cobra.Command {
return &cobra.Command{
var disk bool
cmd := &cobra.Command{
Use: "serve",
Short: "Serve an in-memory registry implementation",
Long: `This sub-command serves an in-memory registry implementation on port :8080 (or $PORT)
Expand All @@ -60,9 +61,16 @@ Contents are only stored in memory, and when the process exits, pushed data is l
porti := listener.Addr().(*net.TCPAddr).Port
port = fmt.Sprintf("%d", porti)

bh := registry.NewInMemoryBlobHandler()
if disk {
tmp := os.TempDir()
log.Printf("storing blobs in %s", tmp)
bh = registry.NewDiskBlobHandler(tmp)
}

s := &http.Server{
ReadHeaderTimeout: 5 * time.Second, // prevent slowloris, quiet linter
Handler: registry.New(),
Handler: registry.New(registry.WithBlobHandler(bh)),
}
log.Printf("serving on port %s", port)

Expand All @@ -81,4 +89,7 @@ Contents are only stored in memory, and when the process exits, pushed data is l
return nil
},
}
cmd.Flags().BoolVar(&disk, "blobs-to-disk", false, "Store blobs on disk")
cmd.Flags().MarkHidden("blobs-to-disk")
return cmd
}
30 changes: 16 additions & 14 deletions pkg/registry/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,24 @@ func isBlob(req *http.Request) bool {
elem[len(elem)-2] == "uploads")
}

// blobHandler represents a minimal blob storage backend, capable of serving
// BlobHandler represents a minimal blob storage backend, capable of serving
// blob contents.
type blobHandler interface {
type BlobHandler interface {
// Get gets the blob contents, or errNotFound if the blob wasn't found.
Get(ctx context.Context, repo string, h v1.Hash) (io.ReadCloser, error)
}

// blobStatHandler is an extension interface representing a blob storage
// BlobStatHandler is an extension interface representing a blob storage
// backend that can serve metadata about blobs.
type blobStatHandler interface {
type BlobStatHandler interface {
// Stat returns the size of the blob, or errNotFound if the blob wasn't
// found, or redirectError if the blob can be found elsewhere.
Stat(ctx context.Context, repo string, h v1.Hash) (int64, error)
}

// blobPutHandler is an extension interface representing a blob storage backend
// BlobPutHandler is an extension interface representing a blob storage backend
// that can write blob contents.
type blobPutHandler interface {
type BlobPutHandler interface {
// Put puts the blob contents.
//
// The contents will be verified against the expected size and digest
Expand All @@ -75,9 +75,9 @@ type blobPutHandler interface {
Put(ctx context.Context, repo string, h v1.Hash, rc io.ReadCloser) error
}

// blobDeleteHandler is an extension interface representing a blob storage
// BlobDeleteHandler is an extension interface representing a blob storage
// backend that can delete blob contents.
type blobDeleteHandler interface {
type BlobDeleteHandler interface {
// Delete the blob contents.
Delete(ctx context.Context, repo string, h v1.Hash) error
}
Expand All @@ -103,6 +103,8 @@ type memHandler struct {
lock sync.Mutex
}

func NewInMemoryBlobHandler() BlobHandler { return &memHandler{m: map[string][]byte{}} }

func (m *memHandler) Stat(_ context.Context, _ string, h v1.Hash) (int64, error) {
m.lock.Lock()
defer m.lock.Unlock()
Expand Down Expand Up @@ -149,7 +151,7 @@ func (m *memHandler) Delete(_ context.Context, _ string, h v1.Hash) error {

// blobs
type blobs struct {
blobHandler blobHandler
blobHandler BlobHandler

// Each upload gets a unique id that writes occur to until finalized.
uploads map[string][]byte
Expand Down Expand Up @@ -190,7 +192,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}

var size int64
if bsh, ok := b.blobHandler.(blobStatHandler); ok {
if bsh, ok := b.blobHandler.(BlobStatHandler); ok {
size, err = bsh.Stat(req.Context(), repo, h)
if errors.Is(err, errNotFound) {
return regErrBlobUnknown
Expand Down Expand Up @@ -238,7 +240,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {

var size int64
var r io.Reader
if bsh, ok := b.blobHandler.(blobStatHandler); ok {
if bsh, ok := b.blobHandler.(BlobStatHandler); ok {
size, err = bsh.Stat(req.Context(), repo, h)
if errors.Is(err, errNotFound) {
return regErrBlobUnknown
Expand Down Expand Up @@ -292,7 +294,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
return nil

case http.MethodPost:
bph, ok := b.blobHandler.(blobPutHandler)
bph, ok := b.blobHandler.(BlobPutHandler)
if !ok {
return regErrUnsupported
}
Expand Down Expand Up @@ -393,7 +395,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
return nil

case http.MethodPut:
bph, ok := b.blobHandler.(blobPutHandler)
bph, ok := b.blobHandler.(BlobPutHandler)
if !ok {
return regErrUnsupported
}
Expand Down Expand Up @@ -454,7 +456,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
return nil

case http.MethodDelete:
bdh, ok := b.blobHandler.(blobDeleteHandler)
bdh, ok := b.blobHandler.(BlobDeleteHandler)
if !ok {
return regErrUnsupported
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/registry/blobs_disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

import (
"context"
"errors"
"io"
"os"
"path/filepath"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"
)

type diskHandler struct {
dir string
lock sync.Mutex
}

func NewDiskBlobHandler(dir string) BlobHandler { return &diskHandler{dir: dir} }

func (m *diskHandler) Stat(_ context.Context, _ string, h v1.Hash) (int64, error) {
m.lock.Lock()
defer m.lock.Unlock()

fi, err := os.Stat(filepath.Join(m.dir, h.String()))
if errors.Is(err, os.ErrNotExist) {
return 0, errNotFound
} else if err != nil {
return 0, err
}
return fi.Size(), nil
}
func (m *diskHandler) Get(_ context.Context, _ string, h v1.Hash) (io.ReadCloser, error) {
m.lock.Lock()
defer m.lock.Unlock()

return os.Open(filepath.Join(m.dir, h.String()))
}
func (m *diskHandler) Put(_ context.Context, _ string, h v1.Hash, rc io.ReadCloser) error {
m.lock.Lock()
defer m.lock.Unlock()

f, err := os.Create(filepath.Join(m.dir, h.String()))
if err != nil {
return err
}
defer f.Close()

if _, err := io.Copy(f, rc); err != nil {
return err
}
return nil
}
func (m *diskHandler) Delete(_ context.Context, _ string, h v1.Hash) error {
m.lock.Lock()
defer m.lock.Unlock()

return os.Remove(filepath.Join(m.dir, h.String()))
}
83 changes: 83 additions & 0 deletions pkg/registry/blobs_disk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry_test

import (
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/registry"
"github.com/google/go-containerregistry/pkg/v1/random"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/v1/validate"
)

func TestDiskPush(t *testing.T) {
dir := t.TempDir()
reg := registry.New(registry.WithBlobHandler(registry.NewDiskBlobHandler(dir)))
srv := httptest.NewServer(reg)
defer srv.Close()

ref, err := name.ParseReference(strings.TrimPrefix(srv.URL, "http://") + "/foo/bar:latest")
if err != nil {
t.Fatal(err)
}
img, err := random.Image(1024, 5)
if err != nil {
t.Fatal(err)
}
if err := remote.Write(ref, img); err != nil {
t.Fatalf("remote.Write: %v", err)
}

// Test we can read and validate the image.
if _, err := remote.Image(ref); err != nil {
t.Fatalf("remote.Image: %v", err)
}
if err := validate.Image(img); err != nil {
t.Fatalf("validate.Image: %v", err)
}

// Collect the layer SHAs we expect to find.
want := map[string]bool{}
if h, err := img.ConfigName(); err != nil {
t.Fatal(err)
} else {
want[h.String()] = true
}
ls, err := img.Layers()
if err != nil {
t.Fatal(err)
}
for _, l := range ls {
if h, err := l.Digest(); err != nil {
t.Fatal(err)
} else {
want[h.String()] = true
}
}

// Test the blobs are there on disk.
for dig := range want {
if _, err := os.Stat(filepath.Join(dir, dig)); err != nil {
t.Fatalf("os.Stat(%s): %v", dig, err)
}
t.Logf("Found %s", dig)
}
}
6 changes: 6 additions & 0 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ func WithWarning(prob float64, msg string) Option {
r.warnings[prob] = msg
}
}

func WithBlobHandler(h BlobHandler) Option {
return func(r *registry) {
r.blobs.blobHandler = h
}
}

0 comments on commit 03ad2ac

Please sign in to comment.