Skip to content

Commit

Permalink
Fix data race issue on copyFn when there are multiple containers or i…
Browse files Browse the repository at this point in the history
…nitContainers (#422)

Fixes #421 

Creates a new `ImageCopier` object with its own `imagePullPolicy` to
avoid a data race on the `container` object.

This new object is dedicated to the image copy process as it was done
before but with its own subcontext and abstracted from the core mutation
webhook processing.

A new configurable `imageCopyDeadline` parameter has been added
alongside an implementation of timeout throughout the different steps of
the image copy. Every function called as part of the copy has been
updated to include context evaluation and support abortion in case of
context timeout.
  • Loading branch information
Caomoji authored Jan 26, 2023
1 parent f0f6439 commit a133c78
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 132 deletions.
6 changes: 6 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ A mutating webhook for Kubernetes, pointing the images to a new location.`,
log.Err(err).Str("policy", cfg.ImageCopyPolicy).Msg("parsing image copy policy failed")
}

imageCopyDeadline := config.DefaultImageCopyDeadline
if cfg.ImageCopyDeadline != 0 {
imageCopyDeadline = cfg.ImageCopyDeadline
}

imagePullSecretProvider := setupImagePullSecretsProvider()

wh, err := webhook.NewImageSwapperWebhookWithOpts(
Expand All @@ -89,6 +94,7 @@ A mutating webhook for Kubernetes, pointing the images to a new location.`,
webhook.ImagePullSecretsProvider(imagePullSecretProvider),
webhook.ImageSwapPolicy(imageSwapPolicy),
webhook.ImageCopyPolicy(imageCopyPolicy),
webhook.ImageCopyDeadline(imageCopyDeadline),
)
if err != nil {
log.Err(err).Msg("error creating webhook")
Expand Down
9 changes: 7 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ The option `imageSwapPolicy` (default: `exists`) defines the mutation strategy u
The option `imageCopyPolicy` (default: `delayed`) defines the image copy strategy used.

* `delayed`: Submits the copy job to a process queue and moves on.
* `immediate`: Submits the copy job to a process queue and waits for it to finish (deadline 8s).
* `force`: Attempts to immediately copy the image (deadline 8s).
* `immediate`: Submits the copy job to a process queue and waits for it to finish (deadline defined by `imageCopyDeadline`).
* `force`: Attempts to immediately copy the image (deadline defined by `imageCopyDeadline`).

## ImageCopyDeadline

The option `imageCopyDeadline` (default: `8s`) defines the duration after which the image copy if aborted.

This option only applies for `immediate` and `force` image copy strategies.


## Source
Expand Down
15 changes: 10 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ package config

import (
"fmt"
"time"
)

const DefaultImageCopyDeadline = 8 * time.Second

type Config struct {
LogLevel string `yaml:"logLevel" validate:"oneof=trace debug info warn error fatal"`
LogFormat string `yaml:"logFormat" validate:"oneof=json console"`

ListenAddress string

DryRun bool `yaml:"dryRun"`
ImageSwapPolicy string `yaml:"imageSwapPolicy" validate:"oneof=always exists"`
ImageCopyPolicy string `yaml:"imageCopyPolicy" validate:"oneof=delayed immediate force"`
Source Source `yaml:"source"`
Target Target `yaml:"target"`
DryRun bool `yaml:"dryRun"`
ImageSwapPolicy string `yaml:"imageSwapPolicy" validate:"oneof=always exists"`
ImageCopyPolicy string `yaml:"imageCopyPolicy" validate:"oneof=delayed immediate force"`
ImageCopyDeadline time.Duration `yaml:"imageCopyDeadline"`

Source Source `yaml:"source"`
Target Target `yaml:"target"`

TLSCertFile string
TLSKeyFile string
Expand Down
6 changes: 4 additions & 2 deletions pkg/registry/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package registry

import "context"

// Client provides methods required to be implemented by the various target registry clients, e.g. ECR, Docker, Quay.
type Client interface {
CreateRepository(string) error
CreateRepository(ctx context.Context, name string) error
RepositoryExists() bool
CopyImage() error
PullImage() error
PutImage() error
ImageExists(ref string) bool
ImageExists(ctx context.Context, ref string) bool

// Endpoint returns the domain of the registry
Endpoint() string
Expand Down
19 changes: 8 additions & 11 deletions pkg/registry/ecr.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package registry

import (
"context"
"encoding/base64"
"net/http"
"os/exec"
Expand All @@ -18,8 +19,6 @@ import (
"github.com/rs/zerolog/log"
)

var execCommand = exec.Command

type ECRClient struct {
client ecriface.ECRAPI
ecrDomain string
Expand All @@ -36,12 +35,12 @@ func (e *ECRClient) Credentials() string {
return string(e.authToken)
}

func (e *ECRClient) CreateRepository(name string) error {
func (e *ECRClient) CreateRepository(ctx context.Context, name string) error {
if _, found := e.cache.Get(name); found {
return nil
}

_, err := e.client.CreateRepository(&ecr.CreateRepositoryInput{
_, err := e.client.CreateRepositoryWithContext(ctx, &ecr.CreateRepositoryInput{
RepositoryName: aws.String(name),
ImageScanningConfiguration: &ecr.ImageScanningConfiguration{
ScanOnPush: aws.Bool(true),
Expand All @@ -68,7 +67,7 @@ func (e *ECRClient) CreateRepository(name string) error {

if len(e.accessPolicy) > 0 {
log.Debug().Str("repo", name).Str("accessPolicy", e.accessPolicy).Msg("setting access policy on repo")
_, err := e.client.SetRepositoryPolicy(&ecr.SetRepositoryPolicyInput{
_, err := e.client.SetRepositoryPolicyWithContext(ctx, &ecr.SetRepositoryPolicyInput{
PolicyText: &e.accessPolicy,
RegistryId: &e.targetAccount,
RepositoryName: aws.String(name),
Expand All @@ -82,7 +81,7 @@ func (e *ECRClient) CreateRepository(name string) error {

if len(e.lifecyclePolicy) > 0 {
log.Debug().Str("repo", name).Str("lifecyclePolicy", e.lifecyclePolicy).Msg("setting lifecycle policy on repo")
_, err := e.client.PutLifecyclePolicy(&ecr.PutLifecyclePolicyInput{
_, err := e.client.PutLifecyclePolicyWithContext(ctx, &ecr.PutLifecyclePolicyInput{
LifecyclePolicyText: &e.lifecyclePolicy,
RegistryId: &e.targetAccount,
RepositoryName: aws.String(name),
Expand Down Expand Up @@ -130,7 +129,7 @@ func (e *ECRClient) PutImage() error {
panic("implement me")
}

func (e *ECRClient) ImageExists(ref string) bool {
func (e *ECRClient) ImageExists(ctx context.Context, ref string) bool {
if _, found := e.cache.Get(ref); found {
return true
}
Expand All @@ -143,10 +142,8 @@ func (e *ECRClient) ImageExists(ref string) bool {
"--creds", e.Credentials(),
}

log.Trace().Str("app", app).Strs("args", args).Msg("executing command to inspect image")
cmd := execCommand(app, args...)

if _, err := cmd.Output(); err != nil {
log.Ctx(ctx).Trace().Str("app", app).Strs("args", args).Msg("executing command to inspect image")
if err := exec.CommandContext(ctx, app, args...).Run(); err != nil {
return false
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/secrets/dummy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package secrets

import v1 "k8s.io/api/core/v1"
import (
"context"

v1 "k8s.io/api/core/v1"
)

// DummyImagePullSecretsProvider does nothing
type DummyImagePullSecretsProvider struct {
Expand All @@ -12,6 +16,6 @@ func NewDummyImagePullSecretsProvider() ImagePullSecretsProvider {
}

// GetImagePullSecrets returns an empty ImagePullSecretsResult
func (p *DummyImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error) {
func (p *DummyImagePullSecretsProvider) GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error) {
return NewImagePullSecretsResult(), nil
}
3 changes: 2 additions & 1 deletion pkg/secrets/dummy_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package secrets

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -41,7 +42,7 @@ func TestDummyImagePullSecretsProvider_GetImagePullSecrets(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &DummyImagePullSecretsProvider{}
got, err := p.GetImagePullSecrets(tt.args.pod)
got, err := p.GetImagePullSecrets(context.Background(), tt.args.pod)
if (err != nil) != tt.wantErr {
t.Errorf("GetImagePullSecrets() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/secrets/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ func NewKubernetesImagePullSecretsProvider(clientset kubernetes.Interface) Image
}

// GetImagePullSecrets returns all secrets with their respective content
func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error) {
func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error) {
var secrets = make(map[string][]byte)

imagePullSecrets := pod.Spec.ImagePullSecrets

// retrieve secret names from pod ServiceAccount (spec.imagePullSecrets)
serviceAccount, err := p.kubernetesClient.CoreV1().
ServiceAccounts(pod.Namespace).
Get(context.TODO(), pod.Spec.ServiceAccountName, metav1.GetOptions{})
Get(ctx, pod.Spec.ServiceAccountName, metav1.GetOptions{})
if err != nil {
log.Err(err).Msg("error fetching referenced service account, continue without service account imagePullSecrets")
log.Ctx(ctx).Warn().Msg("error fetching referenced service account, continue without service account imagePullSecrets")
}

if serviceAccount != nil {
Expand All @@ -86,9 +86,9 @@ func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (*
continue
}

secret, err := p.kubernetesClient.CoreV1().Secrets(pod.Namespace).Get(context.TODO(), imagePullSecret.Name, metav1.GetOptions{})
secret, err := p.kubernetesClient.CoreV1().Secrets(pod.Namespace).Get(ctx, imagePullSecret.Name, metav1.GetOptions{})
if err != nil {
log.Err(err).Msg("error fetching secret, continue without imagePullSecrets")
log.Ctx(ctx).Err(err).Msg("error fetching secret, continue without imagePullSecrets")
}

if secret == nil || secret.Type != v1.SecretTypeDockerConfigJson {
Expand Down
2 changes: 1 addition & 1 deletion pkg/secrets/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestKubernetesCredentialProvider_GetImagePullSecrets(t *testing.T) {
_, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.TODO(), podSecret, metav1.CreateOptions{})

provider := NewKubernetesImagePullSecretsProvider(clientSet)
result, err := provider.GetImagePullSecrets(pod)
result, err := provider.GetImagePullSecrets(context.Background(), pod)

assert.NoError(t, err)
assert.NotNil(t, result)
Expand Down
8 changes: 6 additions & 2 deletions pkg/secrets/provider.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package secrets

import v1 "k8s.io/api/core/v1"
import (
"context"

v1 "k8s.io/api/core/v1"
)

type ImagePullSecretsProvider interface {
GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error)
GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error)
}
Loading

0 comments on commit a133c78

Please sign in to comment.