diff --git a/Gopkg.lock b/Gopkg.lock index ffb025bc2..4274006be 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -40,8 +40,8 @@ [[projects]] branch = "master" name = "github.com/docker/distribution" - packages = [".","digestset","manifest","manifest/schema1","manifest/schema2","reference"] - revision = "7484e51bf6af0d3b1a849644cdaced3cfcf13617" + packages = [".","digestset","manifest","manifest/manifestlist","manifest/schema1","manifest/schema2","reference","registry/api/errcode","registry/api/v2","registry/client","registry/client/auth","registry/client/auth/challenge","registry/client/transport","registry/storage/cache","registry/storage/cache/memory"] + revision = "bc3c7b0525e59d3ecfab3e1568350895fd4a462f" [[projects]] branch = "master" @@ -187,13 +187,6 @@ packages = ["."] revision = "3573b8b52aa7b37b9358d966a898feb387f62437" -[[projects]] - branch = "master" - name = "github.com/heroku/docker-registry-client" - packages = ["registry"] - revision = "2e2f1c58a06fae38b08307d1a834ae9c971a2467" - source = "github.com/weaveworks/docker-registry-client" - [[projects]] name = "github.com/inconshreveable/mousetrap" packages = ["."] @@ -299,7 +292,7 @@ branch = "master" name = "github.com/weaveworks/common" packages = ["errors","httpgrpc","logging","middleware","user"] - revision = "1df6402aaf7232feadf798b64c6a0cae847e84d8" + revision = "57600de7028ee2b8070f1696250887ad243b786c" [[projects]] branch = "master" @@ -322,7 +315,7 @@ [[projects]] branch = "master" name = "golang.org/x/net" - packages = ["context","http2","http2/hpack","idna","internal/timeseries","lex/httplex","publicsuffix","trace"] + packages = ["context","http2","http2/hpack","idna","internal/timeseries","lex/httplex","trace"] revision = "0a9397675ba34b2845f758fe3cd68828369c6517" [[projects]] @@ -388,6 +381,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "7afd8c2dc700bde147e92ae63f845dca1b2555a68c948e76aa72e78ff6470b06" + inputs-digest = "9a246cacb3ba700c40f1d9dddaddb45729b77be85a810753503c096b2a9c70d4" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 789df2bd3..ef44e8c48 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -6,11 +6,10 @@ name = "k8s.io/client-go" version = "4.0.0" -[[constraint]] - name = "github.com/heroku/docker-registry-client" - source = "github.com/weaveworks/docker-registry-client" - branch = "master" - [[override]] name = "github.com/ugorji/go" revision = "8c0409fcbb70099c748d71f714529204975f6c3f" + +[[constraint]] + name = "github.com/docker/distribution" + branch = "master" diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index c624521e2..df543d2b9 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -31,7 +31,8 @@ import ( "github.com/weaveworks/flux/image" "github.com/weaveworks/flux/job" "github.com/weaveworks/flux/registry" - registryMemcache "github.com/weaveworks/flux/registry/cache" + "github.com/weaveworks/flux/registry/cache" + registryMemcache "github.com/weaveworks/flux/registry/cache/memcached" registryMiddleware "github.com/weaveworks/flux/registry/middleware" "github.com/weaveworks/flux/remote" "github.com/weaveworks/flux/ssh" @@ -85,11 +86,11 @@ func main() { gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits") // registry - memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when 
caching chunks. If empty, no memcached will be used.") + memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.") memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.") memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.") - registryCacheExpiry = fs.Duration("registry-cache-expiry", 20*time.Minute, "Duration to keep cached registry tag info. Must be < 1 month.") - registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to poll registry for new images") + registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.") + registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images") registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host") registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache") @@ -229,61 +230,45 @@ func main() { } // Registry components - var cache registry.Registry - var cacheWarmer registry.Warmer + var cacheRegistry registry.Registry + var cacheWarmer *cache.Warmer { - // Cache - var memcacheRegistry registryMemcache.Client - if *memcachedHostname != "" { - memcacheRegistry = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ - Host: *memcachedHostname, - Service: *memcachedService, - Timeout: *memcachedTimeout, - UpdateInterval: 1 * time.Minute, - Logger: log.With(logger, "component", "memcached"), - MaxIdleConns: defaultMemcacheConnections, - }) - memcacheRegistry = registryMemcache.InstrumentMemcacheClient(memcacheRegistry) - defer memcacheRegistry.Stop() - } - var memcacheWarmer registryMemcache.Client - if *memcachedHostname != "" { - memcacheWarmer = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ - Host: *memcachedHostname, - Service: *memcachedService, - Timeout: *memcachedTimeout, - UpdateInterval: 1 * time.Minute, - Logger: log.With(logger, "component", "memcached"), - MaxIdleConns: *registryBurst, - }) - memcacheWarmer = registryMemcache.InstrumentMemcacheClient(memcacheWarmer) - defer memcacheWarmer.Stop() - } + // Cache client, for use by registry and cache warmer + var cacheClient cache.Client + memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ + Host: *memcachedHostname, + Service: *memcachedService, + Expiry: *registryCacheExpiry, + Timeout: *memcachedTimeout, + UpdateInterval: 1 * time.Minute, + Logger: log.With(logger, "component", "memcached"), + MaxIdleConns: *registryBurst, + }) + defer memcacheClient.Stop() + cacheClient = cache.InstrumentClient(memcacheClient) - cacheLogger := log.With(logger, "component", "cache") - cache = registry.NewRegistry( - registry.NewCacheClientFactory(cacheLogger, memcacheRegistry, *registryCacheExpiry), - cacheLogger, - defaultMemcacheConnections, - ) - cache = registry.NewInstrumentedRegistry(cache) + cacheRegistry = &cache.Cache{ + Reader: cacheClient, + } + cacheRegistry = registry.NewInstrumentedRegistry(cacheRegistry) - // Remote + // Remote client, for warmer to refresh entries registryLogger := log.With(logger, "component", "registry") - remoteFactory := registry.NewRemoteClientFactory(registryLogger, registryMiddleware.RateLimiterConfig{ + registryLimits := ®istryMiddleware.RateLimiters{ RPS: 
*registryRPS, Burst: *registryBurst, - }) + } + remoteFactory := ®istry.RemoteClientFactory{ + Logger: registryLogger, + Limiters: registryLimits, + } // Warmer - warmerLogger := log.With(logger, "component", "warmer") - cacheWarmer = registry.Warmer{ - Logger: warmerLogger, - ClientFactory: remoteFactory, - Expiry: *registryCacheExpiry, - Reader: memcacheWarmer, - Writer: memcacheWarmer, - Burst: *registryBurst, + var err error + cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst) + if err != nil { + logger.Log("err", err) + os.Exit(1) } } @@ -439,7 +424,7 @@ func main() { V: version, Cluster: k8s, Manifests: k8sManifests, - Registry: cache, + Registry: cacheRegistry, ImageRefresh: make(chan image.Name, 100), // size chosen by fair dice roll Repo: repo, Checkout: checkout, Jobs: jobs, @@ -458,7 +443,7 @@ func main() { cacheWarmer.Notify = daemon.AskForImagePoll cacheWarmer.Priority = daemon.ImageRefresh shutdownWg.Add(1) - go cacheWarmer.Loop(shutdown, shutdownWg, image_creds) + go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds) // Update daemonRef so that upstream and handlers point to fully working daemon daemonRef.UpdatePlatform(daemon) diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 34e6feec3..95d9b5aee 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -25,6 +25,7 @@ import ( "github.com/weaveworks/flux/job" "github.com/weaveworks/flux/policy" "github.com/weaveworks/flux/registry" + registryMock "github.com/weaveworks/flux/registry/mock" "github.com/weaveworks/flux/remote" "github.com/weaveworks/flux/resource" "github.com/weaveworks/flux/update" @@ -321,6 +322,11 @@ func TestDaemon_JobStatusWithNoCache(t *testing.T) { w.ForJobSucceeded(d, id) } +func makeImageInfo(ref string, t time.Time) image.Info { + r, _ := image.ParseRef(ref) + return image.Info{ID: r, CreatedAt: t} +} + func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter) { logger := log.NewNopLogger() @@ -397,14 +403,16 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter) var imageRegistry registry.Registry { - img1, _ := image.ParseInfo(currentHelloImage, time.Now()) - img2, _ := image.ParseInfo(newHelloImage, time.Now().Add(1*time.Second)) - img3, _ := image.ParseInfo("another/service:latest", time.Now().Add(1*time.Second)) - imageRegistry = registry.NewMockRegistry([]image.Info{ - img1, - img2, - img3, - }, nil) + img1 := makeImageInfo(currentHelloImage, time.Now()) + img2 := makeImageInfo(newHelloImage, time.Now().Add(1*time.Second)) + img3 := makeImageInfo("another/service:latest", time.Now().Add(1*time.Second)) + imageRegistry = ®istryMock.Registry{ + Images: []image.Info{ + img1, + img2, + img3, + }, + } } events := &mockEventWriter{} diff --git a/daemon/loop_test.go b/daemon/loop_test.go index 445717921..e99f11f03 100644 --- a/daemon/loop_test.go +++ b/daemon/loop_test.go @@ -21,7 +21,7 @@ import ( "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/job" - "github.com/weaveworks/flux/registry" + registryMock "github.com/weaveworks/flux/registry/mock" "github.com/weaveworks/flux/resource" ) @@ -66,7 +66,7 @@ func daemon(t *testing.T) (*Daemon, func()) { d := &Daemon{ Cluster: k8s, Manifests: k8s, - Registry: registry.NewMockRegistry(nil, nil), + Registry: ®istryMock.Registry{}, Checkout: working, Jobs: jobs, 
JobStatusCache: &job.StatusCache{Size: 100}, diff --git a/deploy/flux-deployment.yaml b/deploy/flux-deployment.yaml index de1625f80..b79aad888 100644 --- a/deploy/flux-deployment.yaml +++ b/deploy/flux-deployment.yaml @@ -32,13 +32,13 @@ spec: - name: git-key mountPath: /etc/fluxd/ssh args: - # if you deployed memcached, you can supply these arguments to - # tell fluxd to use it. You may need to change the namespace - # (`default`) if you run fluxd in another namespace. - - --memcached-hostname=memcached.default.svc.cluster.local - - --memcached-timeout=100ms - - --memcached-service=memcached - - --registry-cache-expiry=20m + + # if you deployed memcached in a different namespace to flux, + # or with a different service name, you can supply these + # following two arguments to tell fluxd how to connect to it. + # - --memcached-hostname=memcached.default.svc.cluster.local + # - --memcached-service=memcached + # replace (at least) the following URL - --git-url=git@github.com:weaveworks/flux-example - --git-branch=master @@ -46,6 +46,3 @@ spec: # (e.g., Weave Cloud). The token is particular to the service. # - --connect=wss://cloud.weave.works/api/flux # - --token=abc123abc123abc123abc123 - # override -b and -t arguments to ssh-keygen - # - --ssh-keygen-bits=2048 - - --ssh-keygen-type=ed25519 diff --git a/image/image.go b/image/image.go index 1bc0a30a6..ac283165a 100644 --- a/image/image.go +++ b/image/image.go @@ -221,32 +221,49 @@ func (i Ref) WithNewTag(t string) Ref { return img } -// Info has the metadata we are able to determine about an image, from -// its registry. +// Info has the metadata we are able to determine about an image ref, +// from its registry. type Info struct { - ID Ref + // the reference to this image; probably a tagged image name + ID Ref + // the digest we got when fetching the metadata, which will be + // different each time a manifest is uploaded for the reference + Digest string + // an identifier for the *image* this reference points to; this + // will be the same for references that point at the same image + // (but does not necessarily equal Docker's image ID) + ImageID string + // the time at which the image pointed at was created CreatedAt time.Time } +// MarshalJSON returns the Info value in JSON (as bytes). It is +// implemented so that we can omit the `CreatedAt` value when it's +// zero, which would otherwise be tricky for e.g., JavaScript to +// detect. func (im Info) MarshalJSON() ([]byte, error) { + type InfoAlias Info // alias to shed existing MarshalJSON implementation var t string if !im.CreatedAt.IsZero() { t = im.CreatedAt.UTC().Format(time.RFC3339Nano) } encode := struct { - ID Ref + InfoAlias CreatedAt string `json:",omitempty"` - }{im.ID, t} + }{InfoAlias(im), t} return json.Marshal(encode) } +// UnmarshalJSON populates an Info from JSON (as bytes). It's the +// companion to MarshalJSON above. 
func (im *Info) UnmarshalJSON(b []byte) error { + type InfoAlias Info unencode := struct { - ID Ref + InfoAlias CreatedAt string `json:",omitempty"` }{} json.Unmarshal(b, &unencode) - im.ID = unencode.ID + *im = Info(unencode.InfoAlias) if unencode.CreatedAt == "" { im.CreatedAt = time.Time{} } else { @@ -259,17 +276,6 @@ func (im *Info) UnmarshalJSON(b []byte) error { return nil } -func ParseInfo(s string, createdAt time.Time) (Info, error) { - id, err := ParseRef(s) - if err != nil { - return Info{}, err - } - return Info{ - ID: id, - CreatedAt: createdAt, - }, nil -} - // ByCreatedDesc is a shim used to sort image info by creation date type ByCreatedDesc []Info diff --git a/image/image_test.go b/image/image_test.go index a550d472c..f3a6eb6f2 100644 --- a/image/image_test.go +++ b/image/image_test.go @@ -3,6 +3,7 @@ package image import ( "encoding/json" "fmt" + "reflect" "sort" "strconv" "testing" @@ -137,15 +138,56 @@ func TestRefSerialization(t *testing.T) { } } +func mustMakeInfo(ref string, created time.Time) Info { + r, err := ParseRef(ref) + if err != nil { + panic(err) + } + return Info{ID: r, CreatedAt: created} +} + +func TestImageInfoSerialisation(t *testing.T) { + t0 := time.Now().UTC() // UTC so it has nil location, otherwise it won't compare + info := mustMakeInfo("my/image:tag", t0) + info.Digest = "sha256:digest" + info.ImageID = "sha256:layerID" + bytes, err := json.Marshal(info) + if err != nil { + t.Fatal(err) + } + var info1 Info + if err = json.Unmarshal(bytes, &info1); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(info, info1) { + t.Errorf("roundtrip serialisation failed:\n original: %#v\nroundtripped: %#v", info, info1) + } +} + +func TestImageInfoCreatedAtZero(t *testing.T) { + info := mustMakeInfo("my/image:tag", time.Now()) + info = Info{ID: info.ID} + bytes, err := json.Marshal(info) + if err != nil { + t.Fatal(err) + } + var info1 map[string]interface{} + if err = json.Unmarshal(bytes, &info1); err != nil { + t.Fatal(err) + } + if _, ok := info1["CreatedAt"]; ok { + t.Errorf("serialised Info included zero time field; expected it to be omitted\n%s", string(bytes)) + } +} + func TestImage_OrderByCreationDate(t *testing.T) { - fmt.Printf("testTime: %s\n", testTime) time0 := testTime.Add(time.Second) time2 := testTime.Add(-time.Second) - imA, _ := ParseInfo("my/Image:3", testTime) - imB, _ := ParseInfo("my/Image:1", time0) - imC, _ := ParseInfo("my/Image:4", time2) - imD, _ := ParseInfo("my/Image:0", time.Time{}) // test nil - imE, _ := ParseInfo("my/Image:2", testTime) // test equal + imA := mustMakeInfo("my/Image:3", testTime) + imB := mustMakeInfo("my/Image:1", time0) + imC := mustMakeInfo("my/Image:4", time2) + imD := mustMakeInfo("my/Image:0", time.Time{}) // test nil + imE := mustMakeInfo("my/Image:2", testTime) // test equal imgs := []Info{imA, imB, imC, imD, imE} sort.Sort(ByCreatedDesc(imgs)) for i, im := range imgs { diff --git a/registry/cache/cache.go b/registry/cache/cache.go new file mode 100644 index 000000000..0811c7436 --- /dev/null +++ b/registry/cache/cache.go @@ -0,0 +1,74 @@ +package cache + +import ( + "strings" + "time" + + "github.com/weaveworks/flux/image" +) + +type Reader interface { + GetKey(k Keyer) ([]byte, time.Time, error) +} + +type Writer interface { + SetKey(k Keyer, v []byte) error +} + +type Client interface { + Reader + Writer +} + +// An interface to provide the key under which to store the data +// Use the full path to image for the memcache key because there +// might be duplicates from other registries 
+type Keyer interface { + Key() string +} + +type manifestKey struct { + fullRepositoryPath, reference string +} + +func NewManifestKey(image image.CanonicalRef) Keyer { + return &manifestKey{image.CanonicalName().String(), image.Tag} +} + +func (k *manifestKey) Key() string { + return strings.Join([]string{ + "registryhistoryv3", // Just to version in case we need to change format later. + k.fullRepositoryPath, + k.reference, + }, "|") +} + +type tagKey struct { + fullRepositoryPath string +} + +func NewTagKey(id image.CanonicalName) Keyer { + return &tagKey{id.String()} +} + +func (k *tagKey) Key() string { + return strings.Join([]string{ + "registrytagsv3", // Just to version in case we need to change format later. + k.fullRepositoryPath, + }, "|") +} + +type repoKey struct { + fullRepositoryPath string +} + +func NewRepositoryKey(repo image.CanonicalName) Keyer { + return &repoKey{repo.String()} +} + +func (k *repoKey) Key() string { + return strings.Join([]string{ + "registryrepov3", + k.fullRepositoryPath, + }, "|") +} diff --git a/registry/cache/doc.go b/registry/cache/doc.go new file mode 100644 index 000000000..ae6be8279 --- /dev/null +++ b/registry/cache/doc.go @@ -0,0 +1,12 @@ +/* +This package implements an image metadata cache given a backing k-v +store. + +The interface `Client` stands in for the k-v store (e.g., memcached, +in the subpackage); `Cache` implements registry.Registry given a +`Client`. + +The `Warmer` is for continually refreshing the cache by fetching new +metadata from the original image registries. +*/ +package cache diff --git a/registry/cache/memcached.go b/registry/cache/memcached.go deleted file mode 100644 index c27466645..000000000 --- a/registry/cache/memcached.go +++ /dev/null @@ -1,272 +0,0 @@ -package cache - -import ( - "encoding/json" - "fmt" - "net" - "sort" - "strings" - "sync" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/go-kit/kit/log" - "github.com/pkg/errors" - - fluxerr "github.com/weaveworks/flux/errors" - "github.com/weaveworks/flux/image" -) - -const ( - expiry = time.Hour -) - -var ( - ErrNotCached = &fluxerr.Error{ - Type: fluxerr.Missing, - Err: memcache.ErrCacheMiss, - Help: `Image not yet cached - -It takes time to initially cache all the images. Please wait. - -If you have waited for a long time, check the flux logs. Potential -reasons for the error are: no internet, no cache, error with the remote -repository. -`, - } -) - -type Reader interface { - GetKey(k Keyer) ([]byte, error) - GetExpiration(k Keyer) (time.Time, error) -} - -type Writer interface { - SetKey(k Keyer, v []byte) error -} - -type Client interface { - Reader - Writer - Stop() -} - -type expiryData struct { - Data []byte - Expiry int32 -} - -// MemcacheClient is a memcache client that gets its server list from SRV -// records, and periodically updates that ServerList. -type memcacheClient struct { - *memcache.Client - serverList *memcache.ServerList - hostname string - service string - logger log.Logger - - quit chan struct{} - wait sync.WaitGroup -} - -// MemcacheConfig defines how a MemcacheClient should be constructed. 
-type MemcacheConfig struct { - Host string - Service string - Timeout time.Duration - UpdateInterval time.Duration - Logger log.Logger - MaxIdleConns int -} - -func NewMemcacheClient(config MemcacheConfig) Client { - var servers memcache.ServerList - client := memcache.NewFromSelector(&servers) - client.Timeout = config.Timeout - client.MaxIdleConns = config.MaxIdleConns - - newClient := &memcacheClient{ - Client: client, - serverList: &servers, - hostname: config.Host, - service: config.Service, - logger: config.Logger, - quit: make(chan struct{}), - } - - err := newClient.updateMemcacheServers() - if err != nil { - config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host)) - } - - newClient.wait.Add(1) - go newClient.updateLoop(config.UpdateInterval) - return newClient -} - -// Does not use DNS, accepts static list of servers. -func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) Client { - var servers memcache.ServerList - servers.SetServers(addresses...) - client := memcache.NewFromSelector(&servers) - client.Timeout = config.Timeout - - newClient := &memcacheClient{ - Client: client, - serverList: &servers, - hostname: config.Host, - service: config.Service, - logger: config.Logger, - quit: make(chan struct{}), - } - - client.FlushAll() - return newClient -} - -func (c *memcacheClient) GetKey(k Keyer) ([]byte, error) { - cacheItem, err := c.Get(k.Key()) - if err != nil { - if err == memcache.ErrCacheMiss { - // Don't log on cache miss - return []byte{}, ErrNotCached - } else { - c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache")) - return []byte{}, err - } - } - var data expiryData - err = json.Unmarshal(cacheItem.Value, &data) - if err != nil { - return []byte{}, err - } - return data.Data, nil -} - -// GetExpiration returns the expiry time of the key -func (c *memcacheClient) GetExpiration(k Keyer) (time.Time, error) { - cacheItem, err := c.Get(k.Key()) - if err != nil { - if err == memcache.ErrCacheMiss { - // Don't log on cache miss - return time.Time{}, ErrNotCached - } else { - c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache")) - return time.Time{}, err - } - } - - var data expiryData - err = json.Unmarshal(cacheItem.Value, &data) - if err != nil { - return time.Time{}, err - } - return time.Unix(int64(data.Expiry), 0), nil -} - -func (c *memcacheClient) SetKey(k Keyer, v []byte) error { - expiry := int32(time.Now().Add(expiry).Unix()) - data := expiryData{ - Expiry: expiry, - Data: v, - } - val, err := json.Marshal(&data) - if err != nil { - return err - } - if err := c.Set(&memcache.Item{ - Key: k.Key(), - Value: val, - Expiration: expiry, - }); err != nil { - c.logger.Log("err", errors.Wrap(err, "storing in memcache")) - return err - } - return nil -} - -// Stop the memcache client. -func (c *memcacheClient) Stop() { - close(c.quit) - c.wait.Wait() -} - -func (c *memcacheClient) updateLoop(updateInterval time.Duration) { - defer c.wait.Done() - ticker := time.NewTicker(updateInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if err := c.updateMemcacheServers(); err != nil { - c.logger.Log("err", errors.Wrap(err, "error updating memcache servers")) - } - case <-c.quit: - return - } - } -} - -// updateMemcacheServers sets a memcache server list from SRV records. SRV -// priority & weight are ignored. 
-func (c *memcacheClient) updateMemcacheServers() error { - _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname) - if err != nil { - return err - } - var servers []string - for _, srv := range addrs { - servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port)) - } - // ServerList deterministically maps keys to _index_ of the server list. - // Since DNS returns records in different order each time, we sort to - // guarantee best possible match between nodes. - sort.Strings(servers) - return c.serverList.SetServers(servers...) -} - -// An interface to provide the key under which to store the data -// Use the full path to image for the memcache key because there -// might be duplicates from other registries -type Keyer interface { - Key() string -} - -type manifestKey struct { - username, fullRepositoryPath, reference string -} - -func NewManifestKey(username string, image image.CanonicalRef) (Keyer, error) { - return &manifestKey{username, image.CanonicalName().String(), image.Tag}, nil -} - -func (k *manifestKey) Key() string { - return strings.Join([]string{ - "registryhistoryv2", // Just to version in case we need to change format later. - // Just the username here means we won't invalidate the cache when user - // changes password, but that should be rare. And, it also means we're not - // putting user passwords in plaintext into memcache. - k.username, - k.fullRepositoryPath, - k.reference, - }, "|") -} - -type tagKey struct { - username, fullRepositoryPath string -} - -func NewTagKey(username string, id image.CanonicalName) (Keyer, error) { - return &tagKey{username, id.String()}, nil -} - -func (k *tagKey) Key() string { - return strings.Join([]string{ - "registrytagsv2", // Just to version in case we need to change format later. - // Just the username here means we won't invalidate the cache when user - // changes password, but that should be rare. And, it also means we're not - // putting user passwords in plaintext into memcache. - k.username, - k.fullRepositoryPath, - }, "|") -} diff --git a/registry/integration_test.go b/registry/cache/memcached/integration_test.go similarity index 58% rename from registry/integration_test.go rename to registry/cache/memcached/integration_test.go index fdbdda609..92b9b4ffc 100644 --- a/registry/integration_test.go +++ b/registry/cache/memcached/integration_test.go @@ -1,9 +1,8 @@ // +build integration -package registry +package memcached import ( - "flag" "os" "strings" "sync" @@ -13,20 +12,19 @@ import ( "github.com/go-kit/kit/log" "github.com/weaveworks/flux/image" + "github.com/weaveworks/flux/registry" "github.com/weaveworks/flux/registry/cache" "github.com/weaveworks/flux/registry/middleware" ) -var ( - memcachedIPs = flag.String("memcached-ips", "127.0.0.1:11211", "space-separated host:port values for memcached to connect to") -) +// memcachedIPs flag from memcached_test.go -// This tests a real memcache cache and a request to a real docker -// repository. It is intended to be an end-to-end integration test -// for the warmer since I had a few bugs introduced when -// refactoring. This should cover against these bugs. +// This tests a real memcached cache and a request to a real docker +// repository. It is intended to be an end-to-end integration test for +// the warmer since I had a few bugs introduced when refactoring. This +// should cover against these bugs. 
func TestWarming_WarmerWriteCacheRead(t *testing.T) { - mc := cache.NewFixedServerMemcacheClient(cache.MemcacheConfig{ + mc := NewFixedServerMemcacheClient(MemcacheConfig{ Timeout: time.Second, UpdateInterval: 1 * time.Minute, Logger: log.With(log.NewLogfmtLogger(os.Stderr), "component", "memcached"), @@ -36,33 +34,15 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) - remote := NewRemoteClientFactory( - log.With(logger, "component", "client"), - middleware.RateLimiterConfig{200, 10}, - ) - - cache := NewCacheClientFactory( - log.With(logger, "component", "cache"), - mc, - time.Hour, - ) - - r := NewRegistry( - cache, - log.With(logger, "component", "registry"), - 512, - ) - - w := Warmer{ - Logger: log.With(logger, "component", "warmer"), - ClientFactory: remote, - Creds: NoCredentials(), - Expiry: time.Hour, - Reader: mc, - Writer: mc, - Burst: 125, + remote := ®istry.RemoteClientFactory{ + Logger: log.With(logger, "component", "client"), + Limiters: &middleware.RateLimiters{RPS: 200, Burst: 10}, + Trace: true, } + r := &cache.Cache{mc} + + w, _ := cache.NewWarmer(remote, mc, 125) shutdown := make(chan struct{}) shutdownWg := &sync.WaitGroup{} defer func() { @@ -71,9 +51,9 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) { }() shutdownWg.Add(1) - go w.Loop(shutdown, shutdownWg, func() ImageCreds { - return ImageCreds{ - id.Name: NoCredentials(), + go w.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, func() registry.ImageCreds { + return registry.ImageCreds{ + id.Name: registry.NoCredentials(), } }) diff --git a/registry/cache/memcached/memcached.go b/registry/cache/memcached/memcached.go new file mode 100644 index 000000000..adbbddfce --- /dev/null +++ b/registry/cache/memcached/memcached.go @@ -0,0 +1,177 @@ +package memcached + +import ( + "encoding/binary" + "fmt" + "net" + "sort" + "sync" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + + "github.com/weaveworks/flux/registry/cache" +) + +const ( + DefaultExpiry = time.Hour +) + +// MemcacheClient is a memcache client that gets its server list from SRV +// records, and periodically updates that ServerList. +type MemcacheClient struct { + client *memcache.Client + serverList *memcache.ServerList + hostname string + service string + ttl time.Duration + logger log.Logger + + quit chan struct{} + wait sync.WaitGroup +} + +// MemcacheConfig defines how a MemcacheClient should be constructed. +type MemcacheConfig struct { + Host string + Service string + Expiry time.Duration + Timeout time.Duration + UpdateInterval time.Duration + Logger log.Logger + MaxIdleConns int +} + +func NewMemcacheClient(config MemcacheConfig) *MemcacheClient { + var servers memcache.ServerList + client := memcache.NewFromSelector(&servers) + client.Timeout = config.Timeout + client.MaxIdleConns = config.MaxIdleConns + + newClient := &MemcacheClient{ + client: client, + serverList: &servers, + hostname: config.Host, + service: config.Service, + ttl: config.Expiry, + logger: config.Logger, + quit: make(chan struct{}), + } + + if newClient.ttl == 0 { + newClient.ttl = DefaultExpiry + } + + err := newClient.updateMemcacheServers() + if err != nil { + config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host)) + } + + newClient.wait.Add(1) + go newClient.updateLoop(config.UpdateInterval) + return newClient +} + +// Does not use DNS, accepts static list of servers. 
+func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *MemcacheClient { + var servers memcache.ServerList + servers.SetServers(addresses...) + client := memcache.NewFromSelector(&servers) + client.Timeout = config.Timeout + + newClient := &MemcacheClient{ + client: client, + serverList: &servers, + hostname: config.Host, + service: config.Service, + ttl: config.Expiry, + logger: config.Logger, + quit: make(chan struct{}), + } + + if newClient.ttl == 0 { + newClient.ttl = DefaultExpiry + } + + return newClient +} + +// The memcached client does not report the expiry when you GET a +// value, but we do want to know it, so we can refresh items that are +// soon to expire (and ignore items that are not). For that reason, we +// prepend the expiry to the value when setting, and read it back when +// getting. + +// GetKey gets the value and its expiry time from the cache. +func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) { + cacheItem, err := c.client.Get(k.Key()) + if err != nil { + if err == memcache.ErrCacheMiss { + // Don't log on cache miss + return []byte{}, time.Time{}, cache.ErrNotCached + } else { + c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache")) + return []byte{}, time.Time{}, err + } + } + exTime := binary.BigEndian.Uint32(cacheItem.Value) + return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil +} + +// SetKey sets the value at a key. +func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error { + exTime := time.Now().Add(c.ttl).Unix() + exBytes := make([]byte, 4, 4) + binary.BigEndian.PutUint32(exBytes, uint32(exTime)) + if err := c.client.Set(&memcache.Item{ + Key: k.Key(), + Value: append(exBytes, v...), + Expiration: int32(exTime), + }); err != nil { + c.logger.Log("err", errors.Wrap(err, "storing in memcache")) + return err + } + return nil +} + +// Stop the memcache client. +func (c *MemcacheClient) Stop() { + close(c.quit) + c.wait.Wait() +} + +func (c *MemcacheClient) updateLoop(updateInterval time.Duration) { + defer c.wait.Done() + ticker := time.NewTicker(updateInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := c.updateMemcacheServers(); err != nil { + c.logger.Log("err", errors.Wrap(err, "error updating memcache servers")) + } + case <-c.quit: + return + } + } +} + +// updateMemcacheServers sets a memcache server list from SRV records. SRV +// priority & weight are ignored. +func (c *MemcacheClient) updateMemcacheServers() error { + _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname) + if err != nil { + return err + } + var servers []string + for _, srv := range addrs { + servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port)) + } + // ServerList deterministically maps keys to _index_ of the server list. + // Since DNS returns records in different order each time, we sort to + // guarantee best possible match between nodes. + sort.Strings(servers) + return c.serverList.SetServers(servers...) 
+} diff --git a/registry/cache/memcached_test.go b/registry/cache/memcached/memcached_test.go similarity index 66% rename from registry/cache/memcached_test.go rename to registry/cache/memcached/memcached_test.go index ad16f3614..9c0b6d408 100644 --- a/registry/cache/memcached_test.go +++ b/registry/cache/memcached/memcached_test.go @@ -1,6 +1,6 @@ // +build integration -package cache +package memcached import ( "flag" @@ -39,8 +39,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) { t.Fatal(err) } - // Get the expiry - expiry, err := mc.GetExpiration(key) + cached, expiry, err := mc.GetKey(key) if err != nil { t.Fatal(err) } @@ -50,27 +49,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) { if expiry.Before(time.Now()) { t.Fatal("Expiry should be in the future") } -} -func TestMemcache_ReadWrite(t *testing.T) { - // Memcache client - mc := NewFixedServerMemcacheClient(MemcacheConfig{ - Timeout: time.Second, - UpdateInterval: 1 * time.Minute, - Logger: log.With(log.NewLogfmtLogger(os.Stderr), "component", "memcached"), - }, strings.Fields(*memcachedIPs)...) - - // Set some dummy data - err := mc.SetKey(key, val) - if err != nil { - t.Fatal(err) - } - - // Get the data - cached, err := mc.GetKey(key) - if err != nil { - t.Fatal(err) - } if string(cached) != string(val) { t.Fatalf("Should have returned %q, but got %q", string(val), string(cached)) } diff --git a/registry/cache/monitoring.go b/registry/cache/monitoring.go index 9762e0255..38d707ff4 100644 --- a/registry/cache/monitoring.go +++ b/registry/cache/monitoring.go @@ -11,28 +11,28 @@ import ( ) var ( - memcacheRequestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + cacheRequestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: "flux", - Subsystem: "memcache", + Subsystem: "cache", Name: "request_duration_seconds", - Help: "Duration of memcache requests, in seconds.", + Help: "Duration of cache requests, in seconds.", Buckets: stdprometheus.DefBuckets, }, []string{fluxmetrics.LabelMethod, fluxmetrics.LabelSuccess}) ) -type instrumentedMemcacheClient struct { +type instrumentedClient struct { next Client } -func InstrumentMemcacheClient(c Client) Client { - return &instrumentedMemcacheClient{ +func InstrumentClient(c Client) Client { + return &instrumentedClient{ next: c, } } -func (i *instrumentedMemcacheClient) GetKey(k Keyer) (_ []byte, err error) { +func (i *instrumentedClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error) { defer func(begin time.Time) { - memcacheRequestDuration.With( + cacheRequestDuration.With( fluxmetrics.LabelMethod, "GetKey", fluxmetrics.LabelSuccess, fmt.Sprint(err == nil), ).Observe(time.Since(begin).Seconds()) @@ -40,32 +40,12 @@ func (i *instrumentedMemcacheClient) GetKey(k Keyer) (_ []byte, err error) { return i.next.GetKey(k) } -func (i *instrumentedMemcacheClient) GetExpiration(k Keyer) (_ time.Time, err error) { +func (i *instrumentedClient) SetKey(k Keyer, v []byte) (err error) { defer func(begin time.Time) { - memcacheRequestDuration.With( - fluxmetrics.LabelMethod, "GetExpiration", - fluxmetrics.LabelSuccess, fmt.Sprint(err == nil), - ).Observe(time.Since(begin).Seconds()) - }(time.Now()) - return i.next.GetExpiration(k) -} - -func (i *instrumentedMemcacheClient) SetKey(k Keyer, v []byte) (err error) { - defer func(begin time.Time) { - memcacheRequestDuration.With( + cacheRequestDuration.With( fluxmetrics.LabelMethod, "SetKey", fluxmetrics.LabelSuccess, fmt.Sprint(err == nil), ).Observe(time.Since(begin).Seconds()) }(time.Now()) return 
i.next.SetKey(k, v) } - -func (i *instrumentedMemcacheClient) Stop() { - defer func(begin time.Time) { - memcacheRequestDuration.With( - fluxmetrics.LabelMethod, "Stop", - fluxmetrics.LabelSuccess, "true", - ).Observe(time.Since(begin).Seconds()) - }(time.Now()) - i.next.Stop() -} diff --git a/registry/cache/registry.go b/registry/cache/registry.go new file mode 100644 index 000000000..9d27e1a7a --- /dev/null +++ b/registry/cache/registry.go @@ -0,0 +1,103 @@ +package cache + +import ( + "encoding/json" + "sort" + "time" + + "github.com/pkg/errors" + + fluxerr "github.com/weaveworks/flux/errors" + "github.com/weaveworks/flux/image" +) + +var ( + ErrNotCached = &fluxerr.Error{ + Type: fluxerr.Missing, + Err: errors.New("item not in cache"), + Help: `Image not yet cached + +It takes time to initially cache all the images. Please wait. + +If you have waited for a long time, check the flux logs. Potential +reasons for the error are: no internet, no cache, error with the remote +repository. +`, + } +) + +// Cache is a local cache of image metadata. +type Cache struct { + Reader Reader +} + +// GetRepository returns the list of image manifests in an image +// repository (e.g,. at "quay.io/weaveworks/flux") +func (c *Cache) GetRepository(id image.Name) ([]image.Info, error) { + repoKey := NewRepositoryKey(id.CanonicalName()) + bytes, _, err := c.Reader.GetKey(repoKey) + if err != nil { + return nil, err + } + var repo ImageRepository + if err = json.Unmarshal(bytes, &repo); err != nil { + return nil, err + } + + // We only care about the error if we've never successfully + // updated the result. + if repo.LastUpdate.IsZero() { + if repo.LastError != "" { + return nil, errors.New(repo.LastError) + } + return nil, ErrNotCached + } + + images := make([]image.Info, len(repo.Images)) + var i int + for _, im := range repo.Images { + images[i] = im + i++ + } + sort.Sort(image.ByCreatedDesc(images)) + return images, nil +} + +// GetImage gets the manifest of a specific image ref, from its +// registry. +func (c *Cache) GetImage(id image.Ref) (image.Info, error) { + key := NewManifestKey(id.CanonicalRef()) + + val, _, err := c.Reader.GetKey(key) + if err != nil { + return image.Info{}, err + } + var img image.Info + err = json.Unmarshal(val, &img) + if err != nil { + return image.Info{}, err + } + return img, nil +} + +// ImageRepository holds the last good information on an image +// repository. +// +// Whenever we successfully fetch a full set of image info, +// `LastUpdate` and `Images` shall each be assigned a value, and +// `LastError` will be cleared. +// +// If we cannot for any reason obtain a full set of image info, +// `LastError` shall be assigned a value, and the other fields left +// alone. +// +// It's possible to have all fields populated: this means at some +// point it was successfully fetched, but since then, there's been an +// error. It's then up to the caller to decide what to do with the +// value (show the images, but also indicate there's a problem, for +// example). 
+type ImageRepository struct { + LastError string + LastUpdate time.Time + Images map[string]image.Info +} diff --git a/registry/cache/warming.go b/registry/cache/warming.go new file mode 100644 index 000000000..6c15bd39b --- /dev/null +++ b/registry/cache/warming.go @@ -0,0 +1,293 @@ +package cache + +import ( + "context" + "encoding/json" + "net" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/weaveworks/flux/image" + "github.com/weaveworks/flux/registry" +) + +const refreshWhenExpiryWithin = time.Minute +const askForNewImagesInterval = time.Minute + +// Warmer refreshes the information kept in the cache from remote +// registries. +type Warmer struct { + clientFactory registry.ClientFactory + cache Client + burst int + Priority chan image.Name + Notify func() +} + +// NewWarmer creates cache warmer that (when Loop is invoked) will +// periodically refresh the values kept in the cache. +func NewWarmer(cf registry.ClientFactory, cacheClient Client, burst int) (*Warmer, error) { + if cf == nil || cacheClient == nil || burst <= 0 { + return nil, errors.New("arguments must be non-nil (or > 0 in the case of burst)") + } + return &Warmer{ + clientFactory: cf, + cache: cacheClient, + burst: burst, + }, nil +} + +// .. and this is what we keep in the backlog +type backlogItem struct { + image.Name + registry.Credentials +} + +// Continuously get the images to populate the cache with, and +// populate the cache with them. +func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) { + defer wg.Done() + + refresh := time.Tick(askForNewImagesInterval) + imageCreds := imagesToFetchFunc() + backlog := imageCredsToBacklog(imageCreds) + + // We have some fine control over how long to spend on each fetch + // operation, since they are given a `context`. For now though, + // just rattle through them one by one, however long they take. + ctx := context.Background() + + // This loop acts keeps a kind of priority queue, whereby image + // names coming in on the `Priority` channel are looked up first. + // If there are none, images used in the cluster are refreshed; + // but no more often than once every `askForNewImagesInterval`, + // since there is no effective back-pressure on cache refreshes + // and it would spin freely otherwise). + for { + select { + case <-stop: + logger.Log("stopping", "true") + return + case name := <-w.Priority: + logger.Log("priority", name.String()) + // NB the implicit contract here is that the prioritised + // image has to have been running the last time we + // requested the credentials. 
+ if creds, ok := imageCreds[name]; ok { + w.warm(ctx, logger, name, creds) + } else { + logger.Log("priority", name.String(), "err", "no creds available") + } + continue + default: + } + + if len(backlog) > 0 { + im := backlog[0] + backlog = backlog[1:] + w.warm(ctx, logger, im.Name, im.Credentials) + } else { + select { + case <-refresh: + imageCreds = imagesToFetchFunc() + backlog = imageCredsToBacklog(imageCreds) + default: + } + } + } +} + +func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem { + backlog := make([]backlogItem, len(imageCreds)) + var i int + for name, cred := range imageCreds { + backlog[i] = backlogItem{name, cred} + i++ + } + return backlog +} + +func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, creds registry.Credentials) { + client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds) + if err != nil { + logger.Log("err", err.Error()) + return + } + + // This is what we're going to write back to the cache + var repo ImageRepository + repoKey := NewRepositoryKey(id.CanonicalName()) + bytes, _, err := w.cache.GetKey(repoKey) + if err == nil { + err = json.Unmarshal(bytes, &repo) + } else if err == ErrNotCached { + err = nil + } + + if err != nil { + logger.Log("err", errors.Wrap(err, "fetching previous result from cache")) + return + } + + // Save for comparison later + oldImages := repo.Images + + // Now we have the previous result; everything after will be + // attempting to refresh that value. Whatever happens, at the end + // we'll write something back. + defer func() { + bytes, err := json.Marshal(repo) + if err == nil { + err = w.cache.SetKey(repoKey, bytes) + } + if err != nil { + logger.Log("err", errors.Wrap(err, "writing result to cache")) + } + }() + + tags, err := client.Tags(ctx) + if err != nil { + if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") { + logger.Log("err", errors.Wrap(err, "requesting tags")) + repo.LastError = err.Error() + } + return + } + + newImages := map[string]image.Info{} + + // Create a list of manifests that need updating + var toUpdate []image.Ref + var missing, expired int + for _, tag := range tags { + // See if we have the manifest already cached + newID := id.ToRef(tag) + key := NewManifestKey(newID.CanonicalRef()) + bytes, expiry, err := w.cache.GetKey(key) + // If err, then we don't have it yet. Update. + switch { + case err != nil: + missing++ + case time.Until(expiry) < refreshWhenExpiryWithin: + expired++ + default: + var image image.Info + if err := json.Unmarshal(bytes, &image); err == nil { + newImages[tag] = image + continue + } + missing++ + } + toUpdate = append(toUpdate, newID) + } + + var successCount int + + if len(toUpdate) > 0 { + logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing) + var successMx sync.Mutex + + // The upper bound for concurrent fetches against a single host is + // w.Burst, so limit the number of fetching goroutines to that. 
+ fetchers := make(chan struct{}, w.burst) + awaitFetchers := &sync.WaitGroup{} + updates: + for _, imID := range toUpdate { + select { + case <-ctx.Done(): + break updates + case fetchers <- struct{}{}: + } + + awaitFetchers.Add(1) + go func(imageID image.Ref) { + defer func() { awaitFetchers.Done(); <-fetchers }() + // Get the image from the remote + img, err := client.Manifest(ctx, imageID.Tag) + if err != nil { + if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() { + // This was due to a context timeout, don't bother logging + return + } + logger.Log("err", errors.Wrap(err, "requesting manifests")) + return + } + + key := NewManifestKey(img.ID.CanonicalRef()) + // Write back to memcached + val, err := json.Marshal(img) + if err != nil { + logger.Log("err", errors.Wrap(err, "serializing tag to store in cache")) + return + } + err = w.cache.SetKey(key, val) + if err != nil { + logger.Log("err", errors.Wrap(err, "storing manifests in cache")) + return + } + successMx.Lock() + successCount++ + newImages[imageID.Tag] = img + successMx.Unlock() + }(imID) + } + awaitFetchers.Wait() + logger.Log("updated", id.String(), "count", successCount) + } + + // We managed to fetch new metadata for everything we were missing + // (if anything). Ratchet the result forward. + if successCount == len(toUpdate) { + repo = ImageRepository{ + LastUpdate: time.Now(), + Images: newImages, + } + } + + if w.Notify != nil { + cacheTags := StringSet{} + for t := range oldImages { + cacheTags[t] = struct{}{} + } + + // If there's more tags than there used to be, there must be + // at least one new tag. + if len(cacheTags) < len(tags) { + w.Notify() + return + } + // Otherwise, check whether there are any entries in the + // fetched tags that aren't in the cached tags. + tagSet := NewStringSet(tags) + if !tagSet.Subset(cacheTags) { + w.Notify() + } + } +} + +// StringSet is a set of strings. +type StringSet map[string]struct{} + +// NewStringSet returns a StringSet containing exactly the strings +// given as arguments. +func NewStringSet(ss []string) StringSet { + res := StringSet{} + for _, s := range ss { + res[s] = struct{}{} + } + return res +} + +// Subset returns true if `s` is a subset of `t` (including the case +// of having the same members). 
+func (s StringSet) Subset(t StringSet) bool { + for k := range s { + if _, ok := t[k]; !ok { + return false + } + } + return true +} diff --git a/registry/cache/warming_test.go b/registry/cache/warming_test.go new file mode 100644 index 000000000..29159a55d --- /dev/null +++ b/registry/cache/warming_test.go @@ -0,0 +1,86 @@ +package cache + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/go-kit/kit/log" + + "github.com/weaveworks/flux/image" + "github.com/weaveworks/flux/registry" + "github.com/weaveworks/flux/registry/mock" +) + +type mem struct { + kv map[string][]byte + mx sync.Mutex +} + +func (c *mem) SetKey(k Keyer, v []byte) error { + c.mx.Lock() + defer c.mx.Unlock() + if c.kv == nil { + c.kv = make(map[string][]byte) + } + c.kv[k.Key()] = v + return nil +} + +func (c *mem) GetKey(k Keyer) ([]byte, time.Time, error) { + c.mx.Lock() + defer c.mx.Unlock() + if c.kv == nil { + c.kv = make(map[string][]byte) + } + + if v, ok := c.kv[k.Key()]; ok { + return v, time.Now().Add(time.Hour), nil + } + return nil, time.Time{}, ErrNotCached +} + +// WarmTest effectively checks that the cache.Warmer and +// cache.Registry work together as intended: that is, if you ask the +// warmer to fetch information, the cached gets populated, and the +// Registry implementation will see it. +func TestWarm(t *testing.T) { + ref, _ := image.ParseRef("example.com/path/image:tag") + repo := ref.Name + + logger := log.NewNopLogger() + + client := &mock.Client{ + TagsFn: func() ([]string, error) { + return []string{"tag"}, nil + }, + ManifestFn: func(tag string) (image.Info, error) { + if tag != "tag" { + t.Errorf("remote client was asked for %q instead of %q", tag, "tag") + } + return image.Info{ + ID: ref, + CreatedAt: time.Now(), + }, nil + }, + } + factory := &mock.ClientFactory{Client: client} + c := &mem{} + warmer := &Warmer{clientFactory: factory, cache: c, burst: 10} + warmer.warm(context.TODO(), logger, repo, registry.NoCredentials()) + + registry := &Cache{Reader: c} + repoInfo, err := registry.GetRepository(ref.Name) + if err != nil { + t.Error(err) + } + // Otherwise, we should get what we put in ... + if len(repoInfo) != 1 { + t.Errorf("expected an image.Info item; got %#v", repoInfo) + } else { + if got := repoInfo[0].ID.String(); got != ref.String() { + t.Errorf("expected image %q from registry cache; got %q", ref.String(), got) + } + } +} diff --git a/registry/client.go b/registry/client.go index 1301a8e57..d7a3f8ab5 100644 --- a/registry/client.go +++ b/registry/client.go @@ -3,178 +3,137 @@ package registry import ( "context" "encoding/json" - "fmt" + "errors" "net/http" - "net/url" + "reflect" "time" - "github.com/go-kit/kit/log" - dockerregistry "github.com/heroku/docker-registry-client/registry" - "github.com/pkg/errors" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/registry/client" + "github.com/opencontainers/go-digest" "github.com/weaveworks/flux/image" - "github.com/weaveworks/flux/registry/cache" ) -// A client represents an entity that returns manifest and tags -// information. It might be a cache, it might be a real registry. +// Client is a remote registry client for a particular image +// repository (e.g., for quay.io/weaveworks/flux). 
It is an interface +// so we can wrap it in instrumentation, write fake implementations, +// and so on. type Client interface { - Tags(name image.Name) ([]string, error) - Manifest(name image.Ref) (image.Info, error) - Cancel() + Tags(context.Context) ([]string, error) + Manifest(ctx context.Context, ref string) (image.Info, error) } -// --- +// ClientFactory supplies Client implementations for a given repo, +// with credentials. This is an interface so we can provide fake +// implementations. +type ClientFactory interface { + ClientFor(image.CanonicalName, Credentials) (Client, error) +} -// An implementation of Client that represents a Remote registry. -// E.g. docker hub. type Remote struct { - Registry *herokuManifestAdaptor - CancelFunc context.CancelFunc + transport http.RoundTripper + repo image.CanonicalName } -// Return the tags for this repository. -func (a *Remote) Tags(id image.Name) ([]string, error) { - return a.Registry.Tags(id.Repository()) +// Adapt to docker distribution `reference.Named`. +type named struct { + image.CanonicalName +} + +// Name returns the name of the repository. These values are used to +// build API URLs, and (it turns out) are _not_ expected to include a +// domain (e.g., quay.io). Hence, the implementation here just returns +// the path. +func (n named) Name() string { + return n.Image } -// We need to do some adapting here to convert from the return values -// from dockerregistry to our domain types. -func (a *Remote) Manifest(id image.Ref) (image.Info, error) { - manifestV2, err := a.Registry.ManifestV2(id.Repository(), id.Tag) +// Return the tags for this repository. +func (a *Remote) Tags(ctx context.Context) ([]string, error) { + repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport) if err != nil { - if err, ok := err.(*url.Error); ok { - if err, ok := (err.Err).(*dockerregistry.HttpStatusError); ok { - if err.Response.StatusCode == http.StatusNotFound { - return a.ManifestFromV1(id) - } - } - } - return image.Info{}, err - } - // The above request will happily return a bogus, empty manifest - // if handed something other than a schema2 manifest. - if manifestV2.Config.Digest == "" { - return a.ManifestFromV1(id) + return nil, err } + return repository.Tags(ctx).All(ctx) +} - // schema2 manifests have a reference to a blog that contains the - // image config. We have to fetch that in order to get the created - // datetime. 
- conf := manifestV2.Config - reader, err := a.Registry.DownloadLayer(id.Repository(), conf.Digest) +// Manifest fetches the metadata for an image reference; currently +// assumed to be in the same repo as that provided to `NewRemote(...)` +func (a *Remote) Manifest(ctx context.Context, ref string) (image.Info, error) { + repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport) if err != nil { return image.Info{}, err } - if reader == nil { - return image.Info{}, fmt.Errorf("nil reader from DownloadLayer") - } - - type config struct { - Created time.Time `json:created` - } - var imageConf config - - err = json.NewDecoder(reader).Decode(&imageConf) + manifests, err := repository.Manifests(ctx) if err != nil { return image.Info{}, err } - return image.Info{ - ID: id, - CreatedAt: imageConf.Created, - }, nil -} - -func (a *Remote) ManifestFromV1(id image.Ref) (image.Info, error) { - history, err := a.Registry.Manifest(id.Repository(), id.Tag) - if err != nil || history == nil { - return image.Info{}, errors.Wrap(err, "getting remote manifest") - } + var manifestDigest digest.Digest + digestOpt := client.ReturnContentDigest(&manifestDigest) + manifest, fetchErr := manifests.Get(ctx, digest.Digest(ref), digestOpt, distribution.WithTagOption{ref}) - // the manifest includes some v1-backwards-compatibility data, - // oddly called "History", which are layer metadata as JSON - // strings; these appear most-recent (i.e., topmost layer) first, - // so happily we can just decode the first entry to get a created - // time. - type v1image struct { - Created time.Time `json:"created"` - } - var topmost v1image - var img image.Info - img.ID = id - if len(history) > 0 { - if err = json.Unmarshal([]byte(history[0].V1Compatibility), &topmost); err == nil { - if !topmost.Created.IsZero() { - img.CreatedAt = topmost.Created - } - } +interpret: + if fetchErr != nil { + return image.Info{}, err } - return img, nil -} - -// Cancel the remote request -func (a *Remote) Cancel() { - a.CancelFunc() -} - -// --- - -// An implementation of Client backed by Memcache -type Cache struct { - creds Credentials - expiry time.Duration - cr cache.Reader - logger log.Logger -} + info := image.Info{ID: a.repo.ToRef(ref), Digest: manifestDigest.String()} -func (*Cache) Cancel() { - return -} - -func NewCache(creds Credentials, cr cache.Reader, expiry time.Duration, logger log.Logger) Client { - return &Cache{ - creds: creds, - expiry: expiry, - cr: cr, - logger: logger, - } -} + // TODO(michael): can we type switch? Not sure how dependable the + // underlying types are. 
+ switch deserialised := manifest.(type) { + case *schema1.SignedManifest: + var man schema1.Manifest = deserialised.Manifest + // for decoding the v1-compatibility entry in schema1 manifests + var v1 struct { + ID string `json:"id"` + Created time.Time `json:"created"` + OS string `json:"os"` + Arch string `json:"architecture"` + } -func (c *Cache) Manifest(id image.Ref) (image.Info, error) { - creds := c.creds.credsFor(id.Registry()) - key, err := cache.NewManifestKey(creds.username, id.CanonicalRef()) - if err != nil { - return image.Info{}, err - } - val, err := c.cr.GetKey(key) - if err != nil { - return image.Info{}, err - } - var img image.Info - err = json.Unmarshal(val, &img) - if err != nil { - c.logger.Log("err", err.Error) - return image.Info{}, err - } - return img, nil -} + if err = json.Unmarshal([]byte(man.History[0].V1Compatibility), &v1); err != nil { + return image.Info{}, err + } + // This is not the ImageID that Docker uses, but assumed to + // identify the image as it's the topmost layer. + info.ImageID = v1.ID + info.CreatedAt = v1.Created + case *schema2.DeserializedManifest: + var man schema2.Manifest = deserialised.Manifest + configBytes, err := repository.Blobs(ctx).Get(ctx, man.Config.Digest) + if err != nil { + return image.Info{}, err + } -func (c *Cache) Tags(id image.Name) ([]string, error) { - creds := c.creds.credsFor(id.Registry()) - key, err := cache.NewTagKey(creds.username, id.CanonicalName()) - if err != nil { - return []string{}, err - } - val, err := c.cr.GetKey(key) - if err != nil { - return []string{}, err - } - var tags []string - err = json.Unmarshal(val, &tags) - if err != nil { - c.logger.Log("err", err.Error) - return []string{}, err + var config struct { + Arch string `json:"architecture"` + Created time.Time `json:"created"` + OS string `json:"os"` + } + if err = json.Unmarshal(configBytes, &config); err != nil { + return image.Info{}, err + } + // This _is_ what Docker uses as its Image ID. + info.ImageID = man.Config.Digest.String() + info.CreatedAt = config.Created + case *manifestlist.DeserializedManifestList: + var list manifestlist.ManifestList = deserialised.ManifestList + // TODO(michael): is it valid to just pick the first one that matches? 
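+		// A manifest list names one manifest per platform; pick the
+		// linux/amd64 entry, fetch that manifest by digest, and jump
+		// back to interpret it as a schema1 or schema2 manifest.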
+ for _, m := range list.Manifests { + if m.Platform.OS == "linux" && m.Platform.Architecture == "amd64" { + manifest, fetchErr = manifests.Get(ctx, m.Digest, digestOpt) + goto interpret + } + } + return image.Info{}, errors.New("no suitable manifest (linux amd64) in manifestlist") + default: + t := reflect.TypeOf(manifest) + return image.Info{}, errors.New("unknown manifest type: " + t.String()) } - return tags, nil + return info, nil } diff --git a/registry/client_factory.go b/registry/client_factory.go index 8ef905ea4..942bf09b7 100644 --- a/registry/client_factory.go +++ b/registry/client_factory.go @@ -1,113 +1,102 @@ package registry import ( - "context" - "errors" "net/http" - "net/http/cookiejar" - "time" + "net/url" + "sync" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/auth/challenge" + "github.com/docker/distribution/registry/client/transport" "github.com/go-kit/kit/log" - dockerregistry "github.com/heroku/docker-registry-client/registry" - "golang.org/x/net/publicsuffix" - "github.com/weaveworks/flux/registry/cache" + "github.com/weaveworks/flux/image" "github.com/weaveworks/flux/registry/middleware" ) -var ( - ErrNoMemcache = errors.New("no memcached") -) - -// ClientFactory creates a new client for the given host. -// Each request might require a new client. E.g. when retrieving docker -// images from docker hub, then a second from quay.io -type ClientFactory interface { - ClientFor(host string, creds Credentials) (client Client, err error) +type RemoteClientFactory struct { + Logger log.Logger + Limiters *middleware.RateLimiters + Trace bool + challengeManager challenge.Manager + mx sync.Mutex } -// --- -// A new ClientFactory for a Remote. -func NewRemoteClientFactory(l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory { - return &remoteClientFactory{ - Logger: l, - rlConf: rlc, - } +type logging struct { + logger log.Logger + transport http.RoundTripper } -type remoteClientFactory struct { - Logger log.Logger - rlConf middleware.RateLimiterConfig +func (t *logging) RoundTrip(req *http.Request) (*http.Response, error) { + res, err := t.transport.RoundTrip(req) + if err == nil { + t.logger.Log("url", req.URL.String(), "status", res.Status) + } else { + t.logger.Log("url", req.URL.String(), "err", err.Error()) + } + return res, err } -func (f *remoteClientFactory) ClientFor(host string, creds Credentials) (Client, error) { - httphost := "https://" + host - - // quay.io wants us to use cookies for authorisation, so we have - // to construct one (the default client has none). 
This means a - // bit more constructing things to be able to make a registry - // client literal, rather than calling .New() - jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List}) - if err != nil { - return nil, err +func (f *RemoteClientFactory) ClientFor(repo image.CanonicalName, creds Credentials) (Client, error) { + tx := f.Limiters.RoundTripper(http.DefaultTransport, repo.Domain) + if f.Trace { + tx = &logging{f.Logger, tx} } - auth := creds.credsFor(host) - - // A context we'll use to cancel requests on error - ctx, cancel := context.WithCancel(context.Background()) - // Add a timeout to the request - ctx, cancel = context.WithTimeout(ctx, requestTimeout) - // Use the wrapper to fix headers for quay.io, and remember bearer tokens - var transport http.RoundTripper - { - transport = &middleware.WWWAuthenticateFixer{Transport: http.DefaultTransport} - // Now the auth-handling wrappers that come with the library - transport = dockerregistry.WrapTransport(transport, httphost, auth.username, auth.password) - // Add timeout context - transport = &middleware.ContextRoundTripper{Transport: transport, Ctx: ctx} - // Rate limit - transport = middleware.RateLimitedRoundTripper(transport, f.rlConf, host) + f.mx.Lock() + if f.challengeManager == nil { + f.challengeManager = challenge.NewSimpleManager() } + f.mx.Unlock() + manager := f.challengeManager - herokuRegistry := herokuManifestAdaptor{ - &dockerregistry.Registry{ - URL: httphost, - Client: &http.Client{ - Transport: transport, - Jar: jar, - Timeout: requestTimeout, - }, - Logf: dockerregistry.Quiet, - }, + pingURL := url.URL{ + Scheme: "https", + Host: repo.Domain, + Path: "/v2/", } - client := &Remote{ - Registry: &herokuRegistry, - CancelFunc: cancel, + // Before we know how to authorise, need to establish which + // authorisation challenges the host will send. 
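+	// The challenge manager is shared across calls to ClientFor, so the
+	// ping below is only needed the first time we encounter a particular
+	// registry host; after that its cached challenges are reused.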
+ if cs, err := manager.GetChallenges(pingURL); err == nil { + if len(cs) == 0 { + req, err := http.NewRequest("GET", pingURL.String(), nil) + if err != nil { + return nil, err + } + res, err := (&http.Client{ + Transport: tx, + }).Do(req) + if err != nil { + return nil, err + } + if err = manager.AddResponse(res); err != nil { + return nil, err + } + } } + + handler := auth.NewTokenHandler(tx, &store{creds}, repo.Image, "pull") + tx = transport.NewTransport(tx, auth.NewAuthorizer(manager, handler)) + + client := &Remote{transport: tx, repo: repo} return NewInstrumentedClient(client), nil } -// --- -// A new ClientFactory implementation for a Cache -func NewCacheClientFactory(l log.Logger, cache cache.Reader, cacheExpiry time.Duration) ClientFactory { - return &cacheClientFactory{ - Logger: l, - cache: cache, - CacheExpiry: cacheExpiry, - } +// credentialStore adapts our Credentials type to be an +// auth.CredentialsStore +type store struct { + creds Credentials } -type cacheClientFactory struct { - Logger log.Logger - cache cache.Reader - CacheExpiry time.Duration +func (s *store) Basic(url *url.URL) (string, string) { + auth := s.creds.credsFor(url.Host) + return auth.username, auth.password } -func (f *cacheClientFactory) ClientFor(host string, creds Credentials) (Client, error) { - if f.cache == nil { - return nil, ErrNoMemcache - } - client := NewCache(creds, f.cache, f.CacheExpiry, f.Logger) - return client, nil +func (s *store) RefreshToken(*url.URL, string) string { + return "" +} + +func (s *store) SetRefreshToken(*url.URL, string, string) { + return } diff --git a/registry/credentials.go b/registry/credentials.go index 240bbe2ab..5aeb433ba 100644 --- a/registry/credentials.go +++ b/registry/credentials.go @@ -55,7 +55,7 @@ func ParseCredentials(b []byte) (Credentials, error) { // Some users were passing in credentials in the form of // http://docker.io and http://docker.io/v1/, etc. - // So strip everything down to it's base host. + // So strip everything down to the host. // Also, the registry might be local and on a different port. // So we need to check for that because url.Parse won't parse the ip:port format very well. u, err := url.Parse(host) diff --git a/registry/doc.go b/registry/doc.go new file mode 100644 index 000000000..65de307fb --- /dev/null +++ b/registry/doc.go @@ -0,0 +1,5 @@ +/* +This package has types for dealing with image registries (e.g., +quay.io, DockerHub, Google Container Registry, ..). +*/ +package registry diff --git a/registry/middleware/quay.go b/registry/middleware/quay.go deleted file mode 100644 index 79dd75eb9..000000000 --- a/registry/middleware/quay.go +++ /dev/null @@ -1,64 +0,0 @@ -package middleware - -import ( - "net/http" - "regexp" - "strings" -) - -// Workaround for quay.io, which fails to quote the scope value in its -// WWW-Authenticate header. Annoying. This also remembers a Bearer -// token, so once you have authenticated, it can keep using it rather -// than authenticating each time. 
- -type WWWAuthenticateFixer struct { - Transport http.RoundTripper - tokenHeader string - tokenDomain string -} - -func (t *WWWAuthenticateFixer) RoundTrip(req *http.Request) (*http.Response, error) { - t.maybeAddToken(req) - res, err := t.Transport.RoundTrip(req) - if err == nil { - newAuthHeaders := []string{} - for _, h := range res.Header[http.CanonicalHeaderKey("WWW-Authenticate")] { - if strings.HasPrefix(h, "Bearer ") { - h = replaceUnquoted(h) - } - newAuthHeaders = append(newAuthHeaders, h) - } - res.Header[http.CanonicalHeaderKey("WWW-Authenticate")] = newAuthHeaders - } - return res, err -} - -var scopeRE *regexp.Regexp = regexp.MustCompile(`,scope=([^"].*[^"])$`) - -// This is pretty specific. quay.io leaves the `scope` parameter -// unquoted, which trips up parsers (the one in the library we're -// using, for example). So replace an unquoted value with a quoted -// value, for that parameter. -func replaceUnquoted(h string) string { - return scopeRE.ReplaceAllString(h, `,scope="$1"`) -} - -// If we've got a token from a previous roundtrip, try using it -// again. BEWARE: this means this transport should only be used when -// asking (repeatedly) about a single repository, otherwise we may -// leak authorisation. -func (t *WWWAuthenticateFixer) maybeAddToken(req *http.Request) { - authHeaders := req.Header[http.CanonicalHeaderKey("Authorization")] - for _, h := range authHeaders { - if strings.EqualFold(h[:7], "bearer ") { - if t.tokenHeader == "" { - t.tokenHeader = h - t.tokenDomain = req.URL.Host - } - return - } - } - if req.URL.Host == t.tokenDomain && t.tokenHeader != "" { - req.Header.Set("Authorization", t.tokenHeader) - } -} diff --git a/registry/middleware/rate_limiter.go b/registry/middleware/rate_limiter.go index 67a35f4cc..87ef3213a 100644 --- a/registry/middleware/rate_limiter.go +++ b/registry/middleware/rate_limiter.go @@ -9,40 +9,44 @@ import ( "golang.org/x/time/rate" ) -var limiters = make(map[string]*rate.Limiter) -var limitersMutex sync.Mutex - -type RateLimiterConfig struct { - RPS int // Rate per second per host - Burst int // Burst count per host +type RateLimiters struct { + RPS, Burst int + perHost map[string]*rate.Limiter + mu sync.Mutex } -func RateLimitedRoundTripper(rt http.RoundTripper, config RateLimiterConfig, host string) http.RoundTripper { - limitersMutex.Lock() - if _, ok := limiters[host]; !ok { - rl := rate.NewLimiter(rate.Limit(config.RPS), config.Burst) - limiters[host] = rl +// Limit returns a RoundTripper for a particular host. We expect to do +// a number of requests to a particular host at a time. +func (limiters *RateLimiters) RoundTripper(rt http.RoundTripper, host string) http.RoundTripper { + limiters.mu.Lock() + defer limiters.mu.Unlock() + + if limiters.perHost == nil { + limiters.perHost = map[string]*rate.Limiter{} + } + if _, ok := limiters.perHost[host]; !ok { + rl := rate.NewLimiter(rate.Limit(limiters.RPS), limiters.Burst) + limiters.perHost[host] = rl } - limitersMutex.Unlock() return &RoundTripRateLimiter{ - RL: limiters[host], - Transport: rt, + rl: limiters.perHost[host], + tx: rt, } } type RoundTripRateLimiter struct { - RL *rate.Limiter - Transport http.RoundTripper + rl *rate.Limiter + tx http.RoundTripper } -func (rl *RoundTripRateLimiter) RoundTrip(r *http.Request) (*http.Response, error) { +func (t *RoundTripRateLimiter) RoundTrip(r *http.Request) (*http.Response, error) { // Wait errors out if the request cannot be processed within // the deadline. This is preemptive, instead of waiting the // entire duration. 
- if err := rl.RL.Wait(r.Context()); err != nil { + if err := t.rl.Wait(r.Context()); err != nil { return nil, errors.Wrap(err, "rate limited") } - return rl.Transport.RoundTrip(r) + return t.tx.RoundTrip(r) } type ContextRoundTripper struct { diff --git a/registry/middleware/rate_limiter_test.go b/registry/middleware/rate_limiter_test.go deleted file mode 100644 index 816b44c8e..000000000 --- a/registry/middleware/rate_limiter_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package middleware - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - "time" -) - -const requestTimeout = 10 * time.Second - -// We shouldn't share roundtrippers in the ratelimiter because the context will be stale -func TestRateLimiter_WithContext(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, "Hello, client") - })) - defer ts.Close() - - rateLimit := 100 - - // A context we'll use to cancel - ctx, cancel := context.WithCancel(context.Background()) - rt := &ContextRoundTripper{Transport: http.DefaultTransport, Ctx: ctx} - rl := RateLimitedRoundTripper(rt, RateLimiterConfig{ - RPS: rateLimit, - Burst: 1, - }, ts.URL) - - client := &http.Client{ - Transport: rl, - Timeout: requestTimeout, - } - _, err := client.Get(ts.URL) - if err != nil { - t.Fatal(err) - } - cancel() // Perform the cancel. If the RL is sharing contexts, then if we use it again it will fail. - - // Now do that again, it should use a new context - // A context we'll use to cancel requests on error - ctx, cancel = context.WithCancel(context.Background()) - rt = &ContextRoundTripper{Transport: http.DefaultTransport, Ctx: ctx} - rl = RateLimitedRoundTripper(rt, RateLimiterConfig{ - RPS: rateLimit, - Burst: 1, - }, ts.URL) - client = &http.Client{ - Transport: rl, - Timeout: requestTimeout, - } - _, err = client.Get(ts.URL) - if err != nil { - t.Fatal(err) // It will fail here if it is sharing contexts - } - cancel() -} diff --git a/registry/mock.go b/registry/mock.go deleted file mode 100644 index 8969a76ec..000000000 --- a/registry/mock.go +++ /dev/null @@ -1,94 +0,0 @@ -package registry - -import ( - "github.com/pkg/errors" - - "github.com/weaveworks/flux/image" -) - -type mockClientAdapter struct { - imgs []image.Info - err error -} - -type mockRemote struct { - img image.Info - tags []string - err error -} - -type ManifestFunc func(id image.Ref) (image.Info, error) -type TagsFunc func(id image.Name) ([]string, error) -type mockDockerClient struct { - manifest ManifestFunc - tags TagsFunc -} - -func NewMockClient(manifest ManifestFunc, tags TagsFunc) Client { - return &mockDockerClient{ - manifest: manifest, - tags: tags, - } -} - -func (m *mockDockerClient) Manifest(id image.Ref) (image.Info, error) { - return m.manifest(id) -} - -func (m *mockDockerClient) Tags(id image.Name) ([]string, error) { - return m.tags(id) -} - -func (*mockDockerClient) Cancel() { - return -} - -type mockRemoteFactory struct { - c Client - err error -} - -func NewMockClientFactory(c Client, err error) ClientFactory { - return &mockRemoteFactory{ - c: c, - err: err, - } -} - -func (m *mockRemoteFactory) ClientFor(repository string, creds Credentials) (Client, error) { - return m.c, m.err -} - -type mockRegistry struct { - imgs []image.Info - err error -} - -func NewMockRegistry(images []image.Info, err error) Registry { - return &mockRegistry{ - imgs: images, - err: err, - } -} - -func (m *mockRegistry) GetRepository(id image.Name) ([]image.Info, error) { - var 
imgs []image.Info - for _, i := range m.imgs { - // include only if it's the same repository in the same place - if i.ID.Image == id.Image { - imgs = append(imgs, i) - } - } - return imgs, m.err -} - -func (m *mockRegistry) GetImage(id image.Ref) (image.Info, error) { - if len(m.imgs) > 0 { - for _, i := range m.imgs { - if i.ID.String() == id.String() { - return i, nil - } - } - } - return image.Info{}, errors.New("not found") -} diff --git a/registry/mock/mock.go b/registry/mock/mock.go new file mode 100644 index 000000000..b1c202d63 --- /dev/null +++ b/registry/mock/mock.go @@ -0,0 +1,63 @@ +package mock + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/weaveworks/flux/image" + "github.com/weaveworks/flux/registry" +) + +type Client struct { + ManifestFn func(ref string) (image.Info, error) + TagsFn func() ([]string, error) +} + +func (m *Client) Manifest(ctx context.Context, tag string) (image.Info, error) { + return m.ManifestFn(tag) +} + +func (m *Client) Tags(context.Context) ([]string, error) { + return m.TagsFn() +} + +var _ registry.Client = &Client{} + +type ClientFactory struct { + Client registry.Client + Err error +} + +func (m *ClientFactory) ClientFor(repository image.CanonicalName, creds registry.Credentials) (registry.Client, error) { + return m.Client, m.Err +} + +var _ registry.ClientFactory = &ClientFactory{} + +type Registry struct { + Images []image.Info + Err error +} + +func (m *Registry) GetRepository(id image.Name) ([]image.Info, error) { + var imgs []image.Info + for _, i := range m.Images { + // include only if it's the same repository in the same place + if i.ID.Image == id.Image { + imgs = append(imgs, i) + } + } + return imgs, m.Err +} + +func (m *Registry) GetImage(id image.Ref) (image.Info, error) { + for _, i := range m.Images { + if i.ID.String() == id.String() { + return i, nil + } + } + return image.Info{}, errors.New("not found") +} + +var _ registry.Registry = &Registry{} diff --git a/registry/monitoring.go b/registry/monitoring.go index 84c054040..2d6be8662 100644 --- a/registry/monitoring.go +++ b/registry/monitoring.go @@ -3,6 +3,7 @@ package registry // Monitoring middlewares for registry interfaces import ( + "context" "strconv" "time" @@ -35,13 +36,11 @@ var ( }, []string{LabelRequestKind, fluxmetrics.LabelSuccess}) ) -type InstrumentedRegistry Registry - type instrumentedRegistry struct { next Registry } -func NewInstrumentedRegistry(next Registry) InstrumentedRegistry { +func NewInstrumentedRegistry(next Registry) Registry { return &instrumentedRegistry{ next: next, } @@ -65,8 +64,6 @@ func (m *instrumentedRegistry) GetImage(id image.Ref) (res image.Info, err error return } -type InstrumentedClient Client - type instrumentedClient struct { next Client } @@ -77,9 +74,9 @@ func NewInstrumentedClient(next Client) Client { } } -func (m *instrumentedClient) Manifest(id image.Ref) (res image.Info, err error) { +func (m *instrumentedClient) Manifest(ctx context.Context, ref string) (res image.Info, err error) { start := time.Now() - res, err = m.next.Manifest(id) + res, err = m.next.Manifest(ctx, ref) remoteDuration.With( LabelRequestKind, RequestKindMetadata, fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil), @@ -87,16 +84,12 @@ func (m *instrumentedClient) Manifest(id image.Ref) (res image.Info, err error) return } -func (m *instrumentedClient) Tags(id image.Name) (res []string, err error) { +func (m *instrumentedClient) Tags(ctx context.Context) (res []string, err error) { start := time.Now() - 
res, err = m.next.Tags(id) + res, err = m.next.Tags(ctx) remoteDuration.With( LabelRequestKind, RequestKindTags, fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil), ).Observe(time.Since(start).Seconds()) return } - -func (m *instrumentedClient) Cancel() { - m.next.Cancel() -} diff --git a/registry/registry.go b/registry/registry.go index d31c783bb..c957f04ed 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,157 +1,21 @@ -// Package registry provides domain abstractions over container registries. -// The aim is that the user only ever sees the registry information that -// has been cached. A separate process is responsible for ensuring the -// cache is up to date. The benefit of this is that we can rate limit -// the requests to prevent rate limiting on the remote side without -// affecting the UX. To the user, repository information will appear to -// be returned "quickly" -// -// This means that the cache is now a flux requirement. package registry import ( - "sort" "time" - "github.com/docker/distribution/manifest/schema1" - "github.com/go-kit/kit/log" - dockerregistry "github.com/heroku/docker-registry-client/registry" - "github.com/weaveworks/flux/image" - "github.com/weaveworks/flux/registry/cache" ) const ( requestTimeout = 10 * time.Second ) -// The Registry interface is a domain specific API to access container registries. +// Registry is a store of image metadata. type Registry interface { - GetRepository(id image.Name) ([]image.Info, error) - GetImage(id image.Ref) (image.Info, error) -} - -type registry struct { - factory ClientFactory - logger log.Logger - connections int + GetRepository(image.Name) ([]image.Info, error) + GetImage(image.Ref) (image.Info, error) } -// NewRegistry creates a new registry, to use when fetching repositories. -// Behind the scenes the registry will call ClientFactory.ClientFor(...) -// when requesting an image. This will generate a Client to access the -// backend. -func NewRegistry(c ClientFactory, l log.Logger, connections int) Registry { - return ®istry{ - factory: c, - logger: l, - connections: connections, - } -} - -// GetRepository yields a repository matching the given name, if any exists. -func (reg *registry) GetRepository(id image.Name) ([]image.Info, error) { - client, err := reg.factory.ClientFor(id.Registry(), Credentials{}) - if err != nil { - return nil, err - } - - tags, err := client.Tags(id) - if err != nil { - client.Cancel() - return nil, err - } - - // the hostlessImageName is canonicalised, in the sense that it - // includes "library" as the org, if unqualified -- e.g., - // `library/nats`. We need that to fetch the tags etc. However, we - // want the results to use the *actual* name of the images to be - // as supplied, e.g., `nats`. 
- return reg.tagsToRepository(client, id, tags) -} - -// Get a single Image from the registry if it exists -func (reg *registry) GetImage(id image.Ref) (image.Info, error) { - client, err := reg.factory.ClientFor(id.Registry(), Credentials{}) - if err != nil { - return image.Info{}, err - } - img, err := client.Manifest(id) - if err != nil { - client.Cancel() - return image.Info{}, err - } - return img, nil -} - -func (reg *registry) tagsToRepository(client Client, id image.Name, tags []string) ([]image.Info, error) { - // one way or another, we'll be finishing all requests - defer client.Cancel() - - type result struct { - image image.Info - err error - } - - toFetch := make(chan string, len(tags)) - fetched := make(chan result, len(tags)) - - for i := 0; i < reg.connections; i++ { - go func() { - for tag := range toFetch { - image, err := client.Manifest(id.ToRef(tag)) - if err != nil { - if err != cache.ErrNotCached { - reg.logger.Log("registry-metadata-err", err) - } - } - fetched <- result{image, err} - } - }() - } - for _, tag := range tags { - toFetch <- tag - } - close(toFetch) - - images := make([]image.Info, cap(fetched)) - for i := 0; i < cap(fetched); i++ { - res := <-fetched - if res.err != nil { - return nil, res.err - } - images[i] = res.image - } - - sort.Sort(image.ByCreatedDesc(images)) - return images, nil -} - -// --- - -// This is an interface that represents the heroku docker registry library -type HerokuRegistryLibrary interface { - Tags(repository string) (tags []string, err error) - Manifest(repository, reference string) ([]schema1.History, error) -} - -// --- - -// Convert between types. dockerregistry returns the *same* type but from a -// vendored library. Because golang doesn't like to apply interfaces to a -// vendored type, we have to provide an adaptor to isolate it. 
-type herokuManifestAdaptor struct { - *dockerregistry.Registry -} - -func (h herokuManifestAdaptor) Manifest(repository, reference string) ([]schema1.History, error) { - manifest, err := h.Registry.Manifest(repository, reference) - if err != nil || manifest == nil { - return nil, err - } - var result []schema1.History - for _, item := range manifest.History { - result = append(result, schema1.History{item.V1Compatibility}) - } - return result, err -} +// ImageCreds is a record of which images need which credentials, +// which is supplied to us (probably by interrogating the cluster) +type ImageCreds map[image.Name]Credentials diff --git a/registry/registry_test.go b/registry/registry_test.go deleted file mode 100644 index 7573cbd88..000000000 --- a/registry/registry_test.go +++ /dev/null @@ -1,172 +0,0 @@ -package registry - -import ( - "errors" - "testing" - "time" - - "github.com/docker/distribution/manifest/schema1" - "github.com/go-kit/kit/log" - - fluxerr "github.com/weaveworks/flux/errors" - "github.com/weaveworks/flux/image" - "github.com/weaveworks/flux/registry/cache" - "github.com/weaveworks/flux/registry/middleware" -) - -const testTagStr = "latest" -const testImageStr = "alpine:" + testTagStr -const constTime = "2017-01-13T16:22:58.009923189Z" - -var ( - id, _ = image.ParseRef(testImageStr) - man = schema1.SignedManifest{ - Manifest: schema1.Manifest{ - History: []schema1.History{ - { - V1Compatibility: `{"created":"` + constTime + `"}`, - }, - }, - }, - } -) - -var ( - testTags = []string{testTagStr, "anotherTag"} - mClient = NewMockClient( - func(repository image.Ref) (image.Info, error) { - img, _ := image.ParseInfo(testImageStr, time.Time{}) - return img, nil - }, - func(repository image.Name) ([]string, error) { - return testTags, nil - }, - ) -) - -func TestRegistry_GetRepository(t *testing.T) { - fact := NewMockClientFactory(mClient, nil) - reg := NewRegistry(fact, log.NewNopLogger(), 512) - imgs, err := reg.GetRepository(id.Name) - if err != nil { - t.Fatal(err) - } - // Dev note, the tags will look the same because we are returning the same - // Image from the mock remote. But they are distinct images. - if len(testTags) != len(imgs) { - t.Fatal("Expecting %v images, but got %v", len(testTags), len(imgs)) - } -} - -func TestRegistry_GetRepositoryFactoryError(t *testing.T) { - errFact := NewMockClientFactory(mClient, errors.New("")) - reg := NewRegistry(errFact, nil, 512) - _, err := reg.GetRepository(id.Name) - if err == nil { - t.Fatal("Expecting error") - } -} - -func TestRegistry_GetRepositoryManifestError(t *testing.T) { - errClient := NewMockClient( - func(repository image.Ref) (image.Info, error) { - return image.Info{}, errors.New("") - }, - func(repository image.Name) ([]string, error) { - return testTags, nil - }, - ) - errFact := NewMockClientFactory(errClient, nil) - reg := NewRegistry(errFact, log.NewNopLogger(), 512) - _, err := reg.GetRepository(id.Name) - if err == nil { - t.Fatal("Expecting error") - } -} - -// Note: This actually goes off to docker hub to find the Image. 
-// It will fail if there is not internet connection -func TestRemoteFactory_RawClient(t *testing.T) { - // No credentials required for public Image - fact := NewRemoteClientFactory(log.NewNopLogger(), middleware.RateLimiterConfig{ - RPS: 200, - Burst: 1, - }) - - // Refresh tags first - var tags []string - client, err := fact.ClientFor(id.Registry(), Credentials{}) - if err != nil { - t.Fatal(err) - } - - tags, err = client.Tags(id.Name) - if err != nil { - t.Fatal(err) - } - client.Cancel() - if len(tags) == 0 { - t.Fatal("Should have some tags") - } - - client, err = fact.ClientFor(id.Registry(), Credentials{}) - if err != nil { - t.Fatal(err) - } - id.Tag = tags[0] - newImg, err := client.Manifest(id) - if err != nil { - t.Fatal(err) - } - if newImg.ID.String() == "" { - t.Fatal("Should image ") - } - if newImg.CreatedAt.IsZero() { - t.Fatal("CreatedAt time was 0") - } - client.Cancel() -} - -func TestRemoteFactory_InvalidHost(t *testing.T) { - fact := NewRemoteClientFactory(log.NewNopLogger(), middleware.RateLimiterConfig{}) - invalidId, err := image.ParseRef("invalid.host/library/alpine:latest") - if err != nil { - t.Fatal(err) - } - client, err := fact.ClientFor(invalidId.Registry(), Credentials{}) - if err != nil { - return - } - _, err = client.Manifest(invalidId) - if err == nil { - t.Fatal("Expected error due to invalid host but got none.") - } -} - -func TestRemote_BetterError(t *testing.T) { - errClient := NewMockClient( - func(repository image.Ref) (image.Info, error) { - return image.Info{}, cache.ErrNotCached - }, - func(repository image.Name) ([]string, error) { - return []string{}, cache.ErrNotCached - }, - ) - - fact := NewMockClientFactory(errClient, nil) - reg := NewRegistry(fact, log.NewNopLogger(), 512) - _, err := reg.GetRepository(id.Name) - if err == nil { - t.Fatal("Should have errored") - } - if !fluxerr.IsMissing(err) { - t.Fatalf("Should not be bespoke error, got %q", err.Error()) - } - _, err = reg.GetImage(id) - if err == nil { - t.Fatal("Should have errored") - } - if !fluxerr.IsMissing(err) { - t.Fatalf("Should not be bespoke error, got %q", err.Error()) - } -} diff --git a/registry/warming.go b/registry/warming.go deleted file mode 100644 index 48e884eb4..000000000 --- a/registry/warming.go +++ /dev/null @@ -1,277 +0,0 @@ -// Runs a daemon to continuously warm the registry cache. -package registry - -import ( - "context" - "encoding/json" - "net" - "strings" - "sync" - "time" - - "github.com/go-kit/kit/log" - "github.com/pkg/errors" - "github.com/weaveworks/flux/image" - "github.com/weaveworks/flux/registry/cache" -) - -const refreshWhenExpiryWithin = time.Minute -const askForNewImagesInterval = time.Minute - -type Warmer struct { - Logger log.Logger - ClientFactory ClientFactory - Creds Credentials // FIXME: never supplied! - Expiry time.Duration - Writer cache.Writer - Reader cache.Reader - Burst int - Priority chan image.Name - Notify func() -} - -// This is what we get from the callback handed to us -type ImageCreds map[image.Name]Credentials - -// .. and this is what we keep in the backlog -type backlogItem struct { - image.Name - Credentials -} - -// Continuously get the images to populate the cache with, and -// populate the cache with them. 
-func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) { - defer wg.Done() - - if w.Logger == nil || w.ClientFactory == nil || w.Expiry == 0 || w.Writer == nil || w.Reader == nil { - panic("registry.Warmer fields are nil") - } - - refresh := time.Tick(askForNewImagesInterval) - imageCreds := imagesToFetchFunc() - backlog := imageCredsToBacklog(imageCreds) - - // This loop acts keeps a kind of priority queue, whereby image - // names coming in on the `Priority` channel are looked up first. - // If there are none, images used in the cluster are refreshed; - // but no more often than once every `askForNewImagesInterval`, - // since there is no effective back-pressure on cache refreshes - // and it would spin freely otherwise). - for { - select { - case <-stop: - w.Logger.Log("stopping", "true") - return - case name := <-w.Priority: - w.Logger.Log("priority", name.String()) - // NB the implicit contract here is that the prioritised - // image has to have been running the last time we - // requested the credentials. - if creds, ok := imageCreds[name]; ok { - w.warm(name, creds) - } else { - w.Logger.Log("priority", name.String(), "err", "no creds available") - } - continue - default: - } - - if len(backlog) > 0 { - im := backlog[0] - backlog = backlog[1:] - w.warm(im.Name, im.Credentials) - } else { - select { - case <-refresh: - imageCreds = imagesToFetchFunc() - backlog = imageCredsToBacklog(imageCreds) - default: - } - } - } -} - -func imageCredsToBacklog(imageCreds ImageCreds) []backlogItem { - backlog := make([]backlogItem, len(imageCreds)) - var i int - for name, cred := range imageCreds { - backlog[i] = backlogItem{name, cred} - i++ - } - return backlog -} - -func (w *Warmer) warm(id image.Name, creds Credentials) { - client, err := w.ClientFactory.ClientFor(id.Registry(), creds) - if err != nil { - w.Logger.Log("err", err.Error()) - return - } - defer client.Cancel() - - // FIXME This can only return an empty string, because w.Creds is - // always empty. In other words, keys never include a username - // (need they?) - username := w.Creds.credsFor(id.Registry()).username - - key, err := cache.NewTagKey(username, id.CanonicalName()) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "creating key for cache")) - return - } - - var cacheTags []string - cacheTagsVal, err := w.Reader.GetKey(key) - if err == nil { - err = json.Unmarshal(cacheTagsVal, &cacheTags) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "deserializing cached tags")) - return - } - } // else assume we have no cached tags - - tags, err := client.Tags(id) - if err != nil { - if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") { - w.Logger.Log("err", errors.Wrap(err, "requesting tags")) - } - return - } - - val, err := json.Marshal(tags) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "serializing tags to store in cache")) - return - } - - err = w.Writer.SetKey(key, val) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "storing tags in cache")) - return - } - - // Create a list of manifests that need updating - var toUpdate []image.Ref - var expired bool - for _, tag := range tags { - // See if we have the manifest already cached - // We don't want to re-download a manifest again. 
- newID := id.ToRef(tag) - key, err := cache.NewManifestKey(username, newID.CanonicalRef()) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "creating key for memcache")) - continue - } - expiry, err := w.Reader.GetExpiration(key) - // If err, then we don't have it yet. Update. - if err == nil { // If no error, we've already got it - // If we're outside of the expiry buffer, skip, no need to update. - if !withinExpiryBuffer(expiry, refreshWhenExpiryWithin) { - continue - } - // If we're within the expiry buffer, we need to update quick! - expired = true - } - toUpdate = append(toUpdate, newID) - } - - if len(toUpdate) == 0 { - return - } - w.Logger.Log("fetching", id.String(), "to-update", len(toUpdate)) - - if expired { - w.Logger.Log("expiring", id.String()) - } - - // The upper bound for concurrent fetches against a single host is - // w.Burst, so limit the number of fetching goroutines to that. - fetchers := make(chan struct{}, w.Burst) - awaitFetchers := &sync.WaitGroup{} - for _, imID := range toUpdate { - awaitFetchers.Add(1) - fetchers <- struct{}{} - go func(imageID image.Ref) { - defer func() { awaitFetchers.Done(); <-fetchers }() - // Get the image from the remote - img, err := client.Manifest(imageID) - if err != nil { - if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() { - // This was due to a context timeout, don't bother logging - return - } - w.Logger.Log("err", errors.Wrap(err, "requesting manifests")) - return - } - - key, err := cache.NewManifestKey(username, img.ID.CanonicalRef()) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "creating key for memcache")) - return - } - // Write back to memcache - val, err := json.Marshal(img) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "serializing tag to store in cache")) - return - } - err = w.Writer.SetKey(key, val) - if err != nil { - w.Logger.Log("err", errors.Wrap(err, "storing manifests in cache")) - return - } - }(imID) - } - awaitFetchers.Wait() - w.Logger.Log("updated", id.String()) - - if w.Notify != nil { - // If there's more tags than there used to be, there must be - // at least one new tag. - if len(cacheTags) < len(tags) { - w.Notify() - return - } - // Otherwise, check whether there are any entries in the - // fetched tags that aren't in the cached tags. - tagSet := NewStringSet(tags) - cacheTagSet := NewStringSet(cacheTags) - if !tagSet.Subset(cacheTagSet) { - w.Notify() - } - } -} - -// StringSet is a set of strings. -type StringSet map[string]struct{} - -// NewStringSet returns a StringSet containing exactly the strings -// given as arguments. -func NewStringSet(ss []string) StringSet { - res := StringSet{} - for _, s := range ss { - res[s] = struct{}{} - } - return res -} - -// Subset returns true if `s` is a subset of `t` (including the case -// of having the same members). 
-func (s StringSet) Subset(t StringSet) bool { - for k := range s { - if _, ok := t[k]; !ok { - return false - } - } - return true -} - -func withinExpiryBuffer(expiry time.Time, buffer time.Duration) bool { - // if the `time.Now() + buffer > expiry`, - // then we're within the expiry buffer - if time.Now().Add(buffer).After(expiry) { - return true - } - return false -} diff --git a/registry/warming_test.go b/registry/warming_test.go deleted file mode 100644 index 544907098..000000000 --- a/registry/warming_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package registry - -import ( - "context" - "testing" - "time" - - "github.com/pkg/errors" -) - -func TestWarming_ExpiryBuffer(t *testing.T) { - testTime := time.Now() - for _, x := range []struct { - expiresIn, buffer time.Duration - expectedResult bool - }{ - {time.Minute, time.Second, false}, - {time.Second, time.Minute, true}, - } { - if withinExpiryBuffer(testTime.Add(x.expiresIn), x.buffer) != x.expectedResult { - t.Fatalf("Should return %t", x.expectedResult) - } - } -} - -func TestName(t *testing.T) { - err := errors.Wrap(context.DeadlineExceeded, "getting remote manifest") - t.Log(err.Error()) - err = errors.Cause(err) - if err == context.DeadlineExceeded { - t.Log("OK") - } else { - t.Log("Not OK") - } -} diff --git a/release/releaser_test.go b/release/releaser_test.go index 1eeb1bda6..b646aff64 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -13,7 +13,7 @@ import ( "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/image" - "github.com/weaveworks/flux/registry" + registryMock "github.com/weaveworks/flux/registry/mock" "github.com/weaveworks/flux/update" ) @@ -80,16 +80,18 @@ var ( } newRef, _ = image.ParseRef("quay.io/weaveworks/helloworld:master-a000002") timeNow = time.Now() - mockRegistry = registry.NewMockRegistry([]image.Info{ - image.Info{ - ID: newRef, - CreatedAt: timeNow, - }, - image.Info{ - ID: newLockedID, - CreatedAt: timeNow, + mockRegistry = ®istryMock.Registry{ + Images: []image.Info{ + { + ID: newRef, + CreatedAt: timeNow, + }, + { + ID: newLockedID, + CreatedAt: timeNow, + }, }, - }, nil) + } mockManifests = &kubernetes.Manifests{} ) @@ -316,16 +318,18 @@ func Test_ImageStatus(t *testing.T) { }, } - upToDateRegistry := registry.NewMockRegistry([]image.Info{ - image.Info{ - ID: oldRef, - CreatedAt: timeNow, - }, - image.Info{ - ID: sidecarRef, - CreatedAt: timeNow, + upToDateRegistry := ®istryMock.Registry{ + Images: []image.Info{ + { + ID: oldRef, + CreatedAt: timeNow, + }, + { + ID: sidecarRef, + CreatedAt: timeNow, + }, }, - }, nil) + } testSvcSpec, _ := update.ParseResourceSpec(testSvc.ID.String()) for _, tst := range []struct {