Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Image metadata lookup and automation for push hooks #835

Merged
merged 4 commits into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func main() {
shutdownWg.Add(1)
go daemon.GitPollLoop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))

cacheWarmer.Notify = daemon.AskForImagePoll
shutdownWg.Add(1)
go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)

Expand Down
8 changes: 4 additions & 4 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,11 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJo
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
d.askForSync()
d.AskForSync()
return nil, err
}
if anythingAutomated {
d.askForImagePoll()
d.AskForImagePoll()
}

var err error
Expand Down Expand Up @@ -303,7 +303,7 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
d.askForSync()
d.AskForSync()
return nil, err
}
revision, err = working.HeadRevision(ctx)
Expand All @@ -324,7 +324,7 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
// may be comms difficulties or other sources of problems; here, we
// always succeed because it's just bookkeeping.
func (d *Daemon) SyncNotify(ctx context.Context) error {
d.askForSync()
d.AskForSync()
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
imagePollTimer := time.NewTimer(d.RegistryPollInterval)

// Ask for a sync, and to poll images, straight away
d.askForSync()
d.askForImagePoll()
d.AskForSync()
d.AskForImagePoll()
for {
select {
case <-stop:
Expand All @@ -78,13 +78,13 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
imagePollTimer.Stop()
imagePollTimer = time.NewTimer(d.RegistryPollInterval)
case <-imagePollTimer.C:
d.askForImagePoll()
d.AskForImagePoll()
case <-d.syncSoon:
pullThen(d.doSync)
case <-gitPollTimer.C:
// Time to poll for new commits (unless we're already
// about to do that)
d.askForSync()
d.AskForSync()
case job := <-d.Jobs.Ready():
queueLength.Set(float64(d.Jobs.Len()))
jobLogger := log.With(logger, "jobID", job.ID)
Expand All @@ -108,7 +108,7 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
}

// Ask for a sync, or if there's one waiting, let that happen.
func (d *LoopVars) askForSync() {
func (d *LoopVars) AskForSync() {
d.ensureInit()
select {
case d.syncSoon <- struct{}{}:
Expand All @@ -117,7 +117,7 @@ func (d *LoopVars) askForSync() {
}

// Ask for an image poll, or if there's one waiting, let that happen.
func (d *LoopVars) askForImagePoll() {
func (d *LoopVars) AskForImagePoll() {
d.ensureInit()
select {
case d.pollImagesSoon <- struct{}{}:
Expand Down
128 changes: 112 additions & 16 deletions registry/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@ const askForNewImagesInterval = time.Minute
type Warmer struct {
Logger log.Logger
ClientFactory ClientFactory
Creds Credentials
Creds Credentials // FIXME: never supplied!
Expiry time.Duration
Writer cache.Writer
Reader cache.Reader
Burst int
Priority chan image.Name
Notify func()
}

// This is what we get from the callback handed to us
type ImageCreds map[image.Name]Credentials

// .. and this is what we keep in the backlog
type backlogItem struct {
image.Name
Credentials
}

// Continuously get the images to populate the cache with, and
// populate the cache with them.
func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) {
Expand All @@ -39,24 +48,60 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
panic("registry.Warmer fields are nil")
}

for k, v := range imagesToFetchFunc() {
w.warm(k, v)
}
refresh := time.Tick(askForNewImagesInterval)
imageCreds := imagesToFetchFunc()
backlog := imageCredsToBacklog(imageCreds)

newImages := time.Tick(askForNewImagesInterval)
// This loop acts keeps a kind of priority queue, whereby image
// names coming in on the `Priority` channel are looked up first.
// If there are none, images used in the cluster are refreshed;
// but no more often than once every `askForNewImagesInterval`,
// since there is no effective back-pressure on cache refreshes
// and it would spin freely otherwise).
for {
select {
case <-stop:
w.Logger.Log("stopping", "true")
return
case <-newImages:
for k, v := range imagesToFetchFunc() {
w.warm(k, v)
case name := <-w.Priority:
w.Logger.Log("priority", name.String())
// NB the implicit contract here is that the prioritised
// image has to have been running the last time we
// requested the credentials.
if creds, ok := imageCreds[name]; ok {
w.warm(name, creds)
} else {
w.Logger.Log("priority", name.String(), "err", "no creds available")
}
continue
default:
}

if len(backlog) > 0 {
im := backlog[0]
backlog = backlog[1:]
w.warm(im.Name, im.Credentials)
} else {
select {
case <-refresh:
imageCreds = imagesToFetchFunc()
backlog = imageCredsToBacklog(imageCreds)
default:
}
}
}
}

func imageCredsToBacklog(imageCreds ImageCreds) []backlogItem {
backlog := make([]backlogItem, len(imageCreds))
var i int
for name, cred := range imageCreds {
backlog[i] = backlogItem{name, cred}
i++
}
return backlog
}

func (w *Warmer) warm(id image.Name, creds Credentials) {
client, err := w.ClientFactory.ClientFor(id.Registry(), creds)
if err != nil {
Expand All @@ -65,10 +110,27 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
}
defer client.Cancel()

// FIXME This can only return an empty string, because w.Creds is
// always empty. In other words, keys never include a username
// (need they?)
username := w.Creds.credsFor(id.Registry()).username

// Refresh tags first
// Only, for example, "library/alpine" because we have the host information in the client above.
key, err := cache.NewTagKey(username, id.CanonicalName())
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "creating key for cache"))
return
}

var cacheTags []string
cacheTagsVal, err := w.Reader.GetKey(key)
if err == nil {
err = json.Unmarshal(cacheTagsVal, &cacheTags)
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "deserializing cached tags"))
return
}
} // else assume we have no cached tags

tags, err := client.Tags(id)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
Expand All @@ -83,12 +145,6 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
return
}

key, err := cache.NewTagKey(username, id.CanonicalName())
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "creating key for cache"))
return
}

err = w.Writer.SetKey(key, val)
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "storing tags in cache"))
Expand Down Expand Up @@ -169,6 +225,46 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
}
awaitFetchers.Wait()
w.Logger.Log("updated", id.String())

if w.Notify != nil {
// If there's more tags than there used to be, there must be
// at least one new tag.
if len(cacheTags) < len(tags) {
w.Notify()
return
}
// Otherwise, check whether there are any entries in the
// fetched tags that aren't in the cached tags.
tagSet := NewStringSet(tags)
cacheTagSet := NewStringSet(cacheTags)
if !tagSet.Subset(cacheTagSet) {
w.Notify()
}
}
}

// StringSet is a set of strings.
type StringSet map[string]struct{}

// NewStringSet returns a StringSet containing exactly the strings
// given as arguments.
func NewStringSet(ss []string) StringSet {
res := StringSet{}
for _, s := range ss {
res[s] = struct{}{}
}
return res
}

// Subset returns true if `s` is a subset of `t` (including the case
// of having the same members).
func (s StringSet) Subset(t StringSet) bool {
for k := range s {
if _, ok := t[k]; !ok {
return false
}
}
return true
}

func withinExpiryBuffer(expiry time.Time, buffer time.Duration) bool {
Expand Down