Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Require contexts in remote client methods
Browse files Browse the repository at this point in the history
Replace the `context.TODO()`s given to with a context passed in.

Also: the values returned by the docker distribution registry client
will be one of the types in
github.com/docker/distribution/manifest/{schema1,schema2,manifestlist},
so instead of dispatching on the media type and doing the
deserialisation ourselves, just dispatch on the value's type.
  • Loading branch information
squaremo committed Nov 30, 2017
1 parent 7bad974 commit a07617d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 70 deletions.
23 changes: 17 additions & 6 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
imageCreds := imagesToFetchFunc()
backlog := imageCredsToBacklog(imageCreds)

// We have some fine control over how long to spend on each fetch
// operation, since they are given a `context`. For now though,
// just rattle through them one by one, however long they take.
ctx := context.Background()

// This loop acts keeps a kind of priority queue, whereby image
// names coming in on the `Priority` channel are looked up first.
// If there are none, images used in the cluster are refreshed;
Expand All @@ -65,7 +70,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
// image has to have been running the last time we
// requested the credentials.
if creds, ok := imageCreds[name]; ok {
w.warm(name, creds)
w.warm(ctx, name, creds)
} else {
w.Logger.Log("priority", name.String(), "err", "no creds available")
}
Expand All @@ -76,7 +81,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
if len(backlog) > 0 {
im := backlog[0]
backlog = backlog[1:]
w.warm(im.Name, im.Credentials)
w.warm(ctx, im.Name, im.Credentials)
} else {
select {
case <-refresh:
Expand All @@ -98,7 +103,7 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
return backlog
}

func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credentials) {
client, err := w.ClientFactory.ClientFor(id.CanonicalName(), creds)
if err != nil {
w.Logger.Log("err", err.Error())
Expand Down Expand Up @@ -136,7 +141,7 @@ func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
}
}()

tags, err := client.Tags()
tags, err := client.Tags(ctx)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
w.Logger.Log("err", errors.Wrap(err, "requesting tags"))
Expand Down Expand Up @@ -186,13 +191,19 @@ func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
// w.Burst, so limit the number of fetching goroutines to that.
fetchers := make(chan struct{}, w.Burst)
awaitFetchers := &sync.WaitGroup{}
updates:
for _, imID := range toUpdate {
select {
case <-ctx.Done():
break updates
case fetchers <- struct{}{}:
}

awaitFetchers.Add(1)
fetchers <- struct{}{}
go func(imageID image.Ref) {
defer func() { awaitFetchers.Done(); <-fetchers }()
// Get the image from the remote
img, err := client.Manifest(imageID.Tag)
img, err := client.Manifest(ctx, imageID.Tag)
if err != nil {
if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
// This was due to a context timeout, don't bother logging
Expand Down
84 changes: 35 additions & 49 deletions registry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"net/http"
"reflect"
"time"

"github.com/docker/distribution"
Expand All @@ -17,27 +18,35 @@ import (
"github.com/weaveworks/flux/image"
)

// Client is a remote registry client for a particular image
// repository (e.g., for quay.io/weaveworks/flux). It is an interface
// so we can wrap it in instrumentation, write fake implementations,
// and so on.
type Client interface {
Tags(context.Context) ([]string, error)
Manifest(ctx context.Context, ref string) (image.Info, error)
}

type Remote struct {
transport http.RoundTripper
repo image.CanonicalName
}

// Adapt to docker distribution reference.Named
// Adapt to docker distribution `reference.Named`.
type named struct {
image.CanonicalName
}

// Name returns the name of the repository. These values are used to
// build API URLs, and (it turns out) are _not_ expected to include a
// domain (e.g., quay.io). Hence, the implementation here just returns
// the path.
func (n named) Name() string {
return n.Image
}

func (n named) String() string {
return n.String()
}

// Return the tags for this repository.
func (a *Remote) Tags() ([]string, error) {
ctx := context.TODO()
func (a *Remote) Tags(ctx context.Context) ([]string, error) {
repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport)
if err != nil {
return nil, err
Expand All @@ -47,8 +56,7 @@ func (a *Remote) Tags() ([]string, error) {

// Manifest fetches the metadata for an image reference; currently
// assumed to be in the same repo as that provided to `NewRemote(...)`
func (a *Remote) Manifest(ref string) (image.Info, error) {
ctx := context.TODO()
func (a *Remote) Manifest(ctx context.Context, ref string) (image.Info, error) {
repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport)
if err != nil {
return image.Info{}, err
Expand All @@ -64,49 +72,27 @@ interpret:
return image.Info{}, err
}

mt, bytes, err := manifest.Payload()
if err != nil {
return image.Info{}, err
}

info := image.Info{ID: a.repo.ToRef(ref)}

// for decoding the v1-compatibility entry in schema1 manifests
var v1 struct {
ID string `json:"id"`
Created time.Time `json:"created"`
OS string `json:"os"`
Arch string `json:"architecture"`
}

// TODO(michael): can we type switch? Not sure how dependable the
// underlying types are.
switch mt {
case schema1.MediaTypeManifest:
// TODO: can this be fallthrough? Find something to check on...
var man schema1.Manifest
if err = json.Unmarshal(bytes, &man); err != nil {
return image.Info{}, err
}
if err = json.Unmarshal([]byte(man.History[0].V1Compatibility), &v1); err != nil {
return image.Info{}, err
}
info.CreatedAt = v1.Created
case schema1.MediaTypeSignedManifest:
var man schema1.SignedManifest
if err = json.Unmarshal(bytes, &man); err != nil {
return image.Info{}, err
switch deserialised := manifest.(type) {
case *schema1.SignedManifest:
var man schema1.Manifest = deserialised.Manifest
// for decoding the v1-compatibility entry in schema1 manifests
var v1 struct {
ID string `json:"id"`
Created time.Time `json:"created"`
OS string `json:"os"`
Arch string `json:"architecture"`
}

if err = json.Unmarshal([]byte(man.History[0].V1Compatibility), &v1); err != nil {
return image.Info{}, err
}
info.CreatedAt = v1.Created
case schema2.MediaTypeManifest:
var man schema2.Manifest
if err = json.Unmarshal(bytes, &man); err != nil {
return image.Info{}, err
}

case *schema2.DeserializedManifest:
var man schema2.Manifest = deserialised.Manifest
configBytes, err := repository.Blobs(ctx).Get(ctx, man.Config.Digest)
if err != nil {
return image.Info{}, err
Expand All @@ -121,19 +107,19 @@ interpret:
return image.Info{}, err
}
info.CreatedAt = config.Created
case manifestlist.MediaTypeManifestList:
var list manifestlist.ManifestList
if err = json.Unmarshal(bytes, &list); err != nil {
return image.Info{}, err
}
// TODO(michael): can we just pick the first one that matches?
case *manifestlist.DeserializedManifestList:
var list manifestlist.ManifestList = deserialised.ManifestList
// TODO(michael): is it valid to just pick the first one that matches?
for _, m := range list.Manifests {
if m.Platform.OS == "linux" && m.Platform.Architecture == "amd64" {
manifest, fetchErr = manifests.Get(ctx, m.Digest)
goto interpret
}
}
return image.Info{}, errors.New("no suitable manifest (linux amd64) in manifestlist")
default:
t := reflect.TypeOf(manifest)
return image.Info{}, errors.New("unknown manifest type: " + t.String())
}
return info, nil
}
6 changes: 4 additions & 2 deletions registry/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package registry

import (
"context"

"github.com/pkg/errors"

"github.com/weaveworks/flux/image"
Expand Down Expand Up @@ -31,11 +33,11 @@ func NewMockClient(manifest ManifestFunc, tags TagsFunc) Client {
}
}

func (m *mockDockerClient) Manifest(tag string) (image.Info, error) {
func (m *mockDockerClient) Manifest(ctx context.Context, tag string) (image.Info, error) {
return m.manifest(tag)
}

func (m *mockDockerClient) Tags() ([]string, error) {
func (m *mockDockerClient) Tags(context.Context) ([]string, error) {
return m.tags()
}

Expand Down
9 changes: 5 additions & 4 deletions registry/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registry
// Monitoring middlewares for registry interfaces

import (
"context"
"strconv"
"time"

Expand Down Expand Up @@ -77,19 +78,19 @@ func NewInstrumentedClient(next Client) Client {
}
}

func (m *instrumentedClient) Manifest(ref string) (res image.Info, err error) {
func (m *instrumentedClient) Manifest(ctx context.Context, ref string) (res image.Info, err error) {
start := time.Now()
res, err = m.next.Manifest(ref)
res, err = m.next.Manifest(ctx, ref)
remoteDuration.With(
LabelRequestKind, RequestKindMetadata,
fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil),
).Observe(time.Since(start).Seconds())
return
}

func (m *instrumentedClient) Tags() (res []string, err error) {
func (m *instrumentedClient) Tags(ctx context.Context) (res []string, err error) {
start := time.Now()
res, err = m.next.Tags()
res, err = m.next.Tags(ctx)
remoteDuration.With(
LabelRequestKind, RequestKindTags,
fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil),
Expand Down
9 changes: 0 additions & 9 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ type Registry interface {
GetImage(image.Ref) (image.Info, error)
}

// Client is a remote registry client for a particular image
// repository (e.g., for quay.io/weaveworks/flux). It is an interface
// so we can wrap it in instrumentation, write fake implementations,
// and so on.
type Client interface {
Tags() ([]string, error)
Manifest(ref string) (image.Info, error)
}

// ImageCreds is a record of which images need which credentials,
// which is supplied to us (probably by interrogating the cluster)
type ImageCreds map[image.Name]Credentials

0 comments on commit a07617d

Please sign in to comment.