Skip to content

Commit

Permalink
The cancellation of taskruns is now done through the entrypoint binary
Browse files Browse the repository at this point in the history
through a new flag called 'stop_on_cancel'. This removes the need for
deleting the pods to cancel a taskrun, allowing examination of the logs
on the pods from cancelled taskruns. Part of work on issue tektoncd#3238

Signed-off-by: chengjoey <[email protected]>
  • Loading branch information
lbernick authored and chengjoey committed Jul 26, 2023
1 parent 5cc5929 commit 0a81ada
Show file tree
Hide file tree
Showing 19 changed files with 769 additions and 31 deletions.
10 changes: 10 additions & 0 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification")
socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.")
resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.")
stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled")
)

const (
Expand Down Expand Up @@ -164,6 +165,7 @@ func main() {
StepMetadataDir: *stepMetadataDir,
SpireWorkloadAPI: spireWorkloadAPI,
ResultExtractionMethod: *resultExtractionMethod,
StopOnCancel: *stopOnCancel,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand All @@ -181,6 +183,14 @@ func main() {
case termination.MessageLengthError:
log.Print(err.Error())
os.Exit(1)
case entrypoint.ContextError:
if errors.Is(err, entrypoint.ErrContextCanceled) {
log.Print("Step was cancelled")
os.Exit(int(syscall.SIGKILL))
} else {
log.Print(err.Error())
os.Exit(1)
}
case *exec.ExitError:
// Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go
// This works on both Unix and Windows. Although
Expand Down
13 changes: 11 additions & 2 deletions cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
// Start defined command
if err := cmd.Start(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ErrContextDeadlineExceeded
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ErrContextCanceled
}
return err
}
Expand All @@ -134,9 +137,15 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
}()

// Wait for command to exit
// As noted in os/exec (https://github.com/golang/go/blob/ee522e2cdad04a43bc9374776483b6249eb97ec9/src/os/exec/exec.go#L897-L906),
// cmd.Wait prefers the process error over the context error,
// but we want to return the context error instead.
if err := cmd.Wait(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ErrContextDeadlineExceeded
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ErrContextCanceled
}
return err
}
Expand Down
35 changes: 34 additions & 1 deletion cmd/entrypoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"syscall"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

// TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan.
Expand Down Expand Up @@ -183,10 +185,41 @@ func TestRealRunnerTimeout(t *testing.T) {
defer cancel()

if err := rr.Run(ctx, "sleep", "0.01"); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't timeout")
}
}

// TestRealRunnerCanceled_beforeCmdWait checks that Run reports
// entrypoint.ErrContextCanceled when it is handed a context that has already
// been cancelled.
func TestRealRunnerCanceled_beforeCmdWait(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Cancel up front so Run starts with a context that is already done.
	cancel()

	runner := realRunner{}
	runErr := runner.Run(ctx, "sleep", "3")
	if runErr == nil {
		t.Fatalf("step didn't cancel")
	}
	if !errors.Is(runErr, entrypoint.ErrContextCanceled) {
		t.Fatalf("unexpected error received: %v", runErr)
	}
}

// TestRealRunnerCanceled checks that Run reports
// entrypoint.ErrContextCanceled when the context is cancelled one second
// after a three-second "sleep" command has been requested.
func TestRealRunnerCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Fire the cancellation asynchronously, one second from now.
	time.AfterFunc(1*time.Second, cancel)

	runner := realRunner{}
	runErr := runner.Run(ctx, "sleep", "3")
	if runErr == nil {
		t.Fatalf("step didn't cancel")
	}
	if !errors.Is(runErr, entrypoint.ErrContextCanceled) {
		t.Fatalf("unexpected error received: %v", runErr)
	}
}
40 changes: 40 additions & 0 deletions cmd/entrypoint/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package main

import (
"context"
"errors"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -74,6 +76,44 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure
}
}

// WaitWithContext blocks until the given file exists, a sibling "<file>.err"
// file exists, or ctx is done. It polls once per rw.waitPollingInterval, and
// the first check happens only after one full interval has elapsed.
//
// An empty file name means there is nothing to wait for and nil is returned
// immediately. When expectContent is true the file must also be non-empty
// before the wait is considered satisfied.
//
// It returns:
//   - nil once the file is present (and non-empty if expectContent is set), or
//     when "<file>.err" is present and breakpointOnFailure is set;
//   - a skipError when "<file>.err" is present and breakpointOnFailure is unset;
//   - entrypoint.ErrContextCanceled or entrypoint.ErrContextDeadlineExceeded
//     when ctx ends for the corresponding reason;
//   - ctx.Err() unchanged when ctx ends for any other reason;
//   - a wrapped error when stat-ing the file fails for a reason other than the
//     file not existing.
func (rw *realWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error {
	if file == "" {
		return nil
	}
	for {
		select {
		case <-ctx.Done():
			ctxErr := ctx.Err()
			switch {
			case errors.Is(ctxErr, context.Canceled):
				return entrypoint.ErrContextCanceled
			case errors.Is(ctxErr, context.DeadlineExceeded):
				return entrypoint.ErrContextDeadlineExceeded
			default:
				// The context is done, so the wait did not succeed. Surface the
				// underlying reason instead of reporting success with nil.
				return ctxErr
			}
		case <-time.After(rw.waitPollingInterval):
		}
		if info, err := os.Stat(file); err == nil {
			if !expectContent || info.Size() > 0 {
				return nil
			}
		} else if !os.IsNotExist(err) {
			return fmt.Errorf("waiting for %q: %w", file, err)
		}
		// A ".err" file means that a previous step has failed. While debugging we
		// do not want this step to stop because of that, since it would defeat the
		// purpose of the breakpoint. So skipError is only returned when
		// breakpointOnFailure is disabled; otherwise the remaining steps of the
		// failed TaskRun are allowed to continue.
		if _, err := os.Stat(file + ".err"); err == nil {
			if breakpointOnFailure {
				return nil
			}
			return skipError("error file present, bail and skip the step")
		}
	}
}

type skipError string

func (e skipError) Error() string {
Expand Down
174 changes: 174 additions & 0 deletions cmd/entrypoint/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package main

import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

const testWaitPollingInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -191,3 +194,174 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

// TestRealWaiterWaitWithContextCanceled verifies that WaitWithContext returns
// entrypoint.ErrContextCanceled when its context is cancelled while the
// awaited file is still empty.
func TestRealWaiterWaitWithContextCanceled(t *testing.T) {
	tmp, err := os.CreateTemp("", "real_waiter_test_file")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()
	ctx, cancel := context.WithCancel(context.Background())
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and
	// exit, even if this test has already stopped listening.
	errCh := make(chan error, 1)
	go func() {
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
	}()
	cancel()
	delay := time.NewTimer(2 * testWaitPollingInterval)
	defer delay.Stop()
	select {
	case err := <-errCh:
		// Assertions run on the test goroutine so that t is never used after
		// the test function has returned.
		if err == nil {
			t.Errorf("expected context canceled error")
		}
		if !errors.Is(err, entrypoint.ErrContextCanceled) {
			t.Errorf("expected ErrContextCanceled, got %T", err)
		}
	case <-delay.C:
		t.Errorf("expected Wait() to have a ErrContextCanceled")
	}
}

// TestRealWaiterWaitWithTimeout verifies that WaitWithContext returns
// entrypoint.ErrContextDeadlineExceeded when its context times out while the
// awaited file is still empty.
func TestRealWaiterWaitWithTimeout(t *testing.T) {
	tmp, err := os.CreateTemp("", "real_waiter_test_file")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
	defer cancel()
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and
	// exit, even if this test has already stopped listening.
	errCh := make(chan error, 1)
	go func() {
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
	}()
	delay := time.NewTimer(2 * time.Second)
	defer delay.Stop()
	select {
	case err := <-errCh:
		// Assertions run on the test goroutine so that t is never used after
		// the test function has returned.
		if err == nil {
			t.Errorf("expected context deadline error")
		}
		if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) {
			t.Errorf("expected ErrContextDeadlineExceeded, got %T", err)
		}
	case <-delay.C:
		t.Errorf("expected Wait() to have a ErrContextDeadlineExceeded")
	}
}

// TestRealWaiterWaitContextWithBreakpointOnFailure verifies that, with
// breakpointOnFailure enabled, WaitWithContext returns nil rather than a
// skipError when only the "<file>.err" marker exists.
func TestRealWaiterWaitContextWithBreakpointOnFailure(t *testing.T) {
	tmp, err := os.CreateTemp("", "real_waiter_test_file*.err")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()
	// Strip only the trailing extension; an earlier ".err" elsewhere in the
	// path must be left untouched.
	tmpFileName := strings.TrimSuffix(tmp.Name(), ".err")
	// Cancelled on return so the waiter goroutine stops polling if the test
	// gives up before it finishes.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and exit.
	errCh := make(chan error, 1)
	go func() {
		// When breakpoint on failure is enabled skipError shouldn't be returned for an error waitfile
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmpFileName, false, true)
	}()
	delay := time.NewTimer(2 * testWaitPollingInterval)
	defer delay.Stop()
	select {
	case err := <-errCh:
		if err != nil {
			t.Errorf("error waiting on tmp file %q", tmp.Name())
		}
	case <-delay.C:
		t.Errorf("expected Wait() to have detected a non-zero file size by now")
	}
}

// TestRealWaiterWaitContextWithErrorWaitfile verifies that, with
// breakpointOnFailure disabled, WaitWithContext returns a skipError when the
// "<file>.err" marker exists.
func TestRealWaiterWaitContextWithErrorWaitfile(t *testing.T) {
	tmp, err := os.CreateTemp("", "real_waiter_test_file*.err")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()
	// Strip only the trailing extension; an earlier ".err" elsewhere in the
	// path must be left untouched.
	tmpFileName := strings.TrimSuffix(tmp.Name(), ".err")
	// Cancelled on return so the waiter goroutine stops polling if the test
	// gives up before it finishes.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and exit.
	errCh := make(chan error, 1)
	go func() {
		// error of type skipError is returned after encountering an error waitfile
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmpFileName, false, false)
	}()
	delay := time.NewTimer(2 * testWaitPollingInterval)
	defer delay.Stop()
	select {
	case err := <-errCh:
		// Assertions run on the test goroutine so that t is never used after
		// the test function has returned.
		if err == nil {
			t.Errorf("expected skipError upon encounter error waitfile")
		}
		var skipErr skipError
		if !errors.As(err, &skipErr) {
			t.Errorf("unexpected error type %T", err)
		}
	case <-delay.C:
		t.Errorf("expected Wait() to have detected a non-zero file size by now")
	}
}

// TestRealWaiterWaitContextWithContent verifies that, with expectContent set,
// WaitWithContext returns nil once content has been written to the awaited
// file.
func TestRealWaiterWaitContextWithContent(t *testing.T) {
	tmp, err := os.CreateTemp("", "real_waiter_test_file")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()
	// Cancelled on return so the waiter goroutine stops polling if the test
	// gives up before it finishes.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and exit.
	errCh := make(chan error, 1)
	go func() {
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
	}()
	if err := os.WriteFile(tmp.Name(), []byte("😺"), 0700); err != nil {
		// Without content the wait below can only time out, so stop here.
		t.Fatalf("error writing content to temp file: %v", err)
	}
	delay := time.NewTimer(2 * testWaitPollingInterval)
	defer delay.Stop()
	select {
	case err := <-errCh:
		if err != nil {
			t.Errorf("error waiting on tmp file %q", tmp.Name())
		}
	case <-delay.C:
		t.Errorf("expected Wait() to have detected a non-zero file size by now")
	}
}

// TestRealWaiterWaitContextMissingFile verifies that WaitWithContext keeps
// waiting, rather than returning, while the awaited file does not exist.
func TestRealWaiterWaitContextMissingFile(t *testing.T) {
	// Create a temp file and then immediately delete it to get
	// a legitimate tmp path and ensure the file doesn't exist
	// prior to testing WaitWithContext().
	tmp, err := os.CreateTemp("", "real_waiter_test_file")
	if err != nil {
		// Fatal: tmp is nil on failure, so continuing would panic on tmp.Name().
		t.Fatalf("error creating temp file: %v", err)
	}
	if err := tmp.Close(); err != nil {
		t.Fatalf("error closing temp file: %v", err)
	}
	if err := os.Remove(tmp.Name()); err != nil {
		// The test is meaningless if the file is still there.
		t.Fatalf("error removing temp file: %v", err)
	}
	// Cancelled on return so the waiter goroutine, which is expected to still
	// be polling when the test succeeds, does not keep running afterwards.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rw := realWaiter{}
	// Buffered so the waiter goroutine can always hand over its result and exit.
	errCh := make(chan error, 1)
	go func() {
		errCh <- rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), false, false)
	}()

	delay := time.NewTimer(2 * testWaitPollingInterval)
	defer delay.Stop()
	select {
	case <-delay.C:
		// Success
	case err := <-errCh:
		if err != nil {
			t.Errorf("error waiting on tmp file %q", tmp.Name())
		}
		t.Errorf("did not expect Wait() to have detected a file at path %q", tmp.Name())
	}
}
10 changes: 10 additions & 0 deletions docs/deprecations.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ The following features are deprecated but have not yet been removed.
| [`pipelineRef.bundle` and `taskRef.bundle` are deprecated](https://github.com/tektoncd/pipeline/issues/5514) | v0.41.0 | Alpha | July 13, 2023 |
| [The `config-trusted-resources` configMap is deprecated](https://github.com/tektoncd/pipeline/issues/5852) | v0.45.0 | Alpha | v0.46.0 |
| [The `default-cloud-events-sink` setting in the `config-defaults` configMap is deprecated](https://github.com/tektoncd/pipeline/pull/6883) in favour of the new `config-events` configMap. | v0.50.0 | N/A | v0.59.0 |
| [v1beta1 Tasks, TaskRuns, Pipelines, and PipelineRuns are deprecated in favor of v1](https://github.com/tektoncd/pipeline/issues/5541) | v0.50.0 | Beta | v0.62.0 |

### v1beta1 deprecation

The v1beta1 versions of Task, TaskRun, Pipeline, and PipelineRun are deprecated in favor of the v1 versions of these APIs,
as of release v0.50.0. Following the [beta CRD compatibility policy](../api_compatibility_policy.md#beta-crds),
the earliest release the v1beta1 versions of these CRDs may be removed is 1 year later, or v0.62.0 (LTS).
The v1beta1 client libraries will be retained until v0.62.0 has reached its end of life, 1 year later.
Therefore, the earliest release the client libraries may be removed is v0.74.0, 12 months after v0.62.0.

## Removed features

The features listed below have been removed but may still be supported in releases that have not reached their EOL.
Expand Down
Loading

0 comments on commit 0a81ada

Please sign in to comment.