Skip to content

Commit

Permalink
feat: registry proxy
Browse files Browse the repository at this point in the history
Implement container registry proxy.

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Nov 21, 2024
1 parent 77cf84f commit 0ffb218
Show file tree
Hide file tree
Showing 9 changed files with 799 additions and 25 deletions.
47 changes: 26 additions & 21 deletions internal/app/machined/pkg/system/services/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"time"

containerdapi "github.com/containerd/containerd/v2/client"
Expand Down Expand Up @@ -193,27 +194,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) {

// HealthFunc implements the HealthcheckedService interface.
func (k *Kubelet) HealthFunc(runtime.Runtime) health.Check {
return func(ctx context.Context) error {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10248/healthz", nil)
if err != nil {
return err
}

req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
//nolint:errcheck
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("expected HTTP status OK, got %s", resp.Status)
}

return nil
}
return func(ctx context.Context) error { return simpleHealthCheck(ctx, "http://127.0.0.1:10248/healthz") }
}

// HealthSettings implements the HealthcheckedService interface.
Expand Down Expand Up @@ -247,3 +228,27 @@ func kubeletSeccomp(seccomp *specs.LinuxSeccomp) {
},
)
}

func simpleHealthCheck(ctx context.Context, url string) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}

req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req) //nolint:bodyclose
if err != nil {
return err
}

bodyCloser := sync.OnceValue(resp.Body.Close)

defer bodyCloser() //nolint:errcheck

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("expected HTTP status OK, got %s", resp.Status)
}

return bodyCloser()
}
50 changes: 50 additions & 0 deletions internal/app/machined/pkg/system/services/registry/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"fmt"
"io/fs"
"os"
"os/signal"
"path/filepath"

"go.uber.org/zap"

"github.com/siderolabs/talos/internal/app/machined/pkg/system/services/registry"
)

func main() {
if err := app(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}

func app() error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

development, err := zap.NewDevelopment()
if err != nil {
return fmt.Errorf("failed to create development logger: %w", err)
}

homeDir, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to get user home directory: %w", err)
}

it := func(yield func(fs.StatFS) bool) {
for _, root := range []string{"registry-cache-2", "registry-cache"} {
if !yield(os.DirFS(filepath.Join(homeDir, root)).(fs.StatFS)) {
return
}
}
}

return registry.NewService(registry.NewMultiPathFS(it), development).Run(ctx)
}
61 changes: 61 additions & 0 deletions internal/app/machined/pkg/system/services/registry/fs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"errors"
"io/fs"
"iter"

"github.com/hashicorp/go-multierror"
)

// MultiPathFS is a FS that can be used to combine multiple FSs into one.
type MultiPathFS struct {
fsIt iter.Seq[fs.StatFS]
}

// NewMultiPathFS creates a new MultiPathFS. It takes an iterator of FSs which can be used multiple times asynchrously.
func NewMultiPathFS(it iter.Seq[fs.StatFS]) *MultiPathFS { return &MultiPathFS{fsIt: it} }

// Open opens the named file.
func (m *MultiPathFS) Open(name string) (fs.File, error) {
var multiErr *multierror.Error

for f := range m.fsIt {
r, err := f.Open(name)
if err == nil {
return r, nil
}

multiErr = multierror.Append(multiErr, err)
}

if multiErr == nil {
return nil, errors.New("roots are empty")
}

return nil, multiErr.ErrorOrNil()
}

// Stat returns a [fs.FileInfo] describing the named file.
func (m *MultiPathFS) Stat(name string) (fs.FileInfo, error) {
var multiErr *multierror.Error

for f := range m.fsIt {
r, err := f.Stat(name)
if err == nil {
return r, nil
}

multiErr = multierror.Append(multiErr, err)
}

if multiErr == nil {
return nil, errors.New("roots are empty")
}

return nil, multiErr.ErrorOrNil()
}
73 changes: 73 additions & 0 deletions internal/app/machined/pkg/system/services/registry/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"net/http"
"path"
"strings"

"github.com/distribution/reference"
"github.com/siderolabs/gen/xerrors"
)

func extractParams(req *http.Request) (params, error) {
registry := req.URL.Query().Get("ns")
if registry == "" {
return params{}, xerrors.NewTaggedf[badRequestTag]("missing ns")
}

value := req.PathValue("args")

parts := strings.Split(path.Clean(value), "/")
if len(parts) < 4 {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect args value '%s'", value)
}

numParts := len(parts)
isBlob := parts[numParts-2] == "blobs"
isManifest := parts[numParts-2] == "manifests"

if !isBlob && !isManifest {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect ref: '%s'", parts[numParts-2])
}

name := strings.Join(parts[:numParts-2], "/")
dig := parts[numParts-1]

if !reference.NameRegexp.MatchString(name) {
return params{}, xerrors.NewTaggedf[badRequestTag]("incorrect name: '%s'", name)
}

return params{registry: registry, name: name, dig: dig, isBlob: isBlob}, nil
}

type params struct {
registry string
name string
dig string
isBlob bool
}

func (p params) String() string {
var result strings.Builder

if p.registry != "" {
result.WriteString(p.registry)
result.WriteByte('/')
}

result.WriteString(p.name)

if strings.HasPrefix(p.dig, "sha256:") {
result.WriteByte('@')
result.WriteString(p.dig)
} else {
result.WriteByte(':')
result.WriteString(p.dig)
}

return result.String()
}
137 changes: 137 additions & 0 deletions internal/app/machined/pkg/system/services/registry/readers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"errors"
"fmt"
"io"
"io/fs"
"os"

"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/errdefs"
)

var (
errInvalidSize = errors.New("readerat: invalid size")
errSeekToInvalidWhence = errors.New("readerat: seek to invalid whence")
errSeekToNegativePosition = errors.New("readerat: seek to negative position")
)

// readSeeker is an io.ReadSeeker implementation based on an io.ReaderAt (and
// an int64 size).
//
// For example, an os.File is both an io.ReaderAt and an io.ReadSeeker, but its
// io.ReadSeeker methods are not safe to use concurrently. In comparison,
// multiple readerat.readSeeker values (using the same os.File as their
// io.ReaderAt) are safe to use concurrently. Each can Read and Seek
// independently.
//
// A single readerat.readSeeker is not safe to use concurrently.
//
// Do not modify its exported fields after calling any of its methods.
type readSeeker struct {
ReaderAt io.ReaderAt
Size int64
offset int64
}

// Read implements io.Reader.
func (r *readSeeker) Read(p []byte) (int, error) {
if r.Size < 0 {
return 0, errInvalidSize
} else if r.Size <= r.offset {
return 0, io.EOF
}

if length := r.Size - r.offset; int64(len(p)) > length {
p = p[:length]
}

if len(p) == 0 {
return 0, nil
}

actual, err := r.ReaderAt.ReadAt(p, r.offset)
r.offset += int64(actual)

if err == nil && r.offset == r.Size {
err = io.EOF
}

return actual, err
}

// Seek implements io.Seeker.
func (r *readSeeker) Seek(offset int64, whence int) (int64, error) {
if r.Size < 0 {
return 0, errInvalidSize
}

switch whence {
case io.SeekStart:
// No-op.
case io.SeekCurrent:
offset += r.offset
case io.SeekEnd:
offset += r.Size
default:
return 0, errSeekToInvalidWhence
}

if offset < 0 {
return 0, errSeekToNegativePosition
}

r.offset = offset

return r.offset, nil
}

// openReaderAt creates ReaderAt from a file.
func openReaderAt(p string, statFS fs.StatFS) (content.ReaderAt, error) {
fi, err := statFS.Stat(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}

fp, err := statFS.Open(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}

f, ok := fp.(fsFileReaderAt)
if !ok {
return nil, fmt.Errorf("not a fsFileReaderAt: %T, details: %v", fp, fp)
}

return sizeReaderAt{size: fi.Size(), fp: f}, nil
}

// readerat implements io.ReaderAt in a completely stateless manner by opening
// the referenced file for each call to ReadAt.
type sizeReaderAt struct {
size int64
fp fsFileReaderAt
}

func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) { return ra.fp.ReadAt(p, offset) }
func (ra sizeReaderAt) Size() int64 { return ra.size }
func (ra sizeReaderAt) Close() error { return ra.fp.Close() }
func (ra sizeReaderAt) Reader() io.Reader { return io.LimitReader(ra.fp, ra.size) }

type fsFileReaderAt interface {
io.ReaderAt
fs.File
}
Loading

0 comments on commit 0ffb218

Please sign in to comment.