From e53bdc8ec06199a70273be5425e4d947c81c7ea3 Mon Sep 17 00:00:00 2001 From: joey Date: Sat, 8 Apr 2023 18:31:08 +0800 Subject: [PATCH] The cancellation of taskruns is now done through the entrypoint binary through a new flag called 'stop_on_cancel'. This removes the need for deleting the pods to cancel a taskrun, allowing examination of the logs on the pods from cancelled taskruns. Part of work on issue #3238 Signed-off-by: chengjoey --- cmd/entrypoint/main.go | 20 +++- cmd/entrypoint/runner.go | 5 +- cmd/entrypoint/runner_test.go | 21 +++- cmd/entrypoint/waiter.go | 16 ++- cmd/entrypoint/waiter_test.go | 71 ++++++++++- docs/taskruns.md | 3 + pkg/apis/config/feature_flags.go | 8 ++ pkg/apis/config/feature_flags_test.go | 1 + .../testdata/feature-flags-all-flags-set.yaml | 1 + pkg/entrypoint/entrypointer.go | 65 ++++++++-- pkg/entrypoint/entrypointer_test.go | 111 +++++++++++++++++- pkg/pod/entrypoint.go | 46 +++++++- pkg/pod/entrypoint_test.go | 49 ++++++-- pkg/pod/pod.go | 11 +- pkg/pod/pod_test.go | 62 ++++++++++ pkg/reconciler/taskrun/taskrun.go | 15 ++- 16 files changed, 466 insertions(+), 39 deletions(-) diff --git a/cmd/entrypoint/main.go b/cmd/entrypoint/main.go index f13c512e584..f62528c2275 100644 --- a/cmd/entrypoint/main.go +++ b/cmd/entrypoint/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "encoding/json" "errors" "flag" @@ -58,6 +59,7 @@ var ( enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification") socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.") resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.") + stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled") ) const ( @@ -137,6 +139,11 @@ func main() { } } + ctx := context.Background() + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + var spireWorkloadAPI spire.EntrypointerAPIClient if enableSpire != nil && *enableSpire && socketPath != nil && *socketPath != "" { spireConfig := config.SpireConfig{ @@ -146,12 +153,14 @@ func main() { } e := entrypoint.Entrypointer{ + Ctx: ctx, + Cancel: cancel, Command: append(cmd, commandArgs...), WaitFiles: strings.Split(*waitFiles, ","), WaitFileContent: *waitFileContent, PostFile: *postFile, TerminationPath: *terminationPath, - Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure}, + Waiter: &realWaiter{ctx: ctx, waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure}, Runner: &realRunner{ stdoutPath: *stdoutPath, stderrPath: *stderrPath, @@ -164,6 +173,7 @@ func main() { StepMetadataDir: *stepMetadataDir, SpireWorkloadAPI: spireWorkloadAPI, ResultExtractionMethod: *resultExtractionMethod, + StopOnCancel: *stopOnCancel, } // Copy any creds injected by the controller into the $HOME directory of the current @@ -181,6 +191,14 @@ func main() { case termination.MessageLengthError: log.Print(err.Error()) os.Exit(1) + case entrypoint.ContextError: + if errors.Is(err, entrypoint.ContextCanceledError) { + log.Print("Step was cancelled") + os.Exit(int(syscall.SIGKILL)) + } else { + log.Print(err.Error()) + os.Exit(1) + } case *exec.ExitError: // Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go // This works on both Unix and Windows. Although diff --git a/cmd/entrypoint/runner.go b/cmd/entrypoint/runner.go index c9031e4ae73..89e8c825a7f 100644 --- a/cmd/entrypoint/runner.go +++ b/cmd/entrypoint/runner.go @@ -136,7 +136,10 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error { // Wait for command to exit if err := cmd.Wait(); err != nil { if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return context.DeadlineExceeded + return entrypoint.ContextDeadlineExceededError + } + if errors.Is(ctx.Err(), context.Canceled) { + return entrypoint.ContextCanceledError } return err } diff --git a/cmd/entrypoint/runner_test.go b/cmd/entrypoint/runner_test.go index b2af31eb1da..bcc485b4e5b 100644 --- a/cmd/entrypoint/runner_test.go +++ b/cmd/entrypoint/runner_test.go @@ -28,6 +28,8 @@ import ( "syscall" "testing" "time" + + "github.com/tektoncd/pipeline/pkg/entrypoint" ) // TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan. @@ -183,10 +185,27 @@ func TestRealRunnerTimeout(t *testing.T) { defer cancel() if err := rr.Run(ctx, "sleep", "0.01"); err != nil { - if !errors.Is(err, context.DeadlineExceeded) { + if !errors.Is(err, entrypoint.ContextDeadlineExceededError) { t.Fatalf("unexpected error received: %v", err) } } else { t.Fatalf("step didn't timeout") } } + +func TestRealRunnerCanceled(t *testing.T) { + rr := realRunner{} + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + if err := rr.Run(ctx, "sleep", "3"); err != nil { + if !errors.Is(err, entrypoint.ContextCanceledError) { + t.Fatalf("unexpected error received: %v", err) + } + } else { + t.Fatalf("step didn't cancel") + } +} diff --git a/cmd/entrypoint/waiter.go b/cmd/entrypoint/waiter.go index 29ebf9b4441..4fd659c2435 100644 --- a/cmd/entrypoint/waiter.go +++ b/cmd/entrypoint/waiter.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "fmt" "os" "time" @@ -26,6 +27,7 @@ import ( // realWaiter actually waits for files, by polling. type realWaiter struct { + ctx context.Context waitPollingInterval time.Duration breakpointOnFailure bool } @@ -51,7 +53,19 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure if file == "" { return nil } - for ; ; time.Sleep(rw.waitPollingInterval) { + for { + select { + case <-rw.ctx.Done(): + switch rw.ctx.Err() { + case context.Canceled: + return entrypoint.ContextCanceledError + case context.DeadlineExceeded: + return entrypoint.ContextDeadlineExceededError + default: + } + return nil + case <-time.After(rw.waitPollingInterval): + } if info, err := os.Stat(file); err == nil { if !expectContent || info.Size() > 0 { return nil diff --git a/cmd/entrypoint/waiter_test.go b/cmd/entrypoint/waiter_test.go index 422211802bc..17281e00605 100644 --- a/cmd/entrypoint/waiter_test.go +++ b/cmd/entrypoint/waiter_test.go @@ -17,11 +17,14 @@ limitations under the License. package main import ( + "context" "errors" "os" "strings" "testing" "time" + + "github.com/tektoncd/pipeline/pkg/entrypoint" ) const testWaitPollingInterval = 50 * time.Millisecond @@ -35,7 +38,7 @@ func TestRealWaiterWaitMissingFile(t *testing.T) { t.Errorf("error creating temp file: %v", err) } os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false) @@ -63,7 +66,7 @@ func TestRealWaiterWaitWithFile(t *testing.T) { t.Errorf("error creating temp file: %v", err) } defer os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false) @@ -87,7 +90,7 @@ func TestRealWaiterWaitMissingContent(t *testing.T) { t.Errorf("error creating temp file: %v", err) } defer os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false) @@ -114,7 +117,7 @@ func TestRealWaiterWaitWithContent(t *testing.T) { t.Errorf("error creating temp file: %v", err) } defer os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false) @@ -142,7 +145,7 @@ func TestRealWaiterWaitWithErrorWaitfile(t *testing.T) { } tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1) defer os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { // error of type skipError is returned after encountering a error waitfile @@ -173,7 +176,7 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) { } tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1) defer os.Remove(tmp.Name()) - rw := realWaiter{} + rw := realWaiter{ctx: context.Background()} doneCh := make(chan struct{}) go func() { // When breakpoint on failure is enabled skipError shouldn't be returned for a error waitfile @@ -191,3 +194,59 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) { t.Errorf("expected Wait() to have detected a non-zero file size by now") } } + +func TestRealWaiterWaitWithContextCanceled(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + defer os.Remove(tmp.Name()) + ctx, cancel := context.WithCancel(context.Background()) + rw := realWaiter{ctx: ctx} + errCh := make(chan error) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false) + if err == nil { + t.Errorf("expected context canceled error") + } + errCh <- err + }() + cancel() + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case err := <-errCh: + if !errors.Is(err, entrypoint.ContextCanceledError) { + t.Errorf("expected ContextCanceledError, got %T", err) + } + case <-delay.C: + t.Errorf("expected Wait() to have a ContextCanceledError") + } +} + +func TestRealWaiterWaitWithTimeout(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + defer os.Remove(tmp.Name()) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + rw := realWaiter{ctx: ctx} + errCh := make(chan error) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false) + if err == nil { + t.Errorf("expected context deadline error") + } + errCh <- err + }() + delay := time.NewTimer(2 * time.Second) + select { + case err := <-errCh: + if !errors.Is(err, entrypoint.ContextDeadlineExceededError) { + t.Errorf("expected ContextDeadlineExceededError, got %T", err) + } + case <-delay.C: + t.Errorf("expected Wait() to have a ContextDeadlineExceededError") + } +} diff --git a/docs/taskruns.md b/docs/taskruns.md index 2c7b9fc897d..4dae3c0f237 100644 --- a/docs/taskruns.md +++ b/docs/taskruns.md @@ -803,6 +803,9 @@ When you cancel a TaskRun, the running pod associated with that `TaskRun` is del means that the logs of the `TaskRun` are not preserved. The deletion of the `TaskRun` pod is necessary in order to stop `TaskRun` step containers from running. +**Note: if `enable-cancel-using-entrypoint` is set to +`"true"` in the `feature-flags`, the pod associated with that `TaskRun` will not be deleted** + Example of cancelling a `TaskRun`: ```yaml diff --git a/pkg/apis/config/feature_flags.go b/pkg/apis/config/feature_flags.go index aa532fafdf9..3685a2405e0 100644 --- a/pkg/apis/config/feature_flags.go +++ b/pkg/apis/config/feature_flags.go @@ -62,6 +62,8 @@ const ( DefaultEnableAPIFields = StableAPIFields // DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs". DefaultSendCloudEventsForRuns = false + // DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint" + DefaultEnableCancelUsingEntrypoint = false // EnforceNonfalsifiabilityWithSpire is the value used for "enable-nonfalsifiability" when SPIRE is used to enable non-falsifiability. EnforceNonfalsifiabilityWithSpire = "spire" // EnforceNonfalsifiabilityNone is the value used for "enable-nonfalsifiability" when non-falsifiability is not enabled. @@ -76,6 +78,8 @@ const ( DefaultResultExtractionMethod = ResultExtractionMethodTerminationMessage // DefaultMaxResultSize is the default value in bytes for the size of a result DefaultMaxResultSize = 4096 + // EnableCancelUsingEntrypoint is the flag used to enable cancelling a pod using the entrypoint + EnableCancelUsingEntrypoint = "enable-cancel-using-entrypoint" disableAffinityAssistantKey = "disable-affinity-assistant" disableCredsInitKey = "disable-creds-init" @@ -105,6 +109,7 @@ type FeatureFlags struct { SendCloudEventsForRuns bool AwaitSidecarReadiness bool EnforceNonfalsifiability string + EnableCancelUsingEntrypoint bool // VerificationNoMatchPolicy is the feature flag for "trusted-resources-verification-no-match-policy" // VerificationNoMatchPolicy can be set to "ignore", "warn" and "fail" values. // ignore: skip trusted resources verification when no matching verification policies found @@ -190,6 +195,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) { if err := setMaxResultSize(cfgMap, DefaultMaxResultSize, &tc.MaxResultSize); err != nil { return nil, err } + if err := setFeature(EnableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil { + return nil, err + } // Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if // enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of diff --git a/pkg/apis/config/feature_flags_test.go b/pkg/apis/config/feature_flags_test.go index c63d7d8a922..73fca325dc1 100644 --- a/pkg/apis/config/feature_flags_test.go +++ b/pkg/apis/config/feature_flags_test.go @@ -68,6 +68,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) { EnableProvenanceInStatus: true, ResultExtractionMethod: "termination-message", MaxResultSize: 4096, + EnableCancelUsingEntrypoint: true, }, fileName: "feature-flags-all-flags-set", }, diff --git a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml index 07f5f33a9de..4259a6282ba 100644 --- a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml +++ b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml @@ -29,3 +29,4 @@ data: enforce-nonfalsifiability: "spire" trusted-resources-verification-no-match-policy: "fail" enable-provenance-in-status: "true" + enable-cancel-using-entrypoint: "true" diff --git a/pkg/entrypoint/entrypointer.go b/pkg/entrypoint/entrypointer.go index 6d7273b5a14..ebd9da6b729 100644 --- a/pkg/entrypoint/entrypointer.go +++ b/pkg/entrypoint/entrypointer.go @@ -26,11 +26,13 @@ import ( "path/filepath" "strconv" "strings" + "syscall" "time" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/pod" "github.com/tektoncd/pipeline/pkg/spire" "github.com/tektoncd/pipeline/pkg/termination" "go.uber.org/zap" @@ -43,9 +45,25 @@ const ( FailOnError = "stopAndFail" ) +type ContextError string + +func (e ContextError) Error() string { + return string(e) +} + +var ( + ContextDeadlineExceededError = ContextError(context.DeadlineExceeded.Error()) + ContextCanceledError = ContextError(context.Canceled.Error()) +) + // Entrypointer holds fields for running commands with redirected // entrypoints. type Entrypointer struct { + // Ctx context, used to pass signals + Ctx context.Context + // Cancel is a function to cancel the entrypoint + Cancel context.CancelFunc + // Command is the original specified command and args. Command []string @@ -87,6 +105,9 @@ type Entrypointer struct { ResultsDirectory string // ResultExtractionMethod is the method using which the controller extracts the results from the task pod. ResultExtractionMethod string + + // StopOnCancel indicates if the entrypoint should stop the command when taskrun is canceled + StopOnCancel bool } // Waiter encapsulates waiting for files to exist. @@ -120,6 +141,13 @@ func (e Entrypointer) Go() error { _ = logger.Sync() }() + // start a goroutine to listen for cancellation signals + go func() { + if err := e.waitingCancellation(); err != nil { + logger.Error("Error while waiting for cancellation", zap.Error(err)) + } + }() + for _, f := range e.WaitFiles { if err := e.Waiter.Wait(f, e.WaitFileContent, e.BreakpointOnFailure); err != nil { // An error happened while waiting, so we bail @@ -143,21 +171,16 @@ func (e Entrypointer) Go() error { ResultType: v1beta1.InternalTektonResultType, }) - ctx := context.Background() var err error - if e.Timeout != nil && *e.Timeout < time.Duration(0) { err = fmt.Errorf("negative timeout specified") } - if err == nil { - var cancel context.CancelFunc - if e.Timeout != nil && *e.Timeout != time.Duration(0) { - ctx, cancel = context.WithTimeout(ctx, *e.Timeout) - defer cancel() + if e.Timeout != nil && *e.Timeout > time.Duration(0) { + e.Ctx, e.Cancel = context.WithTimeout(e.Ctx, *e.Timeout) } - err = e.Runner.Run(ctx, e.Command...) - if errors.Is(err, context.DeadlineExceeded) { + err = e.Runner.Run(e.Ctx, e.Command...) + if errors.Is(err, ContextDeadlineExceededError) { output = append(output, v1beta1.RunResult{ Key: "Reason", Value: "TimeoutExceeded", @@ -168,6 +191,15 @@ func (e Entrypointer) Go() error { var ee *exec.ExitError switch { + case err != nil && errors.Is(err, ContextCanceledError): + logger.Info("Step was canceling") + output = append(output, v1beta1.RunResult{ + Key: "Reason", + Value: "Cancelled", + ResultType: v1beta1.InternalTektonResultType, + }) + e.WritePostFile(e.PostFile, e.Ctx.Err()) + e.WriteExitCodeFile(e.StepMetadataDir, syscall.SIGKILL.String()) case err != nil && e.BreakpointOnFailure: logger.Info("Skipping writing to PostFile") case e.OnError == ContinueOnError && errors.As(err, &ee): @@ -196,7 +228,7 @@ func (e Entrypointer) Go() error { if e.ResultsDirectory != "" { resultPath = e.ResultsDirectory } - if err := e.readResultsFromDisk(ctx, resultPath); err != nil { + if err := e.readResultsFromDisk(e.Ctx, resultPath); err != nil { logger.Fatalf("Error while handling results: %s", err) } } @@ -267,3 +299,16 @@ func (e Entrypointer) WriteExitCodeFile(stepPath, content string) { exitCodeFile := filepath.Join(stepPath, "exitCode") e.PostWriter.Write(exitCodeFile, content) } + +func (e Entrypointer) waitingCancellation() error { + if !e.StopOnCancel { + return nil + } + if err := e.Waiter.Wait(pod.DownwardMountCancelFile, true, false); err != nil { + return err + } + if e.Ctx.Err() == nil { + e.Cancel() + } + return nil +} diff --git a/pkg/entrypoint/entrypointer_test.go b/pkg/entrypoint/entrypointer_test.go index 2d6bcfa5275..e98ab4f49ac 100644 --- a/pkg/entrypoint/entrypointer_test.go +++ b/pkg/entrypoint/entrypointer_test.go @@ -100,6 +100,7 @@ func TestEntrypointerFailures(t *testing.T) { defer os.Remove(terminationFile.Name()) } err := Entrypointer{ + Ctx: context.Background(), Command: []string{"echo", "some", "args"}, WaitFiles: c.waitFiles, PostFile: c.postFile, @@ -177,6 +178,7 @@ func TestEntrypointer(t *testing.T) { defer os.Remove(terminationFile.Name()) } err := Entrypointer{ + Ctx: context.Background(), Command: append([]string{c.entrypoint}, c.args...), WaitFiles: c.waitFiles, PostFile: c.postFile, @@ -315,6 +317,7 @@ func TestReadResultsFromDisk(t *testing.T) { } e := Entrypointer{ + Ctx: context.Background(), Results: resultsFilePath, TerminationPath: terminationPath, ResultExtractionMethod: config.ResultExtractionMethodTerminationMessage, @@ -350,7 +353,7 @@ func TestEntrypointer_ReadBreakpointExitCodeFromDisk(t *testing.T) { if err = os.WriteFile(tmp.Name(), []byte(fmt.Sprintf("%d", expectedExitCode)), 0700); err != nil { t.Errorf("error while writing to temp file create temp file for testing exit code written by breakpoint") } - e := Entrypointer{} + e := Entrypointer{Ctx: context.Background()} // test reading the exit code from error waitfile actualExitCode, err := e.BreakpointExitCode(tmp.Name()) if actualExitCode != expectedExitCode { @@ -398,6 +401,7 @@ func TestEntrypointer_OnError(t *testing.T) { defer os.Remove(terminationFile.Name()) } err := Entrypointer{ + Ctx: context.Background(), Command: []string{"echo", "some", "args"}, WaitFiles: []string{}, PostFile: c.postFile, @@ -545,6 +549,7 @@ func TestEntrypointerResults(t *testing.T) { } err := Entrypointer{ + Ctx: context.Background(), Command: append([]string{c.entrypoint}, c.args...), WaitFiles: c.waitFiles, PostFile: c.postFile, @@ -601,6 +606,93 @@ func TestEntrypointerResults(t *testing.T) { } } +func Test_waitingCancellation(t *testing.T) { + type args struct { + stopOnCancel bool + } + testCases := []struct { + name string + args args + expectCtxErr error + }{ + { + name: "stopOnCancel is false", + args: args{ + stopOnCancel: false, + }, + }, + { + name: "stopOnCancel is true and want context canceled", + args: args{ + stopOnCancel: true, + }, + expectCtxErr: context.Canceled, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + fw := &fakeWaiter{} + err := Entrypointer{ + Ctx: ctx, + Cancel: cancel, + Waiter: fw, + StopOnCancel: tc.args.stopOnCancel, + }.waitingCancellation() + if err != nil { + t.Fatalf("Entrypointer waitingCancellation failed: %v", err) + } + if tc.expectCtxErr != nil && ctx.Err() != tc.expectCtxErr { + t.Errorf("expected context error %v, got %v", tc.expectCtxErr, ctx.Err()) + } + }) + } +} + +func TestEntrypointerStopOnCancel(t *testing.T) { + testCases := []struct { + name string + stopOnCancel bool + expectError error + }{ + { + name: "stopOnCancel is false, expect no error", + }, + { + name: "stopOnCancel is true, expect context canceled error", + stopOnCancel: true, + expectError: ContextCanceledError, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + terminationPath := "termination" + if terminationFile, err := os.CreateTemp("", "termination"); err != nil { + t.Fatalf("unexpected error creating temporary termination file: %v", err) + } else { + terminationPath = terminationFile.Name() + defer os.Remove(terminationFile.Name()) + } + ctx, cancel := context.WithCancel(context.Background()) + fw := &fakeWaiter{} + fr := &fakeLongRunner{duration: 1 * time.Second} + fp := &fakePostWriter{} + err := Entrypointer{ + Ctx: ctx, + Cancel: cancel, + Waiter: fw, + StopOnCancel: tc.stopOnCancel, + Runner: fr, + PostWriter: fp, + TerminationPath: terminationPath, + }.Go() + if err != tc.expectError { + t.Errorf("expected error %v, got %v", tc.expectError, err) + } + }) + } +} + type fakeWaiter struct{ waited []string } func (f *fakeWaiter) Wait(file string, _ bool, _ bool) error { @@ -671,6 +763,23 @@ func (f *fakeExitErrorRunner) Run(ctx context.Context, args ...string) error { return exec.Command("ls", "/bogus/path").Run() } +type fakeLongRunner struct{ duration time.Duration } + +func (f *fakeLongRunner) Run(ctx context.Context, _ ...string) error { + select { + case <-time.After(f.duration): + return nil + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + return ContextCanceledError + } + if ctx.Err() == context.DeadlineExceeded { + return ContextDeadlineExceededError + } + return nil + } +} + type fakeResultsWriter struct { args *[]string resultsToWrite map[string]string diff --git a/pkg/pod/entrypoint.go b/pkg/pod/entrypoint.go index e62b04b6fc3..9bbee443866 100644 --- a/pkg/pod/entrypoint.go +++ b/pkg/pod/entrypoint.go @@ -61,6 +61,10 @@ const ( sidecarPrefix = "sidecar-" breakpointOnFailure = "onFailure" + + downwardMountCancelFile = "cancel" + cancelAnnotation = "tekton.dev/cancel" + cancelAnnotationValue = "CANCEL" ) var ( @@ -83,6 +87,12 @@ var ( MountPath: pipeline.StepsDir, } + downwardCancelVolumeItem = corev1.DownwardAPIVolumeFile{ + Path: downwardMountCancelFile, + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", cancelAnnotation), + }, + } // TODO(#1605): Signal sidecar readiness by injecting entrypoint, // remove dependency on Downward API. downwardVolume = corev1.Volume{ @@ -105,6 +115,7 @@ var ( // since the volume itself is readonly, but including for completeness. ReadOnly: true, } + DownwardMountCancelFile = filepath.Join(downwardMountPoint, downwardMountCancelFile) ) // orderContainers returns the specified steps, modified so that they are @@ -114,7 +125,7 @@ var ( // command, we must have fetched the image's ENTRYPOINT before calling this // method, using entrypoint_lookup.go. // Additionally, Step timeouts are added as entrypoint flag. -func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1beta1.TaskSpec, breakpointConfig *v1beta1.TaskRunDebug, waitForReadyAnnotation bool) ([]corev1.Container, error) { +func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1beta1.TaskSpec, breakpointConfig *v1beta1.TaskRunDebug, waitForReadyAnnotation, enableCancelUsingEntrypoint bool) ([]corev1.Container, error) { if len(steps) == 0 { return nil, errors.New("No steps specified") } @@ -172,6 +183,10 @@ func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Containe } } + if enableCancelUsingEntrypoint { + argsForEntrypoint = append(argsForEntrypoint, "-stop_on_cancel") + } + cmd, args := s.Command, s.Args if len(cmd) > 0 { argsForEntrypoint = append(argsForEntrypoint, "-entrypoint", cmd[0]) @@ -209,7 +224,7 @@ func collectResultsName(results []v1beta1.TaskResult) string { return strings.Join(resultNames, ",") } -var replaceReadyPatchBytes []byte +var replaceReadyPatchBytes, replaceCancelPatchBytes []byte func init() { // https://stackoverflow.com/questions/55573724/create-a-patch-to-add-a-kubernetes-annotation @@ -223,6 +238,33 @@ func init() { if err != nil { log.Fatalf("failed to marshal replace ready patch bytes: %v", err) } + + cancelAnnotationPath := "/metadata/annotations/" + strings.Replace(cancelAnnotation, "/", "~1", 1) + replaceCancelPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{ + Operation: "replace", + Path: cancelAnnotationPath, + Value: cancelAnnotationValue, + }}) + if err != nil { + log.Fatalf("failed to marshal replace cancel patch bytes: %v", err) + } +} + +// CancelPod updates the Pod's annotations to signal the cancellation +// by projecting the cancel annotation via the Downward API. +// Do not wrap the error, return directly, because the caller needs to judge whether it is a not found error +func CancelPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, podName string) error { + // PATCH the Pod's annotations to replace the cancel annotation with the + // "CANCEL" value, to signal the pod to be cancelled. + _, err := kubeClient.CoreV1().Pods(namespace).Patch(ctx, podName, types.JSONPatchType, replaceCancelPatchBytes, metav1.PatchOptions{}) + return err +} + +// DeletePod delete specify pod by namespace and pod name. +// Do not wrap the error, return directly, because the caller needs to judge whether it is a not found error +func DeletePod(ctx context.Context, kubeClient kubernetes.Interface, namespace, podName string) error { + err := kubeClient.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + return err } // UpdateReady updates the Pod's annotations to signal the first step to start diff --git a/pkg/pod/entrypoint_test.go b/pkg/pod/entrypoint_test.go index 261862f1360..6cb5161be45 100644 --- a/pkg/pod/entrypoint_test.go +++ b/pkg/pod/entrypoint_test.go @@ -95,7 +95,7 @@ func TestOrderContainers(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, nil, nil, true) + got, err := orderContainers([]string{}, steps, nil, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -163,7 +163,7 @@ func TestOrderContainersWithResultsSidecarLogs(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true) + got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -209,7 +209,7 @@ func TestOrderContainersWithNoWait(t *testing.T) { VolumeMounts: []corev1.VolumeMount{volumeMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, nil, nil, false) + got, err := orderContainers([]string{}, steps, nil, nil, false, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -243,7 +243,7 @@ func TestOrderContainersWithDebugOnFailure(t *testing.T) { taskRunDebugConfig := &v1beta1.TaskRunDebug{ Breakpoint: []string{"onFailure"}, } - got, err := orderContainers([]string{}, steps, nil, taskRunDebugConfig, true) + got, err := orderContainers([]string{}, steps, nil, taskRunDebugConfig, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -321,7 +321,38 @@ func TestEntryPointResults(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) + if err != nil { + t.Fatalf("orderContainers: %v", err) + } + if d := cmp.Diff(want, got); d != "" { + t.Errorf("Diff %s", diff.PrintWantGot(d)) + } +} + +func TestEntryPointEnableCancelUsingEntrypoint(t *testing.T) { + steps := []corev1.Container{{ + Image: "step-1", + Command: []string{"cmd"}, + Args: []string{"arg1", "arg2"}, + }} + want := []corev1.Container{{ + Image: "step-1", + Command: []string{entrypointBinary}, + Args: []string{ + "-wait_file", "/tekton/downward/ready", + "-wait_file_content", + "-post_file", "/tekton/run/0/out", + "-termination_path", "/tekton/termination", + "-step_metadata_dir", "/tekton/run/0/status", + "-stop_on_cancel", + "-entrypoint", "cmd", "--", + "arg1", "arg2", + }, + VolumeMounts: []corev1.VolumeMount{downwardMount}, + TerminationMessagePath: "/tekton/termination", + }} + got, err := orderContainers([]string{}, steps, nil, nil, true, true) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -362,7 +393,7 @@ func TestEntryPointResultsSingleStep(t *testing.T) { VolumeMounts: []corev1.VolumeMount{downwardMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -399,7 +430,7 @@ func TestEntryPointSingleResultsSingleStep(t *testing.T) { VolumeMounts: []corev1.VolumeMount{downwardMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -470,7 +501,7 @@ func TestEntryPointOnError(t *testing.T) { err: errors.New("task step onError must be either \"continue\" or \"stopAndFail\" but it is set to an invalid value \"invalid-on-error\""), }} { t.Run(tc.desc, func(t *testing.T) { - got, err := orderContainers([]string{}, steps, &tc.taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &tc.taskSpec, nil, true, false) if len(tc.wantContainers) == 0 { if err == nil { t.Fatalf("expected an error for an invalid value for onError but received none") @@ -569,7 +600,7 @@ func TestEntryPointStepOutputConfigs(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go index 3bdd2fc49a6..ae508d2be0b 100644 --- a/pkg/pod/pod.go +++ b/pkg/pod/pod.go @@ -123,6 +123,7 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec defaultForbiddenEnv := config.FromContextOrDefaults(ctx).Defaults.DefaultForbiddenEnv alphaAPIEnabled := featureFlags.EnableAPIFields == config.AlphaAPIFields sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs + enableCancelUsingEntrypoint := featureFlags.EnableCancelUsingEntrypoint // Add our implicit volumes first, so they can be overridden by the user if they prefer. volumes = append(volumes, implicitVolumes...) @@ -204,16 +205,20 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec readyImmediately := isPodReadyImmediately(*featureFlags, taskSpec.Sidecars) if alphaAPIEnabled { - stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately, enableCancelUsingEntrypoint) } else { - stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately, enableCancelUsingEntrypoint) } if err != nil { return nil, err } volumes = append(volumes, binVolume) if !readyImmediately { - volumes = append(volumes, downwardVolume) + downwardVolumeDup := downwardVolume.DeepCopy() + if enableCancelUsingEntrypoint { + downwardVolumeDup.VolumeSource.DownwardAPI.Items = append(downwardVolumeDup.VolumeSource.DownwardAPI.Items, downwardCancelVolumeItem) + } + volumes = append(volumes, *downwardVolumeDup) } // Order of precedence for envs diff --git a/pkg/pod/pod_test.go b/pkg/pod/pod_test.go index 37f62924c00..a471d0f1a00 100644 --- a/pkg/pod/pod_test.go +++ b/pkg/pod/pod_test.go @@ -1963,6 +1963,64 @@ _EOF_ }), ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, }, + }, { + desc: "cancel use entrypoint enabled", + featureFlags: map[string]string{"enable-cancel-using-entrypoint": "true"}, + ts: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{ + Name: "name", + Image: "image", + Command: []string{"cmd"}, // avoid entrypoint lookup. + }}, + }, + want: &corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + InitContainers: []corev1.Container{ + entrypointInitContainer(images.EntrypointImage, []v1beta1.Step{{Name: "name"}}), + }, + Containers: []corev1.Container{{ + Name: "step-name", + Image: "image", + Command: []string{"/tekton/bin/entrypoint"}, + Args: []string{ + "-wait_file", + "/tekton/downward/ready", + "-wait_file_content", + "-post_file", + "/tekton/run/0/out", + "-termination_path", + "/tekton/termination", + "-step_metadata_dir", + "/tekton/run/0/status", + "-stop_on_cancel", + "-entrypoint", + "cmd", + "--", + }, + VolumeMounts: append([]corev1.VolumeMount{binROMount, runMount(0, false), downwardMount, { + Name: "tekton-creds-init-home-0", + MountPath: "/tekton/creds", + }}, implicitVolumeMounts...), + TerminationMessagePath: "/tekton/termination", + }}, + Volumes: append(implicitVolumes, binVolume, runVolume(0), corev1.Volume{ + Name: downwardVolumeName, + VolumeSource: corev1.VolumeSource{ + DownwardAPI: &corev1.DownwardAPIVolumeSource{ + Items: []corev1.DownwardAPIVolumeFile{{ + Path: downwardMountReadyFile, + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", readyAnnotation), + }, + }, downwardCancelVolumeItem}, + }, + }, + }, corev1.Volume{ + Name: "tekton-creds-init-home-0", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{Medium: corev1.StorageMediumMemory}}, + }), + ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, + }, }} { t.Run(c.desc, func(t *testing.T) { names.TestingSeed() @@ -2260,6 +2318,10 @@ debug-fail-continue-heredoc-randomly-generated-mz4c7 } } +func TestPodBuildWithEnabledCancelUseEntrypoint(t *testing.T) { + +} + type ExpectedComputeResources struct { name string ResourceRequirements corev1.ResourceRequirements diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index bbcf5436553..a79b2b63267 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -648,10 +648,17 @@ func (c *Reconciler) failTaskRun(ctx context.Context, tr *v1beta1.TaskRun, reaso return nil } - // tr.Status.PodName will be empty if the pod was never successfully created. This condition - // can be reached, for example, by the pod never being schedulable due to limits imposed by - // a namespace's ResourceQuota. - err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete(ctx, tr.Status.PodName, metav1.DeleteOptions{}) + var err error + switch { + case reason == v1beta1.TaskRunReasonCancelled && config.FromContextOrDefaults(ctx).FeatureFlags.EnableCancelUsingEntrypoint: + logger.Infof("canceling task run %q by entrypoint", tr.Name) + err = podconvert.CancelPod(ctx, c.KubeClientSet, tr.Namespace, tr.Status.PodName) + default: + // tr.Status.PodName will be empty if the pod was never successfully created. This condition + // can be reached, for example, by the pod never being schedulable due to limits imposed by + // a namespace's ResourceQuota. + err = podconvert.DeletePod(ctx, c.KubeClientSet, tr.Namespace, tr.Status.PodName) + } if err != nil && !k8serrors.IsNotFound(err) { logger.Infof("Failed to terminate pod: %v", err) return err