Image metadata lookup and automation for push hooks #835

Merged · 4 commits · Nov 15, 2017 · Changes from 3 commits
1 change: 1 addition & 0 deletions cmd/fluxd/main.go
@@ -419,6 +419,7 @@ func main() {
shutdownWg.Add(1)
go daemon.GitPollLoop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))

+ cacheWarmer.Notify = daemon.AskForImagePoll
shutdownWg.Add(1)
go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)

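Aside: the single added line above is the heart of the change — the warmer is handed a callback into the daemon, so a completed cache warm can trigger an image poll (and hence any automated releases) right away instead of waiting for the next timer tick. A minimal, self-contained sketch of the pattern, with a stand-in type (warmer here is hypothetical, not the real registry.Warmer):

package main

import "fmt"

// warmer calls an injected func() when its cache gains new metadata; it
// knows nothing about the daemon package, which keeps the dependency one-way.
type warmer struct {
	Notify func() // optional: nil means nobody is listening
}

func (w *warmer) warm() {
	// ...pretend we refreshed the tag cache and found a new tag...
	if w.Notify != nil {
		w.Notify()
	}
}

func main() {
	polled := false
	w := &warmer{Notify: func() { polled = true }}
	w.warm()
	fmt.Println("image poll requested:", polled) // true
}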
8 changes: 4 additions & 4 deletions daemon/daemon.go
@@ -264,11 +264,11 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
- d.askForSync()
+ d.AskForSync()
return nil, err
}
if anythingAutomated {
- d.askForImagePoll()
+ d.AskForImagePoll()
}

var err error
@@ -303,7 +303,7 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
- d.askForSync()
+ d.AskForSync()
return nil, err
}
revision, err = working.HeadRevision(ctx)
@@ -324,7 +324,7 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
// may be comms difficulties or other sources of problems; here, we
// always succeed because it's just bookkeeping.
func (d *Daemon) SyncNotify(ctx context.Context) error {
- d.askForSync()
+ d.AskForSync()
return nil
}

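The renames in this file (askForSync → AskForSync, askForImagePoll → AskForImagePoll) are what let the wiring in main.go hand a method value across the package boundary. A short sketch of that Go mechanism, using a hypothetical stub type:

package main

import "fmt"

type daemonStub struct{ polls int }

func (d *daemonStub) AskForImagePoll() { d.polls++ }

func main() {
	d := &daemonStub{}
	// A method value binds the receiver, so it can be stored in a plain
	// func() field, e.g. cacheWarmer.Notify = daemon.AskForImagePoll.
	notify := d.AskForImagePoll
	notify()
	notify()
	fmt.Println(d.polls) // 2
}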
12 changes: 6 additions & 6 deletions daemon/loop.go
@@ -66,8 +66,8 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) {
imagePollTimer := time.NewTimer(d.RegistryPollInterval)

// Ask for a sync, and to poll images, straight away
- d.askForSync()
- d.askForImagePoll()
+ d.AskForSync()
+ d.AskForImagePoll()
for {
select {
case <-stop:
@@ -78,13 +78,13 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) {
imagePollTimer.Stop()
imagePollTimer = time.NewTimer(d.RegistryPollInterval)
case <-imagePollTimer.C:
- d.askForImagePoll()
+ d.AskForImagePoll()
case <-d.syncSoon:
pullThen(d.doSync)
case <-gitPollTimer.C:
// Time to poll for new commits (unless we're already
// about to do that)
- d.askForSync()
+ d.AskForSync()
case job := <-d.Jobs.Ready():
queueLength.Set(float64(d.Jobs.Len()))
jobLogger := log.With(logger, "jobID", job.ID)
@@ -108,7 +108,7 @@ }
}

// Ask for a sync, or if there's one waiting, let that happen.
- func (d *LoopVars) askForSync() {
+ func (d *LoopVars) AskForSync() {
d.ensureInit()
select {
case d.syncSoon <- struct{}{}:
@@ -117,7 +117,7 @@ }
}

// Ask for an image poll, or if there's one waiting, let that happen.
- func (d *LoopVars) askForImagePoll() {
+ func (d *LoopVars) AskForImagePoll() {
d.ensureInit()
select {
case d.pollImagesSoon <- struct{}{}:
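Both ask functions use the same trick: a non-blocking send into what is presumably a one-slot channel (created in ensureInit, not shown in this diff), so any number of overlapping requests collapse into at most one pending sync or poll. A self-contained sketch of that coalescing behaviour, under the buffered-channel assumption:

package main

import "fmt"

func askForSync(syncSoon chan struct{}) {
	select {
	case syncSoon <- struct{}{}: // queue a sync if none is pending
	default: // one is already queued; this request coalesces with it
	}
}

func main() {
	syncSoon := make(chan struct{}, 1) // assumed capacity, as in ensureInit
	for i := 0; i < 5; i++ {
		askForSync(syncSoon) // five requests...
	}
	syncs := 0
	for len(syncSoon) > 0 {
		<-syncSoon
		syncs++
	}
	fmt.Println("syncs performed:", syncs) // ...one sync
}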
119 changes: 103 additions & 16 deletions registry/warming.go
@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"net"
"sort"
"strings"
"sync"
"time"
@@ -21,15 +22,24 @@ const askForNewImagesInterval = time.Minute
type Warmer struct {
Logger log.Logger
ClientFactory ClientFactory
- Creds Credentials
+ Creds Credentials // FIXME: never supplied!
Expiry time.Duration
Writer cache.Writer
Reader cache.Reader
Burst int
+ Priority chan image.Name
+ Notify func()
}

// This is what we get from the callback handed to us
type ImageCreds map[image.Name]Credentials

+ // ...and this is what we keep in the backlog
+ type backlogItem struct {
+ image.Name
+ Credentials
+ }
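backlogItem embeds image.Name and Credentials rather than naming them, so both are accessible as promoted fields — that is what lets the loop below write im.Name and im.Credentials. A tiny illustration with stand-in types (Name and Credentials here are not the real flux types):

package main

import "fmt"

type Name struct{ Repo string }
type Credentials struct{ User string }

// Embedded fields are addressable via their type names.
type backlogItem struct {
	Name
	Credentials
}

func main() {
	im := backlogItem{Name{"library/alpine"}, Credentials{"anonymous"}}
	fmt.Println(im.Name.Repo, im.Credentials.User)
}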

// Continuously get the images to populate the cache with, and
// populate the cache with them.
func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) {
@@ -39,24 +49,60 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) {
panic("registry.Warmer fields are nil")
}

- for k, v := range imagesToFetchFunc() {
- w.warm(k, v)
- }
+ refresh := time.Tick(askForNewImagesInterval)
+ imageCreds := imagesToFetchFunc()
+ backlog := imageCredsToBacklog(imageCreds)

- newImages := time.Tick(askForNewImagesInterval)
+ // This loop acts as a kind of priority queue, whereby image
+ // names coming in on the `Priority` channel are looked up first.
+ // If there are none, images used in the cluster are refreshed,
+ // but no more often than once every `askForNewImagesInterval`
+ // (there is no effective back-pressure on cache refreshes, so
+ // the loop would spin freely otherwise).
for {
select {
case <-stop:
w.Logger.Log("stopping", "true")
return
- case <-newImages:
- for k, v := range imagesToFetchFunc() {
- w.warm(k, v)
+ case name := <-w.Priority:
+ w.Logger.Log("priority", name.String())
+ // NB the implicit contract here is that the prioritised
+ // image has to have been running the last time we
+ // requested the credentials.
+ if creds, ok := imageCreds[name]; ok {
+ w.warm(name, creds)
+ } else {
+ w.Logger.Log("priority", name.String(), "err", "no creds available")
+ }
+ continue
+ default:
}

+ if len(backlog) > 0 {
+ im := backlog[0]
+ backlog = backlog[1:]
+ w.warm(im.Name, im.Credentials)
+ } else {
+ select {
+ case <-refresh:
+ imageCreds = imagesToFetchFunc()
+ backlog = imageCredsToBacklog(imageCreds)
+ default:
+ }
+ }
}
}

+ func imageCredsToBacklog(imageCreds ImageCreds) []backlogItem {
+ backlog := make([]backlogItem, len(imageCreds))
+ var i int
+ for name, cred := range imageCreds {
+ backlog[i] = backlogItem{name, cred}
+ i++
+ }
+ return backlog
+ }
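Note that imageCredsToBacklog inherits Go's randomized map iteration order, so the backlog is deliberately unordered. The loop above then schedules work in three tiers: priority names pre-empt everything, the backlog drains one image per turn, and the backlog refills at most once per refresh tick. A self-contained sketch of that shape (stand-in data; the real loop warms images instead of printing):

package main

import (
	"fmt"
	"time"
)

func main() {
	priority := make(chan string, 1)
	refresh := time.Tick(50 * time.Millisecond)
	backlog := []string{"repo/a", "repo/b"}
	priority <- "repo/urgent"

	for deadline := time.After(200 * time.Millisecond); ; {
		// 1. Anything urgent? (Also the only exit point.)
		select {
		case name := <-priority:
			fmt.Println("warm (priority):", name)
			continue
		case <-deadline:
			return
		default:
		}
		// 2. Otherwise work through the backlog one item per turn...
		if len(backlog) > 0 {
			fmt.Println("warm:", backlog[0])
			backlog = backlog[1:]
		} else {
			// 3. ...refilling it at most once per refresh interval.
			// (As in the real loop, an empty backlog busy-spins here
			// until the tick fires.)
			select {
			case <-refresh:
				backlog = []string{"repo/a", "repo/b"}
			default:
			}
		}
	}
}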

func (w *Warmer) warm(id image.Name, creds Credentials) {
client, err := w.ClientFactory.ClientFor(id.Registry(), creds)
if err != nil {
@@ -65,10 +111,27 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
}
defer client.Cancel()

+ // FIXME This can only return an empty string, because w.Creds is
+ // always empty. In other words, keys never include a username
+ // (need they?)
username := w.Creds.credsFor(id.Registry()).username

// Refresh tags first
// The key holds only the canonical name, e.g. "library/alpine", since the registry host is already carried by the client above.
+ key, err := cache.NewTagKey(username, id.CanonicalName())
+ if err != nil {
+ w.Logger.Log("err", errors.Wrap(err, "creating key for cache"))
+ return
+ }
+
+ var cacheTags []string
+ cacheTagsVal, err := w.Reader.GetKey(key)
+ if err == nil {
+ err = json.Unmarshal(cacheTagsVal, &cacheTags)
+ if err != nil {
+ w.Logger.Log("err", errors.Wrap(err, "deserializing cached tags"))
+ return
+ }
+ } // else assume we have no cached tags

tags, err := client.Tags(id)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
@@ -83,12 +146,6 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
return
}

- key, err := cache.NewTagKey(username, id.CanonicalName())
- if err != nil {
- w.Logger.Log("err", errors.Wrap(err, "creating key for cache"))
- return
- }

err = w.Writer.SetKey(key, val)
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "storing tags in cache"))
@@ -169,6 +226,36 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
}
awaitFetchers.Wait()
w.Logger.Log("updated", id.String())

+ if w.Notify != nil {
+ // If there are more tags than there used to be, there must be
+ // at least one new tag.
+ if len(cacheTags) < len(tags) {
+ w.Notify()
+ return
+ }
+ // Otherwise, check whether there are any entries in the
+ // fetched tags that aren't in the cached tags, ignoring any
+ // in the cached tags that aren't in the fetched tags.
+ sort.Strings(cacheTags)
+ sort.Strings(tags)
+ var i, j int
+ for i < len(tags) && j < len(cacheTags) {
+ switch strings.Compare(tags[i], cacheTags[j]) {
+ case 0:
+ i++
+ j++
+ case -1:
+ w.Notify()
+ return
+ case 1:
+ j++
+ }
+ }
+ if i < len(tags) {
+ w.Notify()
+ }
+ }
}
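The comparison above, restated as a standalone, testable function. Note the final boundary must be i < len(fetched): a len(fetched)-1 bound would miss the case where exactly one new tag sorts after every cached tag. A sketch using the same sorted two-cursor walk:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// hasNewTags reports whether fetched contains a tag absent from cached,
// ignoring cached tags that have since disappeared upstream.
func hasNewTags(fetched, cached []string) bool {
	if len(cached) < len(fetched) {
		return true // strictly more tags than before: at least one is new
	}
	sort.Strings(fetched)
	sort.Strings(cached)
	var i, j int
	for i < len(fetched) && j < len(cached) {
		switch strings.Compare(fetched[i], cached[j]) {
		case 0: // present on both sides
			i++
			j++
		case -1: // fetched[i] sorts before all remaining cached tags: new
			return true
		case 1: // cached[j] no longer exists upstream: skip it
			j++
		}
	}
	return i < len(fetched) // any leftover fetched tags are new
}

func main() {
	fmt.Println(hasNewTags([]string{"v1", "v2"}, []string{"v1"}))   // true (length check)
	fmt.Println(hasNewTags([]string{"a", "c"}, []string{"a", "b"})) // true (trailing new tag)
	fmt.Println(hasNewTags([]string{"a"}, []string{"a", "b"}))      // false (only deletions)
}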

func withinExpiryBuffer(expiry time.Time, buffer time.Duration) bool {