diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 9a79d18552..df543d2b95 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -86,7 +86,7 @@ func main() { gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits") // registry - memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") + memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.") memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.") memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.") registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.") @@ -235,19 +235,17 @@ func main() { { // Cache client, for use by registry and cache warmer var cacheClient cache.Client - if *memcachedHostname != "" { - memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ - Host: *memcachedHostname, - Service: *memcachedService, - Expiry: *registryCacheExpiry, - Timeout: *memcachedTimeout, - UpdateInterval: 1 * time.Minute, - Logger: log.With(logger, "component", "memcached"), - MaxIdleConns: *registryBurst, - }) - defer memcacheClient.Stop() - cacheClient = cache.InstrumentClient(memcacheClient) - } + memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ + Host: *memcachedHostname, + Service: *memcachedService, + Expiry: *registryCacheExpiry, + Timeout: *memcachedTimeout, + UpdateInterval: 1 * time.Minute, + Logger: log.With(logger, "component", "memcached"), + MaxIdleConns: *registryBurst, + }) + defer memcacheClient.Stop() + cacheClient = cache.InstrumentClient(memcacheClient) cacheRegistry = &cache.Cache{ Reader: cacheClient, @@ -266,12 +264,11 @@ func main() { } // Warmer - warmerLogger := log.With(logger, "component", "warmer") - cacheWarmer = &cache.Warmer{ - Logger: warmerLogger, - ClientFactory: remoteFactory, - Cache: cacheClient, - Burst: *registryBurst, + var err error + cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst) + if err != nil { + logger.Log("err", err) + os.Exit(1) } } @@ -446,7 +443,7 @@ func main() { cacheWarmer.Notify = daemon.AskForImagePoll cacheWarmer.Priority = daemon.ImageRefresh shutdownWg.Add(1) - go cacheWarmer.Loop(shutdown, shutdownWg, image_creds) + go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds) // Update daemonRef so that upstream and handlers point to fully working daemon daemonRef.UpdatePlatform(daemon) diff --git a/deploy/flux-deployment.yaml b/deploy/flux-deployment.yaml index de1625f803..b79aad8885 100644 --- a/deploy/flux-deployment.yaml +++ b/deploy/flux-deployment.yaml @@ -32,13 +32,13 @@ spec: - name: git-key mountPath: /etc/fluxd/ssh args: - # if you deployed memcached, you can supply these arguments to - # tell fluxd to use it. You may need to change the namespace - # (`default`) if you run fluxd in another namespace. - - --memcached-hostname=memcached.default.svc.cluster.local - - --memcached-timeout=100ms - - --memcached-service=memcached - - --registry-cache-expiry=20m + + # if you deployed memcached in a different namespace to flux, + # or with a different service name, you can supply these + # following two arguments to tell fluxd how to connect to it. + # - --memcached-hostname=memcached.default.svc.cluster.local + # - --memcached-service=memcached + # replace (at least) the following URL - --git-url=git@github.com:weaveworks/flux-example - --git-branch=master @@ -46,6 +46,3 @@ spec: # (e.g., Weave Cloud). The token is particular to the service. # - --connect=wss://cloud.weave.works/api/flux # - --token=abc123abc123abc123abc123 - # override -b and -t arguments to ssh-keygen - # - --ssh-keygen-bits=2048 - - --ssh-keygen-type=ed25519 diff --git a/registry/cache/memcached/integration_test.go b/registry/cache/memcached/integration_test.go index ea209d2d85..92b9b4ffc4 100644 --- a/registry/cache/memcached/integration_test.go +++ b/registry/cache/memcached/integration_test.go @@ -42,13 +42,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) { r := &cache.Cache{mc} - w := &cache.Warmer{ - Logger: log.With(logger, "component", "warmer"), - ClientFactory: remote, - Cache: mc, - Burst: 125, - } - + w, _ := cache.NewWarmer(remote, mc, 125) shutdown := make(chan struct{}) shutdownWg := &sync.WaitGroup{} defer func() { @@ -57,7 +51,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) { }() shutdownWg.Add(1) - go w.Loop(shutdown, shutdownWg, func() registry.ImageCreds { + go w.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, func() registry.ImageCreds { return registry.ImageCreds{ id.Name: registry.NoCredentials(), } diff --git a/registry/cache/warming.go b/registry/cache/warming.go index f7923bb7ca..6c15bd39b3 100644 --- a/registry/cache/warming.go +++ b/registry/cache/warming.go @@ -20,14 +20,26 @@ const askForNewImagesInterval = time.Minute // Warmer refreshes the information kept in the cache from remote // registries. type Warmer struct { - Logger log.Logger - ClientFactory registry.ClientFactory - Cache Client - Burst int + clientFactory registry.ClientFactory + cache Client + burst int Priority chan image.Name Notify func() } +// NewWarmer creates cache warmer that (when Loop is invoked) will +// periodically refresh the values kept in the cache. +func NewWarmer(cf registry.ClientFactory, cacheClient Client, burst int) (*Warmer, error) { + if cf == nil || cacheClient == nil || burst <= 0 { + return nil, errors.New("arguments must be non-nil (or > 0 in the case of burst)") + } + return &Warmer{ + clientFactory: cf, + cache: cacheClient, + burst: burst, + }, nil +} + // .. and this is what we keep in the backlog type backlogItem struct { image.Name @@ -36,13 +48,9 @@ type backlogItem struct { // Continuously get the images to populate the cache with, and // populate the cache with them. -func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) { +func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) { defer wg.Done() - if w.Logger == nil || w.ClientFactory == nil || w.Cache == nil { - panic("registry.Warmer fields are nil") - } - refresh := time.Tick(askForNewImagesInterval) imageCreds := imagesToFetchFunc() backlog := imageCredsToBacklog(imageCreds) @@ -61,17 +69,17 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun for { select { case <-stop: - w.Logger.Log("stopping", "true") + logger.Log("stopping", "true") return case name := <-w.Priority: - w.Logger.Log("priority", name.String()) + logger.Log("priority", name.String()) // NB the implicit contract here is that the prioritised // image has to have been running the last time we // requested the credentials. if creds, ok := imageCreds[name]; ok { - w.warm(ctx, name, creds) + w.warm(ctx, logger, name, creds) } else { - w.Logger.Log("priority", name.String(), "err", "no creds available") + logger.Log("priority", name.String(), "err", "no creds available") } continue default: @@ -80,7 +88,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun if len(backlog) > 0 { im := backlog[0] backlog = backlog[1:] - w.warm(ctx, im.Name, im.Credentials) + w.warm(ctx, logger, im.Name, im.Credentials) } else { select { case <-refresh: @@ -102,17 +110,17 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem { return backlog } -func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credentials) { - client, err := w.ClientFactory.ClientFor(id.CanonicalName(), creds) +func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, creds registry.Credentials) { + client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds) if err != nil { - w.Logger.Log("err", err.Error()) + logger.Log("err", err.Error()) return } // This is what we're going to write back to the cache var repo ImageRepository repoKey := NewRepositoryKey(id.CanonicalName()) - bytes, _, err := w.Cache.GetKey(repoKey) + bytes, _, err := w.cache.GetKey(repoKey) if err == nil { err = json.Unmarshal(bytes, &repo) } else if err == ErrNotCached { @@ -120,7 +128,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent } if err != nil { - w.Logger.Log("err", errors.Wrap(err, "fetching previous result from cache")) + logger.Log("err", errors.Wrap(err, "fetching previous result from cache")) return } @@ -133,17 +141,17 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent defer func() { bytes, err := json.Marshal(repo) if err == nil { - err = w.Cache.SetKey(repoKey, bytes) + err = w.cache.SetKey(repoKey, bytes) } if err != nil { - w.Logger.Log("err", errors.Wrap(err, "writing result to cache")) + logger.Log("err", errors.Wrap(err, "writing result to cache")) } }() tags, err := client.Tags(ctx) if err != nil { if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") { - w.Logger.Log("err", errors.Wrap(err, "requesting tags")) + logger.Log("err", errors.Wrap(err, "requesting tags")) repo.LastError = err.Error() } return @@ -158,7 +166,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent // See if we have the manifest already cached newID := id.ToRef(tag) key := NewManifestKey(newID.CanonicalRef()) - bytes, expiry, err := w.Cache.GetKey(key) + bytes, expiry, err := w.cache.GetKey(key) // If err, then we don't have it yet. Update. switch { case err != nil: @@ -179,12 +187,12 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent var successCount int if len(toUpdate) > 0 { - w.Logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing) + logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing) var successMx sync.Mutex // The upper bound for concurrent fetches against a single host is // w.Burst, so limit the number of fetching goroutines to that. - fetchers := make(chan struct{}, w.Burst) + fetchers := make(chan struct{}, w.burst) awaitFetchers := &sync.WaitGroup{} updates: for _, imID := range toUpdate { @@ -204,7 +212,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent // This was due to a context timeout, don't bother logging return } - w.Logger.Log("err", errors.Wrap(err, "requesting manifests")) + logger.Log("err", errors.Wrap(err, "requesting manifests")) return } @@ -212,12 +220,12 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent // Write back to memcached val, err := json.Marshal(img) if err != nil { - w.Logger.Log("err", errors.Wrap(err, "serializing tag to store in cache")) + logger.Log("err", errors.Wrap(err, "serializing tag to store in cache")) return } - err = w.Cache.SetKey(key, val) + err = w.cache.SetKey(key, val) if err != nil { - w.Logger.Log("err", errors.Wrap(err, "storing manifests in cache")) + logger.Log("err", errors.Wrap(err, "storing manifests in cache")) return } successMx.Lock() @@ -227,7 +235,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent }(imID) } awaitFetchers.Wait() - w.Logger.Log("updated", id.String(), "count", successCount) + logger.Log("updated", id.String(), "count", successCount) } // We managed to fetch new metadata for everything we were missing diff --git a/registry/cache/warming_test.go b/registry/cache/warming_test.go index 829e5896ef..29159a55d7 100644 --- a/registry/cache/warming_test.go +++ b/registry/cache/warming_test.go @@ -49,6 +49,8 @@ func TestWarm(t *testing.T) { ref, _ := image.ParseRef("example.com/path/image:tag") repo := ref.Name + logger := log.NewNopLogger() + client := &mock.Client{ TagsFn: func() ([]string, error) { return []string{"tag"}, nil @@ -65,8 +67,8 @@ func TestWarm(t *testing.T) { } factory := &mock.ClientFactory{Client: client} c := &mem{} - warmer := &Warmer{Logger: log.NewNopLogger(), ClientFactory: factory, Cache: c, Burst: 10} - warmer.warm(context.TODO(), repo, registry.NoCredentials()) + warmer := &Warmer{clientFactory: factory, cache: c, burst: 10} + warmer.warm(context.TODO(), logger, repo, registry.NoCredentials()) registry := &Cache{Reader: c} repoInfo, err := registry.GetRepository(ref.Name)