Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

registry package rewrite #851

Merged
merged 13 commits into from
Dec 19, 2017
17 changes: 5 additions & 12 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
name = "k8s.io/client-go"
version = "4.0.0"

[[constraint]]
name = "github.com/heroku/docker-registry-client"
source = "github.com/weaveworks/docker-registry-client"
branch = "master"

[[override]]
name = "github.com/ugorji/go"
revision = "8c0409fcbb70099c748d71f714529204975f6c3f"

[[constraint]]
name = "github.com/docker/distribution"
branch = "master"
91 changes: 38 additions & 53 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/registry"
registryMemcache "github.com/weaveworks/flux/registry/cache"
"github.com/weaveworks/flux/registry/cache"
registryMemcache "github.com/weaveworks/flux/registry/cache/memcached"
registryMiddleware "github.com/weaveworks/flux/registry/middleware"
"github.com/weaveworks/flux/remote"
"github.com/weaveworks/flux/ssh"
Expand Down Expand Up @@ -85,11 +86,11 @@ func main() {

gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits")
// registry
memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 20*time.Minute, "Duration to keep cached registry tag info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to poll registry for new images")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")

Expand Down Expand Up @@ -229,61 +230,45 @@ func main() {
}

// Registry components
var cache registry.Registry
var cacheWarmer registry.Warmer
var cacheRegistry registry.Registry
var cacheWarmer *cache.Warmer
{
// Cache
var memcacheRegistry registryMemcache.Client
if *memcachedHostname != "" {
memcacheRegistry = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
MaxIdleConns: defaultMemcacheConnections,
})
memcacheRegistry = registryMemcache.InstrumentMemcacheClient(memcacheRegistry)
defer memcacheRegistry.Stop()
}
var memcacheWarmer registryMemcache.Client
if *memcachedHostname != "" {
memcacheWarmer = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
MaxIdleConns: *registryBurst,
})
memcacheWarmer = registryMemcache.InstrumentMemcacheClient(memcacheWarmer)
defer memcacheWarmer.Stop()
}
// Cache client, for use by registry and cache warmer
var cacheClient cache.Client
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
MaxIdleConns: *registryBurst,
})
defer memcacheClient.Stop()
cacheClient = cache.InstrumentClient(memcacheClient)

cacheLogger := log.With(logger, "component", "cache")
cache = registry.NewRegistry(
registry.NewCacheClientFactory(cacheLogger, memcacheRegistry, *registryCacheExpiry),
cacheLogger,
defaultMemcacheConnections,
)
cache = registry.NewInstrumentedRegistry(cache)
cacheRegistry = &cache.Cache{
Reader: cacheClient,
}
cacheRegistry = registry.NewInstrumentedRegistry(cacheRegistry)

// Remote
// Remote client, for warmer to refresh entries
registryLogger := log.With(logger, "component", "registry")
remoteFactory := registry.NewRemoteClientFactory(registryLogger, registryMiddleware.RateLimiterConfig{
registryLimits := &registryMiddleware.RateLimiters{
RPS: *registryRPS,
Burst: *registryBurst,
})
}
remoteFactory := &registry.RemoteClientFactory{
Logger: registryLogger,
Limiters: registryLimits,
}

// Warmer
warmerLogger := log.With(logger, "component", "warmer")
cacheWarmer = registry.Warmer{
Logger: warmerLogger,
ClientFactory: remoteFactory,
Expiry: *registryCacheExpiry,
Reader: memcacheWarmer,
Writer: memcacheWarmer,
Burst: *registryBurst,
var err error
cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
}

Expand Down Expand Up @@ -439,7 +424,7 @@ func main() {
V: version,
Cluster: k8s,
Manifests: k8sManifests,
Registry: cache,
Registry: cacheRegistry,
ImageRefresh: make(chan image.Name, 100), // size chosen by fair dice roll
Repo: repo, Checkout: checkout,
Jobs: jobs,
Expand All @@ -458,7 +443,7 @@ func main() {
cacheWarmer.Notify = daemon.AskForImagePoll
cacheWarmer.Priority = daemon.ImageRefresh
shutdownWg.Add(1)
go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)
go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds)

// Update daemonRef so that upstream and handlers point to fully working daemon
daemonRef.UpdatePlatform(daemon)
Expand Down
24 changes: 16 additions & 8 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/remote"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/update"
Expand Down Expand Up @@ -321,6 +322,11 @@ func TestDaemon_JobStatusWithNoCache(t *testing.T) {
w.ForJobSucceeded(d, id)
}

func makeImageInfo(ref string, t time.Time) image.Info {
r, _ := image.ParseRef(ref)
return image.Info{ID: r, CreatedAt: t}
}

func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter) {
logger := log.NewNopLogger()

Expand Down Expand Up @@ -397,14 +403,16 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)

var imageRegistry registry.Registry
{
img1, _ := image.ParseInfo(currentHelloImage, time.Now())
img2, _ := image.ParseInfo(newHelloImage, time.Now().Add(1*time.Second))
img3, _ := image.ParseInfo("another/service:latest", time.Now().Add(1*time.Second))
imageRegistry = registry.NewMockRegistry([]image.Info{
img1,
img2,
img3,
}, nil)
img1 := makeImageInfo(currentHelloImage, time.Now())
img2 := makeImageInfo(newHelloImage, time.Now().Add(1*time.Second))
img3 := makeImageInfo("another/service:latest", time.Now().Add(1*time.Second))
imageRegistry = &registryMock.Registry{
Images: []image.Info{
img1,
img2,
img3,
},
}
}

events := &mockEventWriter{}
Expand Down
4 changes: 2 additions & 2 deletions daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/git/gittest"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/resource"
)

Expand Down Expand Up @@ -66,7 +66,7 @@ func daemon(t *testing.T) (*Daemon, func()) {
d := &Daemon{
Cluster: k8s,
Manifests: k8s,
Registry: registry.NewMockRegistry(nil, nil),
Registry: &registryMock.Registry{},
Checkout: working,
Jobs: jobs,
JobStatusCache: &job.StatusCache{Size: 100},
Expand Down
17 changes: 7 additions & 10 deletions deploy/flux-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ spec:
- name: git-key
mountPath: /etc/fluxd/ssh
args:
# if you deployed memcached, you can supply these arguments to
# tell fluxd to use it. You may need to change the namespace
# (`default`) if you run fluxd in another namespace.
- --memcached-hostname=memcached.default.svc.cluster.local
- --memcached-timeout=100ms
- --memcached-service=memcached
- --registry-cache-expiry=20m

# if you deployed memcached in a different namespace to flux,
# or with a different service name, you can supply these
# following two arguments to tell fluxd how to connect to it.
# - --memcached-hostname=memcached.default.svc.cluster.local
# - --memcached-service=memcached

# replace (at least) the following URL
- [email protected]:weaveworks/flux-example
- --git-branch=master
# include these next two to connect to an "upstream" service
# (e.g., Weave Cloud). The token is particular to the service.
# - --connect=wss://cloud.weave.works/api/flux
# - --token=abc123abc123abc123abc123
# override -b and -t arguments to ssh-keygen
# - --ssh-keygen-bits=2048
- --ssh-keygen-type=ed25519
42 changes: 24 additions & 18 deletions image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,32 +221,49 @@ func (i Ref) WithNewTag(t string) Ref {
return img
}

// Info has the metadata we are able to determine about an image, from
// its registry.
// Info has the metadata we are able to determine about an image ref,
// from its registry.
type Info struct {
ID Ref
// the reference to this image; probably a tagged image name
ID Ref
// the digest we got when fetching the metadata, which will be
// different each time a manifest is uploaded for the reference
Digest string
// an identifier for the *image* this reference points to; this
// will be the same for references that point at the same image
// (but does not necessarily equal Docker's image ID)
ImageID string
// the time at which the image pointed at was created
CreatedAt time.Time
}

// MarshalJSON returns the Info value in JSON (as bytes). It is
// implemented so that we can omit the `CreatedAt` value when it's
// zero, which would otherwise be tricky for e.g., JavaScript to
// detect.
func (im Info) MarshalJSON() ([]byte, error) {
type InfoAlias Info // alias to shed existing MarshalJSON implementation
var t string
if !im.CreatedAt.IsZero() {
t = im.CreatedAt.UTC().Format(time.RFC3339Nano)
}
encode := struct {
ID Ref
InfoAlias
CreatedAt string `json:",omitempty"`
}{im.ID, t}
}{InfoAlias(im), t}
return json.Marshal(encode)
}

// UnmarshalJSON populates an Info from JSON (as bytes). It's the
// companion to MarshalJSON above.
func (im *Info) UnmarshalJSON(b []byte) error {
type InfoAlias Info
unencode := struct {
ID Ref
InfoAlias
CreatedAt string `json:",omitempty"`
}{}
json.Unmarshal(b, &unencode)
im.ID = unencode.ID
*im = Info(unencode.InfoAlias)
if unencode.CreatedAt == "" {
im.CreatedAt = time.Time{}
} else {
Expand All @@ -259,17 +276,6 @@ func (im *Info) UnmarshalJSON(b []byte) error {
return nil
}

func ParseInfo(s string, createdAt time.Time) (Info, error) {
id, err := ParseRef(s)
if err != nil {
return Info{}, err
}
return Info{
ID: id,
CreatedAt: createdAt,
}, nil
}

// ByCreatedDesc is a shim used to sort image info by creation date
type ByCreatedDesc []Info

Expand Down
Loading