From 1fc19ef4e7db4f3653c554e333cbf6dd7bd8d55b Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Tue, 17 Dec 2024 13:52:35 +1100 Subject: [PATCH] Derive image check pull policies from post-patch podSpec --- charts/agent-stack-k8s/values.schema.json | 12 + cmd/controller/controller.go | 10 + cmd/controller/controller_test.go | 2 + examples/config.yaml | 2 + go.mod | 12 +- go.sum | 27 +- internal/controller/config/config.go | 5 + internal/controller/controller.go | 28 +- internal/controller/scheduler/job_watcher.go | 33 +- internal/controller/scheduler/metrics.go | 13 +- internal/controller/scheduler/pod_watcher.go | 382 +++++++++++------- internal/controller/scheduler/scheduler.go | 299 +++++++++----- .../controller/scheduler/scheduler_test.go | 6 +- internal/integration/fixtures/never-pull.yaml | 12 + internal/integration/integration_test.go | 25 +- 15 files changed, 563 insertions(+), 305 deletions(-) create mode 100644 internal/integration/fixtures/never-pull.yaml diff --git a/charts/agent-stack-k8s/values.schema.json b/charts/agent-stack-k8s/values.schema.json index 5b637d32..df8afd8a 100644 --- a/charts/agent-stack-k8s/values.schema.json +++ b/charts/agent-stack-k8s/values.schema.json @@ -491,6 +491,18 @@ } } }, + "default-image-pull-policy": { + "type": "string", + "description": "Configures a default image pull policy for containers that do not specify a pull policy, or containers created by the stack itself", + "default": "IfNotPresent", + "examples": ["Always", "IfNotPresent", "Never", ""] + }, + "default-image-check-pull-policy": { + "type": "string", + "description": "Configures a default image pull policy for image-check init containers, used if an image pull policy is not set for the corresponding container in a podSpec or podSpecPatch", + "default": "", + "examples": ["Always", "IfNotPresent", "Never", ""] + }, "pod-spec-patch": { "$ref": "https://kubernetesjsonschema.dev/master/_definitions.json#/definitions/io.k8s.api.core.v1.PodSpec" } diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 370ba057..530147e8 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -115,6 +115,16 @@ func AddConfigFlags(cmd *cobra.Command) { config.DefaultEmptyJobGracePeriod, "Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. 
when the podSpec specifies a missing service account)", ) + cmd.Flags().String( + "default-image-pull-policy", + "IfNotPresent", + "Configures a default image pull policy for containers that do not specify a pull policy and non-init containers created by the stack itself", + ) + cmd.Flags().String( + "default-image-check-pull-policy", + "", + "Sets a default PullPolicy for image-check init containers, used if an image pull policy is not set for the corresponding container in a podSpec or podSpecPatch", + ) cmd.Flags().Bool( "prohibit-kubernetes-plugin", false, diff --git a/cmd/controller/controller_test.go b/cmd/controller/controller_test.go index 74f1f4b4..7577ba4d 100644 --- a/cmd/controller/controller_test.go +++ b/cmd/controller/controller_test.go @@ -37,6 +37,8 @@ func TestReadAndParseConfig(t *testing.T) { ClusterUUID: "beefcafe-abbe-baba-abba-deedcedecade", ProhibitKubernetesPlugin: true, GraphQLEndpoint: "http://graphql.buildkite.localhost/v1", + DefaultImagePullPolicy: "Never", + DefaultImageCheckPullPolicy: "IfNotPresent", WorkspaceVolume: &corev1.Volume{ Name: "workspace-2-the-reckoning", diff --git a/examples/config.yaml b/examples/config.yaml index c3d60921..3245c478 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -11,6 +11,8 @@ job-creation-concurrency: 5 max-in-flight: 100 namespace: my-buildkite-ns org: my-buildkite-org +default-image-pull-policy: Never +default-image-check-pull-policy: IfNotPresent # Setting a custom GraphQL endpoint is usually only useful if you have a # different instance of Buildkite itself available to run. diff --git a/go.mod b/go.mod index 0f0c483c..2e7c0907 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Khan/genqlient v0.7.0 github.com/buildkite/go-buildkite/v3 v3.13.0 github.com/buildkite/roko v1.2.0 + github.com/distribution/reference v0.6.0 github.com/go-playground/locales v0.14.1 github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.23.0 @@ -69,7 +70,7 @@ require ( github.com/dustinkirkland/golang-petname v0.0.0-20240428194347-eebcea082ee0 // indirect github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect github.com/ebitengine/purego v0.7.1 // indirect - github.com/fatih/color v1.16.0 // indirect + github.com/fatih/color v1.18.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect @@ -95,6 +96,9 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/onsi/ginkgo/v2 v2.22.0 // indirect + github.com/onsi/gomega v1.36.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/outcaste-io/ristretto v0.2.3 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect @@ -149,7 +153,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/denisbrodbeck/machineid v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // 
indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -194,7 +198,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/secure-systems-lab/go-securesystemslib v0.8.0 // indirect github.com/spf13/afero v1.11.0 // indirect - github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/cast v1.7.0 // indirect github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 github.com/subosito/gotenv v1.6.0 // indirect @@ -216,7 +220,7 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.29.0 // indirect - golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.31.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/sys v0.27.0 // indirect diff --git a/go.sum b/go.sum index eab70183..bc2718e7 100644 --- a/go.sum +++ b/go.sum @@ -159,6 +159,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -170,16 +172,17 @@ github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 h1:8EXxF+tCLqaVk8 github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4/go.mod h1:I5sHm0Y0T1u5YjlyqC5GVArM7aNZRUYtTjmJ8mPJFds= github.com/ebitengine/purego v0.7.1 h1:6/55d26lG3o9VCZX8lping+bZcmShseiqlh2bnUDiPA= github.com/ebitengine/purego v0.7.1/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ= -github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= -github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.15.0/go.mod 
h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -367,10 +370,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oleiade/reflections v1.1.0 h1:D+I/UsXQB4esMathlt0kkZRJZdUDmhv5zGi/HOwYTWo= github.com/oleiade/reflections v1.1.0/go.mod h1:mCxx0QseeVCHs5Um5HhJeCKVC7AwS8kO67tky4rdisA= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= -github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.36.0 h1:Pb12RlruUtj4XUuPUqeEWc6j5DkVVVA49Uf6YLfC95Y= +github.com/onsi/gomega v1.36.0/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/outcaste-io/ristretto v0.2.3 h1:AK4zt/fJ76kjlYObOeNwh4T3asEuaCmp26pOvUOL9w0= @@ -448,8 +453,8 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= -github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= -github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -563,8 +568,8 @@ golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOM golang.org/x/crypto v0.29.0 
h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index b5bb5ed2..c3eec1e3 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -64,6 +64,9 @@ type Config struct { DefaultSidecarParams *SidecarParams `json:"default-sidecar-params" validate:"omitempty"` DefaultMetadata Metadata `json:"default-metadata" validate:"omitempty"` + DefaultImagePullPolicy corev1.PullPolicy `json:"default-image-pull-policy" validate:"omitempty"` + DefaultImageCheckPullPolicy corev1.PullPolicy `json:"default-image-check-pull-policy" validate:"omitempty"` + // ProhibitKubernetesPlugin can be used to prevent alterations to the pod // from the job (the kubernetes "plugin" in pipeline.yml). If enabled, // jobs with a "kubernetes" plugin will fail. @@ -120,6 +123,8 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error { if err := enc.AddReflected("default-metadata", c.DefaultMetadata); err != nil { return err } + enc.AddString("default-image-pull-policy", string(c.DefaultImagePullPolicy)) + enc.AddString("default-image-check-pull-policy", string(c.DefaultImageCheckPullPolicy)) return nil } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 3cca6362..40b046e4 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -87,19 +87,21 @@ func Run( // Scheduler does the complicated work of converting a Buildkite job into // a pod to run that job. It talks to the k8s API to create pods. 
sched := scheduler.New(logger.Named("scheduler"), k8sClient, scheduler.Config{ - Namespace: cfg.Namespace, - Image: cfg.Image, - AgentTokenSecretName: cfg.AgentTokenSecret, - JobTTL: cfg.JobTTL, - AdditionalRedactedVars: cfg.AdditionalRedactedVars, - WorkspaceVolume: cfg.WorkspaceVolume, - AgentConfig: cfg.AgentConfig, - DefaultCheckoutParams: cfg.DefaultCheckoutParams, - DefaultCommandParams: cfg.DefaultCommandParams, - DefaultSidecarParams: cfg.DefaultSidecarParams, - DefaultMetadata: cfg.DefaultMetadata, - PodSpecPatch: cfg.PodSpecPatch, - ProhibitK8sPlugin: cfg.ProhibitKubernetesPlugin, + Namespace: cfg.Namespace, + Image: cfg.Image, + AgentTokenSecretName: cfg.AgentTokenSecret, + JobTTL: cfg.JobTTL, + AdditionalRedactedVars: cfg.AdditionalRedactedVars, + WorkspaceVolume: cfg.WorkspaceVolume, + AgentConfig: cfg.AgentConfig, + DefaultCheckoutParams: cfg.DefaultCheckoutParams, + DefaultCommandParams: cfg.DefaultCommandParams, + DefaultSidecarParams: cfg.DefaultSidecarParams, + DefaultMetadata: cfg.DefaultMetadata, + DefaultImagePullPolicy: cfg.DefaultImagePullPolicy, + DefaultImageCheckPullPolicy: cfg.DefaultImageCheckPullPolicy, + PodSpecPatch: cfg.PodSpecPatch, + ProhibitK8sPlugin: cfg.ProhibitKubernetesPlugin, }) informerFactory, err := NewInformerFactory(k8sClient, cfg.Namespace, cfg.Tags) diff --git a/internal/controller/scheduler/job_watcher.go b/internal/controller/scheduler/job_watcher.go index 3e202c89..74c39f0e 100644 --- a/internal/controller/scheduler/job_watcher.go +++ b/internal/controller/scheduler/job_watcher.go @@ -38,7 +38,7 @@ type jobWatcher struct { // Tracks stalling jobs (jobs that have yet to create pods). stallingJobsMu sync.Mutex - stallingJobs map[*batchv1.Job]struct{} + stallingJobs map[uuid.UUID]*batchv1.Job // Tracks jobs that are being cleaned up (to avoid repeats). ignoredJobsMu sync.RWMutex @@ -58,7 +58,7 @@ func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *conf logger: logger, k8s: k8sClient, cfg: cfg, - stallingJobs: make(map[*batchv1.Job]struct{}), + stallingJobs: make(map[uuid.UUID]*batchv1.Job), ignoredJobs: make(map[uuid.UUID]struct{}), } jobsStallingGaugeFunc = func() int { @@ -118,13 +118,14 @@ func (w *jobWatcher) OnDelete(prev any) { if kjob == nil { return } - w.removeFromStalling(kjob) jobUUID, err := jobUUIDForObject(kjob) if err != nil { return } + w.removeFromStalling(jobUUID) + // The job is gone, so we can stop ignoring it (if it comes back). w.unignoreJob(jobUUID) @@ -145,10 +146,10 @@ func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) { } if model.JobFinished(kjob) { - w.removeFromStalling(kjob) + w.removeFromStalling(jobUUID) w.checkFinishedWithoutPod(ctx, log, kjob) } else { - w.checkStalledWithoutPod(log, kjob) + w.checkStalledWithoutPod(log, jobUUID, kjob) } } @@ -171,7 +172,7 @@ func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logge w.failJob(ctx, log, kjob, message) } -func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) { +func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, jobUUID uuid.UUID, kjob *batchv1.Job) { log.Debug("Checking job for stalling without a pod") // If the job is not finished and there is no pod, it should start one @@ -184,7 +185,7 @@ func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) } if pods > 0 { // All's well with the world. 
- w.removeFromStalling(kjob) + w.removeFromStalling(jobUUID) return } @@ -193,7 +194,7 @@ func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) return } - w.addToStalling(kjob) + w.addToStalling(jobUUID, kjob) } func (w *jobWatcher) fetchEvents(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) string { @@ -248,16 +249,16 @@ func (w *jobWatcher) formatEvents(evlist *corev1.EventList) string { return tw.Render() } -func (w *jobWatcher) addToStalling(kjob *batchv1.Job) { +func (w *jobWatcher) addToStalling(jobUUID uuid.UUID, kjob *batchv1.Job) { w.stallingJobsMu.Lock() defer w.stallingJobsMu.Unlock() - w.stallingJobs[kjob] = struct{}{} + w.stallingJobs[jobUUID] = kjob } -func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) { +func (w *jobWatcher) removeFromStalling(jobUUID uuid.UUID) { w.stallingJobsMu.Lock() defer w.stallingJobsMu.Unlock() - delete(w.stallingJobs, kjob) + delete(w.stallingJobs, jobUUID) } func (w *jobWatcher) stalledJobChecker(ctx context.Context) { @@ -274,21 +275,17 @@ func (w *jobWatcher) stalledJobChecker(ctx context.Context) { // Gather stalled jobs var stalled []*batchv1.Job w.stallingJobsMu.Lock() - for kjob := range w.stallingJobs { + for jobUUID, kjob := range w.stallingJobs { if time.Since(kjob.Status.StartTime.Time) < w.cfg.EmptyJobGracePeriod { continue } // ignore it from now until it is deleted - jobUUID, err := jobUUIDForObject(kjob) - if err != nil { - continue - } w.ignoreJob(jobUUID) // Move it from w.stalling into stalled stalled = append(stalled, kjob) - delete(w.stallingJobs, kjob) + delete(w.stallingJobs, jobUUID) } w.stallingJobsMu.Unlock() diff --git a/internal/controller/scheduler/metrics.go b/internal/controller/scheduler/metrics.go index f1846d20..e1ad7804 100644 --- a/internal/controller/scheduler/metrics.go +++ b/internal/controller/scheduler/metrics.go @@ -133,9 +133,10 @@ var ( // Pod watcher metrics var ( - // Overridden to return len(jobCancelCheckers) by podWatcher. - jobCancelCheckerGaugeFunc = func() int { return 0 } - podWatcherIgnoredJobsGaugeFunc = func() int { return 0 } + // Overridden by podWatcher. 
+ jobCancelCheckerGaugeFunc = func() int { return 0 } + podWatcherIgnoredJobsGaugeFunc = func() int { return 0 } + watchingForImageFailureGaugeFunc = func() int { return 0 } _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: promNamespace, @@ -149,6 +150,12 @@ var ( Name: "num_ignored_jobs", Help: "Current count of jobs ignored for podWatcher checks", }, func() float64 { return float64(podWatcherIgnoredJobsGaugeFunc()) }) + _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: promNamespace, + Subsystem: "pod_watcher", + Name: "num_watching_for_image_failure", + Help: "Current count of pods being watched for potential image-related failures", + }, func() float64 { return float64(watchingForImageFailureGaugeFunc()) }) podWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{ Namespace: promNamespace, diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go index b5c4b6c0..e4010cc4 100644 --- a/internal/controller/scheduler/pod_watcher.go +++ b/internal/controller/scheduler/pod_watcher.go @@ -1,17 +1,16 @@ package scheduler import ( + "cmp" "context" "errors" - "fmt" "regexp" "slices" - "strings" + "strconv" "sync" "time" "github.com/buildkite/agent-stack-k8s/v2/api" - "github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" agentcore "github.com/buildkite/agent/v3/core" @@ -44,6 +43,11 @@ type podWatcher struct { ignoredJobsMu sync.RWMutex ignoredJobs map[uuid.UUID]struct{} + // Pods being watched for image-related failures (ImagePullBackOff, + // ErrImageNeverPull, etc) + watchingForImageFailureMu sync.Mutex + watchingForImageFailure map[uuid.UUID]*corev1.Pod + // The job cancel checkers query the job state every so often. jobCancelCheckerInterval time.Duration @@ -57,19 +61,18 @@ type podWatcher struct { // library outside of our control is a carve-out from the usual rule.) // The context is needed to ensure job cancel checkers are cleaned up. resourceEventHandlerCtx context.Context - - agentTags map[string]string } // NewPodWatcher creates an informer that does various things with pods and // Buildkite jobs: // - If an init container fails, the BK Agent REST API will be used to fail // the job (since an agent hasn't run yet). -// - If a container stays in ImagePullBackOff state for too long, the BK -// Agent REST API will be used to fail the job and the pod will be evicted. -// - If a container stays in ImagePullBackOff, and the pod somehow got through -// all the init containers (including the image pull checks...) the BK -// GraphQL API will be used to cancel the job instead. +// - If a container stays in ImagePullBackOff or ErrImageNeverPull state for +// too long, the BK Agent REST API will be used to fail the job and the pod +// will be evicted. +// - If a container stays in ImagePullBackOff or ErrImageNeverPull, and the +// pod somehow got through all the init containers (including the image +// checks...) the BK GraphQL API will be used to cancel the job instead. // - If a pod is pending, every so often Buildkite will be checked to see if // the corresponding job has been cancelled so that the pod can be evicted // early. 
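The watcher behaviour described above is driven by controller configuration. A minimal sketch of the relevant keys, assuming the pre-existing image-pull-backoff-grace-period setting alongside the two pull-policy keys added by this patch (values are illustrative only):

    # How long a container may stay in ImagePullBackOff / ErrImageNeverPull
    # before the watcher fails or cancels the Buildkite job.
    image-pull-backoff-grace-period: 30s
    # Added in this patch: fallback pull policies used when a podSpec or
    # podSpecPatch does not specify one.
    default-image-pull-policy: IfNotPresent
    default-image-check-pull-policy: ""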
@@ -83,11 +86,6 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con jobCancelCheckerInterval = config.DefaultJobCancelCheckerPollInterval } - agentTags, errs := agenttags.TagMapFromTags(cfg.Tags) - if len(errs) > 0 { - logger.Warn("parsing agent tags", zap.Errors("errors", errs)) - } - pw := &podWatcher{ logger: logger, k8s: k8s, @@ -96,8 +94,8 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con imagePullBackOffGracePeriod: imagePullBackOffGracePeriod, jobCancelCheckerInterval: jobCancelCheckerInterval, ignoredJobs: make(map[uuid.UUID]struct{}), + watchingForImageFailure: make(map[uuid.UUID]*corev1.Pod), cancelCheckerChs: make(map[uuid.UUID]*onceChan), - agentTags: agentTags, } podWatcherIgnoredJobsGaugeFunc = func() int { pw.ignoredJobsMu.RLock() @@ -109,6 +107,11 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con defer pw.cancelCheckerChsMu.Unlock() return len(pw.cancelCheckerChs) } + watchingForImageFailureGaugeFunc = func() int { + pw.watchingForImageFailureMu.Lock() + defer pw.watchingForImageFailureMu.Unlock() + return len(pw.watchingForImageFailure) + } return pw } @@ -120,14 +123,15 @@ func (w *podWatcher) RegisterInformer(ctx context.Context, factory informers.Sha } w.resourceEventHandlerCtx = ctx // 😡 go factory.Start(ctx.Done()) + go w.imageFailureChecker(ctx, w.logger) return nil } -func (w *podWatcher) OnDelete(maybePod any) { +func (w *podWatcher) OnDelete(previousState any) { podWatcherOnDeleteEventCounter.Inc() - pod, wasPod := maybePod.(*corev1.Pod) - if !wasPod { + pod, _ := previousState.(*corev1.Pod) + if pod == nil { return } @@ -138,38 +142,34 @@ func (w *podWatcher) OnDelete(maybePod any) { return } + // No need to continue watching for image-related failures or cancellation. + w.stopWatchingForImageFailure(jobUUID) w.stopJobCancelChecker(jobUUID) // The pod is gone, so we can stop ignoring it (if it comes back). w.unignoreJob(jobUUID) } -func (w *podWatcher) OnAdd(maybePod any, isInInitialList bool) { +func (w *podWatcher) OnAdd(currentState any, _ bool) { podWatcherOnAddEventCounter.Inc() - pod, wasPod := maybePod.(*corev1.Pod) - if !wasPod { + pod, _ := currentState.(*corev1.Pod) + if pod == nil { return } w.runChecks(w.resourceEventHandlerCtx, pod) } -func (w *podWatcher) OnUpdate(oldMaybePod, newMaybePod any) { +func (w *podWatcher) OnUpdate(_, currentState any) { podWatcherOnUpdateEventCounter.Inc() - oldPod, oldWasPod := newMaybePod.(*corev1.Pod) - newPod, newWasPod := newMaybePod.(*corev1.Pod) - - // This nonsense statement is only necessary because the types are too loose. - // Most likely both old and new are going to be Pods. - switch { - case newWasPod: - w.runChecks(w.resourceEventHandlerCtx, newPod) - - case oldWasPod: - w.runChecks(w.resourceEventHandlerCtx, oldPod) + // The previous state of the objects is not needed to run checks. + pod, _ := currentState.(*corev1.Pod) + if pod == nil { + return } + w.runChecks(w.resourceEventHandlerCtx, pod) } func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { @@ -180,6 +180,25 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { return } + // If it's no longer pending, stop the job cancel checker. If its neither + // pending nor running, stop watching for image-related failures. + // (This makes sense to do whether or not the job UUID is ignored.) + switch pod.Status.Phase { + case corev1.PodPending: + // Continue watching for failures or cancellation, if already doing so. 
+ + case corev1.PodRunning: + // Running: the agent container has started or is about to start, and it + // can handle the cancellation and exit. + w.stopJobCancelChecker(jobUUID) + + default: + // Succeeded, Failed: it's already over. + // Unknown: probably shouldn't interfere. + w.stopWatchingForImageFailure(jobUUID) + w.stopJobCancelChecker(jobUUID) + } + if w.isIgnored(jobUUID) { log.Debug("Job is currently ignored for podWatcher checks") return @@ -189,21 +208,27 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) { // (Note: users can define their own init containers through podSpec.) w.failOnInitContainerFailure(ctx, log, pod) - // Check for a container stuck in ImagePullBackOff or InvalidImageName, - // and fail or cancel the job accordingly. - w.failOnImagePullFailure(ctx, log, pod, jobUUID) + // Check for Buildkite job cancellation while the pod is pending. + // Check that the pod doesn't stay in ImagePullBackOff or ErrImageNeverPull + // for too long. + switch pod.Status.Phase { + case corev1.PodPending: + w.watchForImageFailure(jobUUID, pod) + w.startJobCancelChecker(ctx, log, pod.ObjectMeta, jobUUID) - // Check whether the agent container has started yet, and start or stop the - // job cancel checker accordingly. - w.startOrStopJobCancelChecker(ctx, log, pod, jobUUID) + case corev1.PodRunning: + w.watchForImageFailure(jobUUID, pod) + } } -func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { - log.Debug("Checking pod containers for ImagePullBackOff or InvalidImageName") - - failImmediately := false +// podHasFailingImages returns a slice of container statuses when a pod has +// a container in an image-related failing state (ImagePullBackOff, +// ErrImageNeverPull, etc) for too long. If the slice is empty or nil, the pod +// is not failing (yet). +func (w *podWatcher) podHasFailingImages(log *zap.Logger, pod *corev1.Pod) []corev1.ContainerStatus { + failImmediately := false // becomes true for InvalidImageName - images := make(map[string]struct{}) + var statuses []corev1.ContainerStatus // If any init container fails to pull, whether it's one we added // specifically to check for pull failure, the pod won't run. @@ -212,11 +237,13 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger if waiting == nil { continue } + switch waiting.Reason { - case "ImagePullBackOff": - images[containerStatus.Image] = struct{}{} + case "ImagePullBackOff", "ErrImageNeverPull": + statuses = append(statuses, containerStatus) + case "InvalidImageName": - images[containerStatus.Image] = struct{}{} + statuses = append(statuses, containerStatus) failImmediately = true } } @@ -229,99 +256,46 @@ func (w *podWatcher) failOnImagePullFailure(ctx context.Context, log *zap.Logger if waiting == nil { continue } + switch waiting.Reason { - case "ImagePullBackOff": + case "ImagePullBackOff", "ErrImageNeverPull": if !isSystemContainer(&containerStatus) { log.Info("Ignoring container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name)) continue } - images[containerStatus.Image] = struct{}{} + statuses = append(statuses, containerStatus) + case "InvalidImageName": - images[containerStatus.Image] = struct{}{} + statuses = append(statuses, containerStatus) failImmediately = true } } - if len(images) == 0 { + if len(statuses) == 0 { // All's well with the world. 
- return + return nil } - if !failImmediately { // apply the grace period - if pod.Status.StartTime == nil { - // Status could be unpopulated, or it hasn't started yet. - return - } - startedAt := pod.Status.StartTime.Time - if startedAt.IsZero() || time.Since(startedAt) < w.imagePullBackOffGracePeriod { - // Not started yet, or started recently - return - } + if failImmediately { + return statuses } - // Get the current job state from BK. - // What we do next depends on what state it is in. - resp, err := api.GetCommandJob(ctx, w.gql, jobUUID.String()) - if err != nil { - log.Warn("Failed to query command job", zap.Error(err)) - return + // Apply the grace period + if pod.Status.StartTime == nil { + // Status could be unpopulated, or it hasn't started yet. + return nil } - job, ok := resp.Job.(*api.GetCommandJobJobJobTypeCommand) - if !ok { - log.Warn("Job was not a command job") - return + startedAt := pod.Status.StartTime.Time + if startedAt.IsZero() || time.Since(startedAt) < w.imagePullBackOffGracePeriod { + // Not started yet, or started recently + return nil } - log = log.With(zap.String("job_state", string(job.State))) - - switch job.State { - case api.JobStatesScheduled: - // We can acquire it and fail it ourselves. - log.Info("One or more job containers are in ImagePullBackOff. Failing.") - message := w.formatImagePullFailureMessage(images) - switch err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); { - case errors.Is(err, agentcore.ErrJobAcquisitionRejected): - podWatcherBuildkiteJobFailErrorsCounter.Inc() - // If the error was because BK rejected the job acquisition, then - // it's moved on to a state where we need to cancel instead. - // (The init container probably successfully pulled, but another - // pull of the same image later on failed after the agent started.) - log.Info("Attempting to cancel job instead") - w.cancelJob(ctx, log, pod, jobUUID) - return - - case err != nil: - podWatcherBuildkiteJobFailErrorsCounter.Inc() - - // Maybe the job was cancelled in the meantime? - log.Error("Could not fail Buildkite job", zap.Error(err)) - return - } - podWatcherBuildkiteJobFailsCounter.Inc() - // Also evict the pod, because it won't die on its own. - w.evictPod(ctx, log, pod, jobUUID) - - case api.JobStatesAccepted, api.JobStatesAssigned, api.JobStatesRunning: - // An agent is already doing something with the job - now canceling - // is the only lever available. - log.Info("One or more job containers are in ImagePullBackOff. Cancelling.") - w.cancelJob(ctx, log, pod, jobUUID) - - case api.JobStatesCanceling, api.JobStatesCanceled, api.JobStatesFinished, api.JobStatesSkipped: - // If the job is in one of these states, we can neither acquire nor - // cancel it (now or in the future). - log.Debug("Job not acquirable or cancelable") - w.ignoreJob(jobUUID) - - default: - // Most states don't make sense for a command job that we've started - // a pod for (e.g. blocked, broken, expired, pending, waiting, ...) - // Maybe the meanings of states has changed since this build? - // Log a message but don't do anything. - log.Warn("Job not in actionable state") - } + return statuses } +// failOnInitContainerFailure looks for init containers that failed, and fails +// the job on Buildkite. 
func (w *podWatcher) failOnInitContainerFailure(ctx context.Context, log *zap.Logger, pod *corev1.Pod) { log.Debug("Checking pod for failed init containers") @@ -380,19 +354,25 @@ func (w *podWatcher) formatInitContainerFails(terms map[string]*corev1.Container return "The following init containers failed:\n\n" + tw.Render() } -func (w *podWatcher) formatImagePullFailureMessage(images map[string]struct{}) string { - // Format the failed images into a nice sorted list. - imagesList := make([]string, 0, len(images)) - for image := range images { - imagesList = append(imagesList, image) - } - slices.Sort(imagesList) - var message strings.Builder - message.WriteString("The following container images couldn't be pulled:\n") - for _, image := range imagesList { - fmt.Fprintf(&message, " * %q\n", image) - } - return message.String() +func (w *podWatcher) formatImagePullFailureMessage(statuses []corev1.ContainerStatus) string { + slices.SortFunc(statuses, func(a, b corev1.ContainerStatus) int { + return cmp.Compare(a.Name, b.Name) + }) + + tw := table.NewWriter() + tw.SetStyle(table.StyleColoredDark) + tw.AppendHeader(table.Row{"CONTAINER", "IMAGE REF", "REASON", "MESSAGE"}) + tw.AppendSeparator() + for _, status := range statuses { + // So far this function is only used for Waiting statuses. + tw.AppendRow(table.Row{ + status.Name, + strconv.Quote(status.Image), // may be malformed, hence quoting + status.State.Waiting.Reason, + status.State.Waiting.Message, + }) + } + return "The following images could not be pulled or were unavailable:\n\n" + tw.Render() } func (w *podWatcher) evictPod(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { @@ -435,17 +415,119 @@ func (w *podWatcher) cancelJob(ctx context.Context, log *zap.Logger, pod *corev1 w.ignoreJob(jobUUID) } -func (w *podWatcher) startOrStopJobCancelChecker(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) { - switch pod.Status.Phase { - case corev1.PodPending: - w.startJobCancelChecker(ctx, log, pod.ObjectMeta, jobUUID) +// imageFailureChecker is a goroutine that periodically checks pending and +// running pods for container statuses such as ImagePullBackOff, +// ErrImageNeverPull, etc. +func (w *podWatcher) imageFailureChecker(ctx context.Context, log *zap.Logger) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + // continue below + } + + var failingPods []failingPod + + // Move failing pods from pendingPods to failingPods. + w.watchingForImageFailureMu.Lock() + for jobUUID, pod := range w.watchingForImageFailure { + statuses := w.podHasFailingImages(log, pod) + if len(statuses) == 0 { + continue + } + failingPods = append(failingPods, failingPod{jobUUID: jobUUID, pod: pod, statuses: statuses}) + delete(w.watchingForImageFailure, jobUUID) + } + w.watchingForImageFailureMu.Unlock() + + // Fail/cancel the corresponding jobs on Buildkite, and evict the + // pods. + for _, fp := range failingPods { + w.failForImageFailure(ctx, log, fp) + } + } +} + +// failingPod captures information about a pending or running pod that is now +// failing. +type failingPod struct { + jobUUID uuid.UUID + pod *corev1.Pod + statuses []corev1.ContainerStatus +} + +// failForImageFailure fails or cancels the corresponding job on Buildkite, and +// evicts the pod as needed. 
+func (w *podWatcher) failForImageFailure(ctx context.Context, log *zap.Logger, failingPod failingPod) { + jobUUID := failingPod.jobUUID + pod := failingPod.pod + statuses := failingPod.statuses + + // Get the current job state from BK. + // What we do next depends on what state it is in. + resp, err := api.GetCommandJob(ctx, w.gql, jobUUID.String()) + if err != nil { + log.Warn("Failed to query command job", zap.Error(err)) + return + } + job, ok := resp.Job.(*api.GetCommandJobJobJobTypeCommand) + if !ok { + log.Warn("Job was not a command job") + return + } + + log = log.With(zap.String("job_state", string(job.State))) + + switch job.State { + case api.JobStatesScheduled: + // We can acquire it and fail it ourselves. + log.Info("One or more job containers are waiting too long for images. Failing.") + message := w.formatImagePullFailureMessage(statuses) + switch err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, pod, message); { + case errors.Is(err, agentcore.ErrJobAcquisitionRejected): + podWatcherBuildkiteJobFailErrorsCounter.Inc() + // If the error was because BK rejected the job acquisition, then + // it's moved on to a state where we need to cancel instead. + // (The init container probably successfully pulled, but another + // pull of the same image later on failed after the agent started.) + log.Info("Attempting to cancel job instead") + w.cancelJob(ctx, log, pod, jobUUID) + return + + case err != nil: + podWatcherBuildkiteJobFailErrorsCounter.Inc() + + // Maybe the job was cancelled in the meantime? + log.Error("Could not fail Buildkite job", zap.Error(err)) + return + } + podWatcherBuildkiteJobFailsCounter.Inc() + // Also evict the pod, because it won't die on its own. + w.evictPod(ctx, log, pod, jobUUID) + + case api.JobStatesAccepted, api.JobStatesAssigned, api.JobStatesRunning: + // An agent is already doing something with the job - now canceling + // is the only lever available. + log.Info("One or more job containers are in ImagePullBackOff. Cancelling.") + w.cancelJob(ctx, log, pod, jobUUID) + + case api.JobStatesCanceling, api.JobStatesCanceled, api.JobStatesFinished, api.JobStatesSkipped: + // If the job is in one of these states, we can neither acquire nor + // cancel it (now or in the future). + log.Debug("Job not acquirable or cancelable") + w.ignoreJob(jobUUID) default: - // Running: the agent container has started or is about to start, and it - // can handle the cancellation and exit. - // Succeeded, Failed: it's already over. - // Unknown: probably shouldn't interfere. - w.stopJobCancelChecker(jobUUID) + // Most states don't make sense for a command job that we've started + // a pod for (e.g. blocked, broken, expired, pending, waiting, ...) + // Maybe the meanings of states has changed since this build? + // Log a message but don't do anything. 
+ log.Warn("Job not in actionable state") } } @@ -499,7 +581,7 @@ func (w *podWatcher) jobCancelChecker(ctx context.Context, stopCh <-chan struct{ log.Warn("Job was not a command job") continue } - log = log.With(zap.String("job_state", string(job.State))) + log := log.With(zap.String("job_state", string(job.State))) switch job.State { case api.JobStatesCanceled, api.JobStatesCanceling: @@ -546,6 +628,18 @@ func (w *podWatcher) isIgnored(jobUUID uuid.UUID) bool { return ignore } +func (w *podWatcher) watchForImageFailure(jobUUID uuid.UUID, pod *corev1.Pod) { + w.watchingForImageFailureMu.Lock() + defer w.watchingForImageFailureMu.Unlock() + w.watchingForImageFailure[jobUUID] = pod +} + +func (w *podWatcher) stopWatchingForImageFailure(jobUUID uuid.UUID) { + w.watchingForImageFailureMu.Lock() + defer w.watchingForImageFailureMu.Unlock() + delete(w.watchingForImageFailure, jobUUID) +} + // onceChan stores a channel and a [sync.Once] to be used for closing the // channel at most once. type onceChan struct { diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 23052031..19c3ac3b 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -1,12 +1,14 @@ package scheduler import ( + "cmp" "context" "encoding/json" "errors" "fmt" "maps" "net/url" + "slices" "strconv" "strings" "time" @@ -19,6 +21,8 @@ import ( "github.com/buildkite/agent/v3/clicommand" + "github.com/distribution/reference" + "github.com/jedib0t/go-pretty/v6/table" "go.uber.org/zap" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -30,29 +34,31 @@ import ( ) const ( - agentTokenKey = "BUILDKITE_AGENT_TOKEN" - AgentContainerName = "agent" - CopyAgentContainerName = "copy-agent" - ImagePullCheckContainerNamePrefix = "imagepullcheck-" - CheckoutContainerName = "checkout" + agentTokenKey = "BUILDKITE_AGENT_TOKEN" + AgentContainerName = "agent" + CopyAgentContainerName = "copy-agent" + ImageCheckContainerNamePrefix = "imagecheck-" + CheckoutContainerName = "checkout" ) var errK8sPluginProhibited = errors.New("the kubernetes plugin is prohibited by this controller, but was configured on this job") type Config struct { - Namespace string - Image string - AgentTokenSecretName string - JobTTL time.Duration - AdditionalRedactedVars []string - WorkspaceVolume *corev1.Volume - AgentConfig *config.AgentConfig - DefaultCheckoutParams *config.CheckoutParams - DefaultCommandParams *config.CommandParams - DefaultSidecarParams *config.SidecarParams - DefaultMetadata config.Metadata - PodSpecPatch *corev1.PodSpec - ProhibitK8sPlugin bool + Namespace string + Image string + AgentTokenSecretName string + JobTTL time.Duration + AdditionalRedactedVars []string + WorkspaceVolume *corev1.Volume + AgentConfig *config.AgentConfig + DefaultCheckoutParams *config.CheckoutParams + DefaultCommandParams *config.CommandParams + DefaultSidecarParams *config.SidecarParams + DefaultMetadata config.Metadata + DefaultImagePullPolicy corev1.PullPolicy + DefaultImageCheckPullPolicy corev1.PullPolicy + PodSpecPatch *corev1.PodSpec + ProhibitK8sPlugin bool } func New(logger *zap.Logger, client kubernetes.Interface, cfg Config) *worker { @@ -396,9 +402,10 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI c.Command = []string{"/workspace/tini-static"} c.Args = []string{"--", "/workspace/buildkite-agent", "bootstrap"} - // The image *should* be present since we just pulled it with an init - // container, 
but weirder things have happened. - c.ImagePullPolicy = corev1.PullIfNotPresent + if c.ImagePullPolicy == "" { + c.ImagePullPolicy = w.cfg.DefaultImagePullPolicy + } + c.Env = append(c.Env, containerEnv...) c.Env = append(c.Env, corev1.EnvVar{ @@ -443,7 +450,7 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI Args: []string{"--", "/workspace/buildkite-agent", "bootstrap"}, WorkingDir: "/workspace", VolumeMounts: volumeMounts, - ImagePullPolicy: corev1.PullIfNotPresent, + ImagePullPolicy: w.cfg.DefaultImagePullPolicy, Env: append(containerEnv, corev1.EnvVar{ Name: "BUILDKITE_COMMAND", @@ -548,52 +555,6 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI ) } - // Init containers. These run in order before the regular containers. - // We run some init containers before any specified in the given podSpec. - // - // We use an init container to copy buildkite-agent into /workspace. - // We also use init containers to check that images can be pulled before - // any other containers run. - // - // Why not let Kubernetes worry about pulling images as needed? Well... - // If Kubernetes can't pull an image, the container stays in Waiting with - // ImagePullBackOff. But Kubernetes also tries to start containers ASAP. - // This behaviour is fine for when you are using Kubernetes to run services, - // such as a web server or database, because you are DevOps and are dealing - // with Kubernetes more directly. - // Since the agent, command, checkout etc are in separate containers, we can - // be in the awkward situation of having started a BK job with an agent - // running happily in the agent server container, but any of the other pod - // containers can still be waiting on an image that can't be pulled. - // - // Over here in the agent-stack-k8s controller, we can detect - // ImagePullBackOff using the k8s API (see imagePullBackOffWatcher.go) but - // our options for pulling the plug on a job that's already started are - // limited, because we can't steal responsibility for the job from the - // already-running agent. - // - // We can: - // * kill the agent container (agent lost, which looks weird) - // * use GraphQL to cancel the job, rely on the agent to count the - // containers that connected to it through the socket, and spit out an - // error in the log that is easy to miss. (This is what we used to do.) - // Both those options suck. - // - // So instead, we pull each required image in its own init container and - // set the entrypoint to the equivalent of "/bin/true". - // If the image pull fails, we can use agentcore to fail the job directly. - // This early detection approach is also useful in a CI/CD context since the - // user is more likely to be playing with pipeline configurations. - // - // The main downside to pre-pulling images with init containers is that - // init containers do not run in parallel, so Kubernetes might well decide - // not to pull them in parallel. Also there's no agent running to report - // that we're currently waiting for the image pull. (In the BK UI, the job - // will sit in "waiting for agent" for a bit.) - // - // TODO: investigate agent modifications to accept handover of a started - // job (i.e. make the controller acquire the job, log some k8s progress, - // then hand over the job token to the agent in the pod.) 
initContainers := []corev1.Container{ { // This container copies buildkite-agent and tini-static into @@ -614,31 +575,174 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI }, } - // Pre-pull these images. (Note that even when specifying PullAlways, - // layers can still be cached on the node.) - preflightImagePulls := map[string]struct{}{} + // Only attempt the job once. + podSpec.RestartPolicy = corev1.RestartPolicyNever + + // Allow podSpec to be overridden by the controller config and the k8s plugin. + // Patch from the controller config is applied first. + if w.cfg.PodSpecPatch != nil { + patched, err := PatchPodSpec(podSpec, w.cfg.PodSpecPatch) + if err != nil { + return nil, fmt.Errorf("failed to apply podSpec patch from agent: %w", err) + } + podSpec = patched + w.logger.Debug("Applied podSpec patch from agent", zap.Any("patched", patched)) + } + + // If present, patch from the k8s plugin is applied second. + if inputs.k8sPlugin != nil && inputs.k8sPlugin.PodSpecPatch != nil { + patched, err := PatchPodSpec(podSpec, inputs.k8sPlugin.PodSpecPatch) + if err != nil { + return nil, fmt.Errorf("failed to apply podSpec patch from k8s plugin: %w", err) + } + podSpec = patched + w.logger.Debug("Applied podSpec patch from k8s plugin", zap.Any("patched", patched)) + } + + // Use init containers to check that images can be used or pulled before + // any other containers run. These are added _after_ podSpecPatch is applied + // since podSpecPatch may freely modify each container's image ref. + // This process also checks that image refs are well-formed so we can fail + // the job early. + // Init containers run before the agent, so we can acquire and fail the job + // if an init container stays in ImagePullBackOff for too long. + // + // Rank pull policy by preference. If two containers specify different pull + // policies, the most preferred policy is used for the image check. + // Always is most preferred, Never is least preferred. + var pullPolicyPreference = map[corev1.PullPolicy]int{ + corev1.PullNever: 1, + corev1.PullIfNotPresent: 2, + corev1.PullAlways: 3, + } + + // First decide which images to pre-check and what pull policy to use. + type policyAndRef struct { + policy corev1.PullPolicy + ref reference.Reference + } + type imageParseErr struct { + container string + image string + err error + } + var invalidRefs []imageParseErr + + preflightImageChecks := make(map[string]policyAndRef) for _, c := range podSpec.Containers { - preflightImagePulls[c.Image] = struct{}{} + ref, err := reference.Parse(c.Image) + if err != nil { + invalidRefs = append(invalidRefs, imageParseErr{container: c.Name, image: c.Image, err: err}) + continue + } + + pnr, has := preflightImageChecks[c.Image] + if !has || pnr.policy == "" { + preflightImageChecks[c.Image] = policyAndRef{ + policy: c.ImagePullPolicy, + ref: ref, + } + continue + } + + // Pick the more preferred pull policy of either what is present in + // preflightImagePulls or what the user set for c.ImagePullPolicy. + // Most to least preferred: Always, default, IfNotPresent, Never. + // (If two containers specify different policies, then to check that + // both will run, using the policy more likely to pull means the other + // is more likely to have an image if it succeeds, and more likely to + // fail quickly if the pull fails. 
+ r := pullPolicyPreference[c.ImagePullPolicy] + s := pullPolicyPreference[pnr.policy] + if r > s { + preflightImageChecks[c.Image] = policyAndRef{ + policy: c.ImagePullPolicy, + ref: ref, + } + } } - for _, c := range podSpec.EphemeralContainers { - preflightImagePulls[c.Image] = struct{}{} + + // We don't need to add more init containers to check images when existing + // init containers will do this for us implicitly. + cullCheckForExisting := func(c corev1.Container) { + if _, err := reference.Parse(c.Image); err != nil { + invalidRefs = append(invalidRefs, imageParseErr{container: c.Name, image: c.Image, err: err}) + return + } + + pnr, has := preflightImageChecks[c.Image] + if !has { + return // not an image we'd add a check for + } + + r := pullPolicyPreference[c.ImagePullPolicy] + s := pullPolicyPreference[pnr.policy] + // If the existing init container has the same or higher preference than + // the image check container we would add, then no need to add it. + if r >= s { + delete(preflightImageChecks, c.Image) + } } - // w.cfg.Image is the first init container, so we don't need to add another - // container specifically to check it can pull. Same goes for user-supplied - // init containers. - delete(preflightImagePulls, w.cfg.Image) - for _, c := range podSpec.InitContainers { - delete(preflightImagePulls, c.Image) + + cullCheckForExisting(initContainers[0]) // the workspace setup container + for _, c := range podSpec.InitContainers { // user-defined init containers + cullCheckForExisting(c) + } + + // If any of the image refs were bad, return a single error with all of them + // in a sorted table. + if len(invalidRefs) > 0 { + if len(invalidRefs) == 1 { + ie := invalidRefs[0] + return nil, fmt.Errorf("%w %q for container %q", ie.err, ie.image, ie.container) + } + + slices.SortFunc(invalidRefs, func(a, b imageParseErr) int { + return cmp.Compare(a.container, b.container) + }) + tw := table.NewWriter() + tw.SetStyle(table.StyleColoredDark) + tw.AppendHeader(table.Row{"CONTAINER", "IMAGE REF", "PARSE ERROR"}) + tw.AppendSeparator() + for _, ie := range invalidRefs { + tw.AppendRow(table.Row{ + strconv.Quote(ie.container), // could also be invalid at this stage + strconv.Quote(ie.image), // it couldn't be parsed, so quote it + ie.err, + }) + } + return nil, errors.New("invalid image references\n\n" + tw.Render()) } + // Create the pre-flight image check containers. i := 0 - for image := range preflightImagePulls { - name := ImagePullCheckContainerNamePrefix + strconv.Itoa(i) - w.logger.Info("creating preflight image pull init container", zap.String("name", name), zap.String("image", image)) + for image, pnr := range preflightImageChecks { + name := ImageCheckContainerNamePrefix + strconv.Itoa(i) + policy := pnr.policy + if policy == "" { + policy = w.cfg.DefaultImageCheckPullPolicy + } + if policy == "" { + // If pull policy is unspecified by either the podSpec{,Patch} or + // controller configuration, use Always, unless there is a + // digest that has pinned the ref, in which case use IfNotPresent. + // (No need to pull a pinned image ref if it's present.) 
+ policy = corev1.PullAlways + if _, hasDigest := pnr.ref.(reference.Digested); hasDigest { + policy = corev1.PullIfNotPresent + } + } + + w.logger.Debug( + "adding preflight image check init container", + zap.String("name", name), + zap.String("image", image), + zap.String("policy", string(policy)), + ) initContainers = append(initContainers, corev1.Container{ Name: name, Image: image, - ImagePullPolicy: corev1.PullAlways, + ImagePullPolicy: policy, // `tini-static --version` is sorta like `true`, but: // (a) always exists (we *just* copied it into /workspace), and // (b) is statically compiled, so should be compatible with whatever @@ -653,32 +757,9 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI i++ } + // Prepend all the init containers defined above to the podspec. podSpec.InitContainers = append(initContainers, podSpec.InitContainers...) - // Only attempt the job once. - podSpec.RestartPolicy = corev1.RestartPolicyNever - - // Allow podSpec to be overridden by the agent configuration and the k8s plugin - - // Patch from the agent is applied first - if w.cfg.PodSpecPatch != nil { - patched, err := PatchPodSpec(podSpec, w.cfg.PodSpecPatch) - if err != nil { - return nil, fmt.Errorf("failed to apply podSpec patch from agent: %w", err) - } - podSpec = patched - w.logger.Debug("Applied podSpec patch from agent", zap.Any("patched", patched)) - } - - if inputs.k8sPlugin != nil && inputs.k8sPlugin.PodSpecPatch != nil { - patched, err := PatchPodSpec(podSpec, inputs.k8sPlugin.PodSpecPatch) - if err != nil { - return nil, fmt.Errorf("failed to apply podSpec patch from k8s plugin: %w", err) - } - podSpec = patched - w.logger.Debug("Applied podSpec patch from k8s plugin", zap.Any("patched", patched)) - } - kjob.Spec.Template.Spec = *podSpec return kjob, nil @@ -729,7 +810,7 @@ func (w *worker) createCheckoutContainer( Image: w.cfg.Image, WorkingDir: "/workspace", VolumeMounts: volumeMounts, - ImagePullPolicy: corev1.PullIfNotPresent, + ImagePullPolicy: w.cfg.DefaultImagePullPolicy, Env: []corev1.EnvVar{ { Name: "BUILDKITE_KUBERNETES_EXEC", diff --git a/internal/controller/scheduler/scheduler_test.go b/internal/controller/scheduler/scheduler_test.go index a23d7ae1..57ad8127 100644 --- a/internal/controller/scheduler/scheduler_test.go +++ b/internal/controller/scheduler/scheduler_test.go @@ -158,6 +158,7 @@ func TestJobPluginConversion(t *testing.T) { nil, scheduler.Config{ AgentTokenSecretName: "token-secret", + Image: "buildkite/agent:latest", }, ) inputs, err := worker.ParseJob(job) @@ -242,6 +243,7 @@ func TestTagEnv(t *testing.T) { nil, scheduler.Config{ AgentTokenSecretName: "token-secret", + Image: "buildkite/agent:latest", }, ) inputs, err := worker.ParseJob(job) @@ -275,7 +277,9 @@ func TestJobWithNoKubernetesPlugin(t *testing.T) { Command: "echo hello world", AgentQueryRules: []string{}, } - worker := scheduler.New(zaptest.NewLogger(t), nil, scheduler.Config{}) + worker := scheduler.New(zaptest.NewLogger(t), nil, scheduler.Config{ + Image: "buildkite/agent:latest", + }) inputs, err := worker.ParseJob(job) require.NoError(t, err) kjob, err := worker.Build(&corev1.PodSpec{}, false, inputs) diff --git a/internal/integration/fixtures/never-pull.yaml b/internal/integration/fixtures/never-pull.yaml new file mode 100644 index 00000000..7b42def8 --- /dev/null +++ b/internal/integration/fixtures/never-pull.yaml @@ -0,0 +1,12 @@ +steps: + - label: ":x:" + agents: + queue: "{{.queue}}" + plugins: + - kubernetes: + podSpec: + containers: + - image: 
buildkite/agent-extreme:never # will not exist + imagePullPolicy: Never # should lead to ErrImageNeverPull + command: + - echo 'this ought to fail' diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index 0d006853..f76bde94 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -441,7 +441,28 @@ func TestImagePullBackOffFailed(t *testing.T) { time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available logs := tc.FetchLogs(build) assert.Contains(t, logs, "other job has run") - assert.Contains(t, logs, "The following container images couldn't be pulled:\n * \"buildkite/non-existant-image:latest\"") + assert.Contains(t, logs, "The following images could not be pulled or were unavailable:\n") + assert.Contains(t, logs, `"buildkite/non-existant-image:latest"`) + assert.Contains(t, logs, "ImagePullBackOff") +} + +func TestPullPolicyNeverMissingImage(t *testing.T) { + tc := testcase{ + T: t, + Fixture: "never-pull.yaml", + Repo: repoHTTP, + GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint), + }.Init() + ctx := context.Background() + pipelineID := tc.PrepareQueueAndPipelineWithCleanup(ctx) + tc.StartController(ctx, cfg) + build := tc.TriggerBuild(ctx, pipelineID) + tc.AssertFail(ctx, build) + time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available + logs := tc.FetchLogs(build) + assert.Contains(t, logs, "The following images could not be pulled or were unavailable:\n") + assert.Contains(t, logs, `"buildkite/agent-extreme:never"`) + assert.Contains(t, logs, "ErrImageNeverPull") } func TestBrokenInitContainer(t *testing.T) { @@ -475,7 +496,7 @@ func TestInvalidImageRefFormat(t *testing.T) { build := tc.TriggerBuild(ctx, pipelineID) tc.AssertFail(ctx, build) time.Sleep(5 * time.Second) // trying to reduce flakes: logs not immediately available - tc.AssertLogsContain(build, "The following container images couldn't be pulled:\n * \"buildkite/agent:latest plus some extra junk\"") + tc.AssertLogsContain(build, `invalid reference format "buildkite/agent:latest plus some extra junk" for container "container-0"`) } func TestArtifactsUploadFailedJobs(t *testing.T) {
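Usage sketch (illustrative, not part of the diff): with this change, a pull policy set via the kubernetes plugin's podSpecPatch flows through to the generated imagecheck- init container, because the check containers are now built after the controller and plugin patches are applied. The label, queue, and image below are placeholders; container-0 is the name the stack gives the first command container:

    steps:
      - label: ":kubernetes: image check follows podSpecPatch"
        command: echo hello
        agents:
          queue: "my-queue"
        plugins:
          - kubernetes:
              podSpecPatch:
                containers:
                  - name: container-0
                    image: registry.example.com/private/app:latest
                    imagePullPolicy: Always

If no policy is set for a container anywhere, the corresponding check container falls back to default-image-check-pull-policy, and failing that to Always (or IfNotPresent for digest-pinned refs).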