Skip to content

Commit

Permalink
The cancellation of taskruns is now done through the entrypoint binary
Browse files Browse the repository at this point in the history
through a new flag called 'stop_on_cancel'. This removes the need for
deleting the pods to cancel a taskrun, allowing examination of the logs
on the pods from cancelled taskruns. Part of work on issue tektoncd#3238

Signed-off-by: chengjoey <[email protected]>
  • Loading branch information
chengjoey committed Apr 8, 2023
1 parent f543fdb commit e53bdc8
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 39 deletions.
20 changes: 19 additions & 1 deletion cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"encoding/json"
"errors"
"flag"
Expand Down Expand Up @@ -58,6 +59,7 @@ var (
enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification")
socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.")
resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.")
stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled")
)

const (
Expand Down Expand Up @@ -137,6 +139,11 @@ func main() {
}
}

ctx := context.Background()
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

var spireWorkloadAPI spire.EntrypointerAPIClient
if enableSpire != nil && *enableSpire && socketPath != nil && *socketPath != "" {
spireConfig := config.SpireConfig{
Expand All @@ -146,12 +153,14 @@ func main() {
}

e := entrypoint.Entrypointer{
Ctx: ctx,
Cancel: cancel,
Command: append(cmd, commandArgs...),
WaitFiles: strings.Split(*waitFiles, ","),
WaitFileContent: *waitFileContent,
PostFile: *postFile,
TerminationPath: *terminationPath,
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Waiter: &realWaiter{ctx: ctx, waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Runner: &realRunner{
stdoutPath: *stdoutPath,
stderrPath: *stderrPath,
Expand All @@ -164,6 +173,7 @@ func main() {
StepMetadataDir: *stepMetadataDir,
SpireWorkloadAPI: spireWorkloadAPI,
ResultExtractionMethod: *resultExtractionMethod,
StopOnCancel: *stopOnCancel,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand All @@ -181,6 +191,14 @@ func main() {
case termination.MessageLengthError:
log.Print(err.Error())
os.Exit(1)
case entrypoint.ContextError:
if errors.Is(err, entrypoint.ContextCanceledError) {
log.Print("Step was cancelled")
os.Exit(int(syscall.SIGKILL))
} else {
log.Print(err.Error())
os.Exit(1)
}
case *exec.ExitError:
// Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go
// This works on both Unix and Windows. Although
Expand Down
5 changes: 4 additions & 1 deletion cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
// Wait for command to exit
if err := cmd.Wait(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ContextDeadlineExceededError
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ContextCanceledError
}
return err
}
Expand Down
21 changes: 20 additions & 1 deletion cmd/entrypoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"syscall"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

// TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan.
Expand Down Expand Up @@ -183,10 +185,27 @@ func TestRealRunnerTimeout(t *testing.T) {
defer cancel()

if err := rr.Run(ctx, "sleep", "0.01"); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, entrypoint.ContextDeadlineExceededError) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't timeout")
}
}

func TestRealRunnerCanceled(t *testing.T) {
rr := realRunner{}
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
cancel()
}()

if err := rr.Run(ctx, "sleep", "3"); err != nil {
if !errors.Is(err, entrypoint.ContextCanceledError) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't cancel")
}
}
16 changes: 15 additions & 1 deletion cmd/entrypoint/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"os"
"time"
Expand All @@ -26,6 +27,7 @@ import (

// realWaiter actually waits for files, by polling.
type realWaiter struct {
ctx context.Context
waitPollingInterval time.Duration
breakpointOnFailure bool
}
Expand All @@ -51,7 +53,19 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure
if file == "" {
return nil
}
for ; ; time.Sleep(rw.waitPollingInterval) {
for {
select {
case <-rw.ctx.Done():
switch rw.ctx.Err() {
case context.Canceled:
return entrypoint.ContextCanceledError
case context.DeadlineExceeded:
return entrypoint.ContextDeadlineExceededError
default:
}
return nil
case <-time.After(rw.waitPollingInterval):
}
if info, err := os.Stat(file); err == nil {
if !expectContent || info.Size() > 0 {
return nil
Expand Down
71 changes: 65 additions & 6 deletions cmd/entrypoint/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package main

import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

const testWaitPollingInterval = 50 * time.Millisecond
Expand All @@ -35,7 +38,7 @@ func TestRealWaiterWaitMissingFile(t *testing.T) {
t.Errorf("error creating temp file: %v", err)
}
os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false)
Expand Down Expand Up @@ -63,7 +66,7 @@ func TestRealWaiterWaitWithFile(t *testing.T) {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), false, false)
Expand All @@ -87,7 +90,7 @@ func TestRealWaiterWaitMissingContent(t *testing.T) {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
Expand All @@ -114,7 +117,7 @@ func TestRealWaiterWaitWithContent(t *testing.T) {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
Expand Down Expand Up @@ -142,7 +145,7 @@ func TestRealWaiterWaitWithErrorWaitfile(t *testing.T) {
}
tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1)
defer os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
// error of type skipError is returned after encountering a error waitfile
Expand Down Expand Up @@ -173,7 +176,7 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
}
tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1)
defer os.Remove(tmp.Name())
rw := realWaiter{}
rw := realWaiter{ctx: context.Background()}
doneCh := make(chan struct{})
go func() {
// When breakpoint on failure is enabled skipError shouldn't be returned for a error waitfile
Expand All @@ -191,3 +194,59 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitWithContextCanceled(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithCancel(context.Background())
rw := realWaiter{ctx: ctx}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context canceled error")
}
errCh <- err
}()
cancel()
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ContextCanceledError) {
t.Errorf("expected ContextCanceledError, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ContextCanceledError")
}
}

func TestRealWaiterWaitWithTimeout(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
rw := realWaiter{ctx: ctx}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).Wait(tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context deadline error")
}
errCh <- err
}()
delay := time.NewTimer(2 * time.Second)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ContextDeadlineExceededError) {
t.Errorf("expected ContextDeadlineExceededError, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ContextDeadlineExceededError")
}
}
3 changes: 3 additions & 0 deletions docs/taskruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ When you cancel a TaskRun, the running pod associated with that `TaskRun` is del
means that the logs of the `TaskRun` are not preserved. The deletion of the `TaskRun` pod is necessary
in order to stop `TaskRun` step containers from running.

**Note: if `enable-cancel-using-entrypoint` is set to
`"true"` in the `feature-flags`, the pod associated with that `TaskRun` will not be deleted**

Example of cancelling a `TaskRun`:

```yaml
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
DefaultEnableAPIFields = StableAPIFields
// DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs".
DefaultSendCloudEventsForRuns = false
// DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint"
DefaultEnableCancelUsingEntrypoint = false
// EnforceNonfalsifiabilityWithSpire is the value used for "enable-nonfalsifiability" when SPIRE is used to enable non-falsifiability.
EnforceNonfalsifiabilityWithSpire = "spire"
// EnforceNonfalsifiabilityNone is the value used for "enable-nonfalsifiability" when non-falsifiability is not enabled.
Expand All @@ -76,6 +78,8 @@ const (
DefaultResultExtractionMethod = ResultExtractionMethodTerminationMessage
// DefaultMaxResultSize is the default value in bytes for the size of a result
DefaultMaxResultSize = 4096
// EnableCancelUsingEntrypoint is the flag used to enable cancelling a pod using the entrypoint
EnableCancelUsingEntrypoint = "enable-cancel-using-entrypoint"

disableAffinityAssistantKey = "disable-affinity-assistant"
disableCredsInitKey = "disable-creds-init"
Expand Down Expand Up @@ -105,6 +109,7 @@ type FeatureFlags struct {
SendCloudEventsForRuns bool
AwaitSidecarReadiness bool
EnforceNonfalsifiability string
EnableCancelUsingEntrypoint bool
// VerificationNoMatchPolicy is the feature flag for "trusted-resources-verification-no-match-policy"
// VerificationNoMatchPolicy can be set to "ignore", "warn" and "fail" values.
// ignore: skip trusted resources verification when no matching verification policies found
Expand Down Expand Up @@ -190,6 +195,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) {
if err := setMaxResultSize(cfgMap, DefaultMaxResultSize, &tc.MaxResultSize); err != nil {
return nil, err
}
if err := setFeature(EnableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil {
return nil, err
}

// Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if
// enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/feature_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) {
EnableProvenanceInStatus: true,
ResultExtractionMethod: "termination-message",
MaxResultSize: 4096,
EnableCancelUsingEntrypoint: true,
},
fileName: "feature-flags-all-flags-set",
},
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/testdata/feature-flags-all-flags-set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ data:
enforce-nonfalsifiability: "spire"
trusted-resources-verification-no-match-policy: "fail"
enable-provenance-in-status: "true"
enable-cancel-using-entrypoint: "true"
Loading

0 comments on commit e53bdc8

Please sign in to comment.