diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 678cb3832f..c3077652f2 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -3,27 +3,32 @@ package agent // AgentConfiguration is the run-time configuration for an agent that // has been loaded from the config file and command-line params type AgentConfiguration struct { - ConfigPath string - BootstrapScript string - BuildPath string - HooksPath string - SocketsPath string - GitMirrorsPath string - GitMirrorsLockTimeout int - GitMirrorsSkipUpdate bool - PluginsPath string - GitCheckoutFlags string - GitCloneFlags string - GitCloneMirrorFlags string - GitCleanFlags string - GitFetchFlags string - GitSubmodules bool - SSHKeyscan bool - CommandEval bool - PluginsEnabled bool - PluginValidation bool - LocalHooksEnabled bool - RunInPty bool + ConfigPath string + BootstrapScript string + BuildPath string + HooksPath string + SocketsPath string + GitMirrorsPath string + GitMirrorsLockTimeout int + GitMirrorsSkipUpdate bool + PluginsPath string + GitCheckoutFlags string + GitCloneFlags string + GitCloneMirrorFlags string + GitCleanFlags string + GitFetchFlags string + GitSubmodules bool + SSHKeyscan bool + CommandEval bool + PluginsEnabled bool + PluginValidation bool + LocalHooksEnabled bool + RunInPty bool + + JobVerificationKeyPath string + JobVerificationNoSignatureBehavior string + JobVerificationInvalidSignatureBehavior string + ANSITimestamps bool TimestampLines bool HealthCheckAddr string diff --git a/agent/integration/job_runner_integration_test.go b/agent/integration/job_runner_integration_test.go index 8771be6cef..3cc62254bd 100644 --- a/agent/integration/job_runner_integration_test.go +++ b/agent/integration/job_runner_integration_test.go @@ -1,23 +1,13 @@ package integration import ( - "context" - "encoding/json" "fmt" - "io" - "net/http" - "net/http/httptest" "os" - "path/filepath" - "runtime" "strconv" - "strings" "testing" "github.com/buildkite/agent/v3/agent" "github.com/buildkite/agent/v3/api" - "github.com/buildkite/agent/v3/logger" - "github.com/buildkite/agent/v3/metrics" "github.com/buildkite/bintest/v3" ) @@ -196,125 +186,3 @@ func TestJobRunnerIgnoresPipelineChangesToProtectedVars(t *testing.T) { runJob(t, j, server, agent.AgentConfiguration{CommandEval: true}, mb) } - -func mockBootstrap(t *testing.T) *bintest.Mock { - t.Helper() - - // tests run using t.Run() will have a slash in their name, which will mess with paths to bintest binaries - name := strings.ReplaceAll(t.Name(), "/", "-") - bs, err := bintest.NewMock(fmt.Sprintf("buildkite-agent-bootstrap-%s", name)) - if err != nil { - t.Fatalf("bintest.NewMock() error = %v", err) - } - - return bs -} - -func runJob(t *testing.T, j *api.Job, server *httptest.Server, cfg agent.AgentConfiguration, bs *bintest.Mock) { - l := logger.Discard - - // minimal metrics, this could be cleaner - m := metrics.NewCollector(l, metrics.CollectorConfig{}) - scope := m.Scope(metrics.Tags{}) - - // set the bootstrap into the config - cfg.BootstrapScript = bs.Path - - client := api.NewClient(l, api.Config{ - Endpoint: server.URL, - Token: "llamasrock", - }) - - jr, err := agent.NewJobRunner(l, client, agent.JobRunnerConfig{ - Job: j, - AgentConfiguration: cfg, - MetricsScope: scope, - }) - if err != nil { - t.Fatalf("agent.NewJobRunner() error = %v", err) - } - - if err := jr.Run(context.Background()); err != nil { - t.Errorf("jr.Run() = %v", err) - } -} - -type testAgentEndpoint struct { - calls map[string][][]byte -} - -func createTestAgentEndpoint() *testAgentEndpoint { - return &testAgentEndpoint{ - calls: make(map[string][][]byte, 4), - } -} - -func (tae *testAgentEndpoint) finishesFor(t *testing.T, jobID string) []api.Job { - t.Helper() - - endpoint := fmt.Sprintf("/jobs/%s/finish", jobID) - finishes := make([]api.Job, 0, len(tae.calls)) - - for _, b := range tae.calls[endpoint] { - var job api.Job - err := json.Unmarshal(b, &job) - if err != nil { - t.Fatalf("decoding accept request body: %v", err) - } - finishes = append(finishes, job) - } - - return finishes -} - -func (t *testAgentEndpoint) server(jobID string) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - b, _ := io.ReadAll(req.Body) - t.calls[req.URL.Path] = append(t.calls[req.URL.Path], b) - - switch req.URL.Path { - case "/jobs/" + jobID: - rw.WriteHeader(http.StatusOK) - fmt.Fprintf(rw, `{"state":"running"}`) - case "/jobs/" + jobID + "/start": - rw.WriteHeader(http.StatusOK) - case "/jobs/" + jobID + "/chunks": - rw.WriteHeader(http.StatusCreated) - case "/jobs/" + jobID + "/finish": - rw.WriteHeader(http.StatusOK) - default: - http.Error(rw, fmt.Sprintf("not found; method = %q, path = %q", req.Method, req.URL.Path), http.StatusNotFound) - } - })) -} - -func mockPreBootstrap(t *testing.T, hooksDir string) *bintest.Mock { - t.Helper() - - mock, err := bintest.NewMock(fmt.Sprintf("buildkite-agent-pre-bootstrap-hook-%s", t.Name())) - if err != nil { - t.Fatalf("bintest.NewMock() error = %v", err) - } - - hookScript := filepath.Join(hooksDir, "pre-bootstrap") - body := "" - - if runtime.GOOS == "windows" { - // You may be tempted to change this to `@%q`, but please do not. bintest doesn't like it when things change. - // (%q escapes backslashes, which are windows path separators and leads to this test failing on windows) - body = fmt.Sprintf(`@"%s"`, mock.Path) - hookScript += ".bat" - } else { - body = "#!/bin/sh\n" + mock.Path - } - - if err := os.MkdirAll(hooksDir, 0o700); err != nil { - t.Fatalf("creating pre-bootstrap hook mock: os.MkdirAll() error = %v", err) - } - - if err := os.WriteFile(hookScript, []byte(body), 0o777); err != nil { - t.Fatalf("creating pre-bootstrap hook mock: s.WriteFile() error = %v", err) - } - - return mock -} diff --git a/agent/integration/job_verification_integration_test.go b/agent/integration/job_verification_integration_test.go new file mode 100644 index 0000000000..56417e5bb4 --- /dev/null +++ b/agent/integration/job_verification_integration_test.go @@ -0,0 +1,66 @@ +package integration + +import ( + "os" + "testing" + + "github.com/buildkite/agent/v3/agent" + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/internal/pipeline" +) + +func Test_WhenJobSignatureIsInvalid_AndInvalidSignatureBehaviorIsFail_ItRefusesTheJob(t *testing.T) { + t.Parallel() + + keyFile, err := os.CreateTemp("", "keyfile") + if err != nil { + t.Fatalf("making keyfile: %v", err) + } + defer os.Remove(keyFile.Name()) + + _, err = keyFile.Write([]byte("llamasrock")) + if err != nil { + t.Fatalf("writing keyfile: %v", err) + } + + jobID := "my-job-id" + j := &api.Job{ + ID: jobID, + ChunksMaxSizeBytes: 1024, + Step: pipeline.CommandStep{ + Command: "echo hello world", + Signature: &pipeline.Signature{ + Algorithm: "hmac-sha256", + SignedFields: []string{"command"}, + Value: "not-the-real-signature", + }, + }, + Env: map[string]string{ + "BUILDKITE_COMMAND": "echo hello world", + }, + } + + // create a mock agent API + e := createTestAgentEndpoint() + server := e.server("my-job-id") + defer server.Close() + + mb := mockBootstrap(t) + mb.Expect().NotCalled() // The bootstrap won't be called, as the pre-bootstrap hook failed + defer mb.CheckAndClose(t) + + runJob(t, j, server, agent.AgentConfiguration{ + JobVerificationKeyPath: keyFile.Name(), + JobVerificationInvalidSignatureBehavior: "block", + }, mb) + + job := e.finishesFor(t, jobID)[0] + + if got, want := job.ExitStatus, "-1"; got != want { + t.Errorf("job.ExitStatus = %q, want %q", got, want) + } + + if got, want := job.SignalReason, "agent_refused"; got != want { + t.Errorf("job.SignalReason = %q, want %q", got, want) + } +} diff --git a/agent/integration/test_helpers.go b/agent/integration/test_helpers.go new file mode 100644 index 0000000000..8a78ef6e16 --- /dev/null +++ b/agent/integration/test_helpers.go @@ -0,0 +1,166 @@ +package integration + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "testing" + + "github.com/buildkite/agent/v3/agent" + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/logger" + "github.com/buildkite/agent/v3/metrics" + "github.com/buildkite/bintest/v3" +) + +func mockBootstrap(t *testing.T) *bintest.Mock { + t.Helper() + + // tests run using t.Run() will have a slash in their name, which will mess with paths to bintest binaries + name := strings.ReplaceAll(t.Name(), "/", "-") + bs, err := bintest.NewMock(fmt.Sprintf("buildkite-agent-bootstrap-%s", name)) + if err != nil { + t.Fatalf("bintest.NewMock() error = %v", err) + } + + return bs +} + +func runJob(t *testing.T, j *api.Job, server *httptest.Server, cfg agent.AgentConfiguration, bs *bintest.Mock) { + l := logger.Discard + + // minimal metrics, this could be cleaner + m := metrics.NewCollector(l, metrics.CollectorConfig{}) + scope := m.Scope(metrics.Tags{}) + + // set the bootstrap into the config + cfg.BootstrapScript = bs.Path + + client := api.NewClient(l, api.Config{ + Endpoint: server.URL, + Token: "llamasrock", + }) + + jr, err := agent.NewJobRunner(l, client, agent.JobRunnerConfig{ + Job: j, + AgentConfiguration: cfg, + MetricsScope: scope, + }) + if err != nil { + t.Fatalf("agent.NewJobRunner() error = %v", err) + } + + if err := jr.Run(context.Background()); err != nil { + t.Errorf("jr.Run() = %v", err) + } +} + +type testAgentEndpoint struct { + calls map[string][][]byte + mtx sync.Mutex +} + +func createTestAgentEndpoint() *testAgentEndpoint { + return &testAgentEndpoint{ + calls: make(map[string][][]byte, 4), + } +} + +func (tae *testAgentEndpoint) finishesFor(t *testing.T, jobID string) []api.Job { + t.Helper() + + endpoint := fmt.Sprintf("/jobs/%s/finish", jobID) + finishes := make([]api.Job, 0, len(tae.calls)) + + for _, b := range tae.calls[endpoint] { + var job api.Job + err := json.Unmarshal(b, &job) + if err != nil { + t.Fatalf("decoding accept request body: %v", err) + } + finishes = append(finishes, job) + } + + return finishes +} + +func (tae *testAgentEndpoint) chunksFor(t *testing.T, jobID string) []api.Chunk { + t.Helper() + + endpoint := fmt.Sprintf("/jobs/%s/chunks", jobID) + chunks := make([]api.Chunk, 0, len(tae.calls)) + + for _, b := range tae.calls[endpoint] { + var chunk api.Chunk + err := json.Unmarshal(b, &chunk) + if err != nil { + t.Fatalf("decoding accept request body: %v", err) + } + chunks = append(chunks, chunk) + } + + return chunks +} + +func (t *testAgentEndpoint) server(jobID string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + t.mtx.Lock() + defer t.mtx.Unlock() + + b, _ := io.ReadAll(req.Body) + t.calls[req.URL.Path] = append(t.calls[req.URL.Path], b) + + switch req.URL.Path { + case "/jobs/" + jobID: + rw.WriteHeader(http.StatusOK) + fmt.Fprintf(rw, `{"state":"running"}`) + case "/jobs/" + jobID + "/start": + rw.WriteHeader(http.StatusOK) + case "/jobs/" + jobID + "/chunks": + rw.WriteHeader(http.StatusCreated) + case "/jobs/" + jobID + "/finish": + rw.WriteHeader(http.StatusOK) + default: + http.Error(rw, fmt.Sprintf("not found; method = %q, path = %q", req.Method, req.URL.Path), http.StatusNotFound) + } + })) +} + +func mockPreBootstrap(t *testing.T, hooksDir string) *bintest.Mock { + t.Helper() + + mock, err := bintest.NewMock(fmt.Sprintf("buildkite-agent-pre-bootstrap-hook-%s", t.Name())) + if err != nil { + t.Fatalf("bintest.NewMock() error = %v", err) + } + + hookScript := filepath.Join(hooksDir, "pre-bootstrap") + body := "" + + if runtime.GOOS == "windows" { + // You may be tempted to change this to `@%q`, but please do not. bintest doesn't like it when things change. + // (%q escapes backslashes, which are windows path separators and leads to this test failing on windows) + body = fmt.Sprintf(`@"%s"`, mock.Path) + hookScript += ".bat" + } else { + body = "#!/bin/sh\n" + mock.Path + } + + if err := os.MkdirAll(hooksDir, 0o700); err != nil { + t.Fatalf("creating pre-bootstrap hook mock: os.MkdirAll() error = %v", err) + } + + if err := os.WriteFile(hookScript, []byte(body), 0o777); err != nil { + t.Fatalf("creating pre-bootstrap hook mock: s.WriteFile() error = %v", err) + } + + return mock +} diff --git a/agent/job_runner.go b/agent/job_runner.go index 758947bd7a..dad266daa5 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -41,6 +41,9 @@ const ( // BuildkiteMessageName is the env var name of the build/commit message. BuildkiteMessageName = "BUILDKITE_MESSAGE" + + JobVerificationBehaviourWarn = "warn" + JobVerificationBehaviourBlock = "block" ) // Certain env can only be set by agent configuration. @@ -105,6 +108,10 @@ type JobRunner struct { // The configuration for the job runner conf JobRunnerConfig + // How the JobRunner should respond in various signature failure modes + JobVerificationInvalidSignatureBehavior string + JobVerificationNoSignatureBehavior string + // The logger to use logger logger.Logger @@ -158,6 +165,17 @@ func NewJobRunner(l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (j apiClient: apiClient, } + var err error + r.JobVerificationInvalidSignatureBehavior, err = r.normalizeJobVerificationBehavior(conf.AgentConfiguration.JobVerificationInvalidSignatureBehavior) + if err != nil { + return nil, fmt.Errorf("setting invalid signature behavior: %w", err) + } + + r.JobVerificationNoSignatureBehavior, err = r.normalizeJobVerificationBehavior(conf.AgentConfiguration.JobVerificationNoSignatureBehavior) + if err != nil { + return nil, fmt.Errorf("setting no signature behavior: %w", err) + } + if conf.JobStatusInterval == 0 { conf.JobStatusInterval = 1 * time.Second } @@ -356,6 +374,25 @@ func NewJobRunner(l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (j return r, nil } +func (r *JobRunner) normalizeJobVerificationBehavior(behavior string) (string, error) { + if r.conf.AgentConfiguration.JobVerificationKeyPath == "" { + // We won't be verifying jobs, so it doesn't matter + return "if you're seeing this string, there's a problem with the job verification code in the agent. contact support@buildkite.com", nil + } + + switch behavior { + case JobVerificationBehaviourBlock, JobVerificationBehaviourWarn: + return behavior, nil + case "": + // TODO: Should we have a default behavior? "warn" is easy, but less secure. "block" is more secure, but has some + // sharp edges when it comes to initial implementation + return JobVerificationBehaviourBlock, nil + default: + return "", fmt.Errorf("invalid job verification behavior: %q", behavior) + } + +} + // Creates the environment variables that will be used in the process and writes a flat environment file func (r *JobRunner) createEnvironment() ([]string, error) { // Create a clone of our jobs environment. We'll then set the @@ -574,89 +611,6 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error { }) } -// finishJob finishes the job in the Buildkite Agent API. If the FinishJob call -// cannot return successfully, this will retry for a long time. -func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit processExit, failedChunkCount int) error { - r.conf.Job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano) - r.conf.Job.ExitStatus = strconv.Itoa(exit.Status) - r.conf.Job.Signal = exit.Signal - r.conf.Job.SignalReason = exit.SignalReason - r.conf.Job.ChunksFailedCount = failedChunkCount - - r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s", - r.conf.Job.ExitStatus, r.conf.Job.Signal, r.conf.Job.SignalReason) - - ctx, cancel := context.WithTimeout(ctx, 48*time.Hour) - defer cancel() - - return roko.NewRetrier( - roko.TryForever(), - roko.WithJitter(), - roko.WithStrategy(roko.Constant(1*time.Second)), - ).DoWithContext(ctx, func(retrier *roko.Retrier) error { - response, err := r.apiClient.FinishJob(ctx, r.conf.Job) - if err != nil { - // If the API returns with a 422, that means that we - // succesfully tried to finish the job, but Buildkite - // rejected the finish for some reason. This can - // sometimes mean that Buildkite has cancelled the job - // before we get a chance to send the final API call - // (maybe this agent took too long to kill the - // process). In that case, we don't want to keep trying - // to finish the job forever so we'll just bail out and - // go find some more work to do. - if response != nil && response.StatusCode == 422 { - r.logger.Warn("Buildkite rejected the call to finish the job (%s)", err) - retrier.Break() - } else { - r.logger.Warn("%s (%s)", err, retrier) - } - } - - return err - }) -} - -// jobLogStreamer waits for the process to start, then grabs the job output -// every few seconds and sends it back to Buildkite. -func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { - ctx, setStat, done := status.AddSimpleItem(ctx, "Job Log Streamer") - defer done() - setStat("🏃 Starting...") - - defer func() { - wg.Done() - r.logger.Debug("[JobRunner] Routine that processes the log has finished") - }() - - select { - case <-r.process.Started(): - case <-ctx.Done(): - return - } - - for { - setStat("📨 Sending process output to log streamer") - - // Send the output of the process to the log streamer - // for processing - r.logStreamer.Process(r.output.ReadAndTruncate()) - - setStat("😴 Sleeping for a bit") - - // Sleep for a bit, or until the job is finished - select { - case <-time.After(1 * time.Second): - case <-ctx.Done(): - return - case <-r.process.Done(): - return - } - } - - // The final output after the process has finished is processed in Run(). -} - // jobCancellationChecker waits for the processs to start, then continuously // polls GetJobState to see if the job has been cancelled server-side. If so, // it calls r.Cancel. diff --git a/agent/run_job.go b/agent/run_job.go index 4d708b57f7..f2caa3554b 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -2,18 +2,45 @@ package agent import ( "context" + "errors" + "fmt" "os" "strconv" "sync" "time" "github.com/buildkite/agent/v3/hook" + "github.com/buildkite/agent/v3/internal/pipeline" "github.com/buildkite/agent/v3/kubernetes" "github.com/buildkite/agent/v3/metrics" "github.com/buildkite/agent/v3/process" "github.com/buildkite/agent/v3/status" + "github.com/buildkite/roko" ) +var ErrNoSignature = errors.New("job had no signature to verify") + +type invalidSignatureError struct { + underlying error +} + +func newInvalidSignatureError(err error) *invalidSignatureError { + return &invalidSignatureError{underlying: err} +} + +func (e *invalidSignatureError) Error() string { + return fmt.Sprintf("invalid signature: %v", e.underlying) +} + +func (e *invalidSignatureError) Unwrap() error { + return e.underlying +} + +func (e *invalidSignatureError) Is(target error) bool { + _, ok := target.(*invalidSignatureError) + return ok +} + // Runs the job func (r *JobRunner) Run(ctx context.Context) error { r.logger.Info("Starting job %s", r.conf.Job.ID) @@ -23,6 +50,19 @@ func (r *JobRunner) Run(ctx context.Context) error { r.startedAt = time.Now() + var verifier pipeline.Verifier + if r.conf.AgentConfiguration.JobVerificationKeyPath != "" { + verificationKey, err := os.ReadFile(r.conf.AgentConfiguration.JobVerificationKeyPath) + if err != nil { + return fmt.Errorf("failed to read job verification key: %w", err) + } + + verifier, err = pipeline.NewVerifier("hmac-sha256", verificationKey) + if err != nil { + return fmt.Errorf("failed to create job verifier: %w", err) + } + } + // Start the build in the Buildkite Agent API. This is the first thing // we do so if it fails, we don't have to worry about cleaning things // up like started log streamer workers, and so on. @@ -62,6 +102,56 @@ func (r *JobRunner) Run(ctx context.Context) error { r.cleanup(ctx, wg, exit) }(ctx, &wg) // Note the non-cancellable context (ctx rather than cctx) here - we don't want to be interrupted during cleanup + err := r.verifyJob(verifier) + switch { + case err == nil: // all good! keep going + case errors.Is(err, ErrNoSignature): + switch r.JobVerificationNoSignatureBehavior { + case JobVerificationBehaviourWarn: + r.logger.Warn("Job verification failed: %s", err.Error()) + r.logger.Warn("Job will be run without verification - this is not recommended. You can change this behavior with the `job-verification-no-signature-behavior` agent configuration option.") + + r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ WARNING: Job verification failed: %s\n", err.Error()))) + r.logStreamer.Process([]byte("⚠️ WARNING: Job will be run without verification\n")) + + case JobVerificationBehaviourBlock: + r.logger.Warn("Job verification failed: %s", err.Error()) + r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ WARNING: Job verification failed: %s\n", err.Error()))) + exit.Status = -1 + exit.SignalReason = "job_verification_failed_no_signature" + + return nil + } + + case errors.Is(err, &invalidSignatureError{}): + fmt.Println("IT's AN INVALID SIGNATURE ERROR") + switch r.JobVerificationInvalidSignatureBehavior { + case JobVerificationBehaviourWarn: + r.logger.Warn("Job verification failed with invalid signature: %s", err.Error()) + r.logger.Warn("Job will be run without verification - this is not recommended. You can change this behavior with the `job-verification-invalid-signature-behavior` agent configuration option.") + + r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ WARNING: Job verification failed with invalid signature: %s\n", err.Error()))) + r.logStreamer.Process([]byte("⚠️ WARNING: Job will be run without verification\n")) + + case JobVerificationBehaviourBlock: + r.logger.Warn("Job verification failed: %s", err.Error()) + r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ WARNING: Job verification failed: %s\n", err.Error()))) + exit.Status = -1 + exit.SignalReason = "job_verification_failed_invalid_signature" + + return nil + } + + default: // some other error + r.logger.Warn("Job verification failed with error: %s", err.Error()) + r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ WARNING: Job verification failed with error: %s\n", err.Error()))) + exit.Status = -1 + exit.SignalReason = "job_verification_failed_with_error" + + return nil + + } + // Before executing the bootstrap process with the received Job env, execute the pre-bootstrap hook (if present) for // it to tell us whether it is happy to proceed. if hook, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, "pre-bootstrap"); hook != "" { @@ -85,7 +175,52 @@ func (r *JobRunner) Run(ctx context.Context) error { go r.jobLogStreamer(cctx, &wg) go r.jobCancellationChecker(cctx, &wg) - exit = r.runJob(cctx) // Ignore gostaticcheck here, the value of exit is captured by the deferred cleanup function above + exit = r.runJob(cctx) + + return nil +} + +func (r *JobRunner) verifyJob(verifier pipeline.Verifier) error { + step := r.conf.Job.Step + + if _, ok := step.RemainingFields["matrix"]; ok { + r.logger.Warn("Signing/Verification of matrix jobs is not currently supported") + r.logger.Warn("Watch this space 👀") + + return nil + } + + if step.Signature != nil && verifier == nil { + return fmt.Errorf("job %s was signed with signature %s, but no verification key was provided, so the job can't be verified", r.conf.Job.ID, step.Signature.Value) + } + + if step.Signature == nil { + return ErrNoSignature + } + + // Verify the signature + if err := step.Signature.Verify(&step, verifier); err != nil { + return newInvalidSignatureError(err) + } + + // Now that the signature of the job's step is verified, we need to check if the fields on the job match those on the + // step. If they don't, we need to fail the job + signedFields := step.Signature.SignedFields + jobFields, err := r.conf.Job.ValuesForFields(signedFields) + if err != nil { + return fmt.Errorf("failed to get values for fields %v on job %s: %w", signedFields, r.conf.Job.ID, err) + } + + stepFields, err := step.ValuesForFields(signedFields) + if err != nil { + return fmt.Errorf("failed to get values for fields %v on step: %w", signedFields, err) + } + + for _, field := range signedFields { + if jobFields[field] != stepFields[field] { + return newInvalidSignatureError(fmt.Errorf("job %s was signed with signature %s, but the value of field %s on the job (%s) does not match the value of the field on the step (%s)", r.conf.Job.ID, step.Signature.Value, field, jobFields[field], stepFields[field])) + } + } return nil } @@ -187,6 +322,89 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit proces r.logger.Info("Finished job %s", r.conf.Job.ID) } +// finishJob finishes the job in the Buildkite Agent API. If the FinishJob call +// cannot return successfully, this will retry for a long time. +func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit processExit, failedChunkCount int) error { + r.conf.Job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano) + r.conf.Job.ExitStatus = strconv.Itoa(exit.Status) + r.conf.Job.Signal = exit.Signal + r.conf.Job.SignalReason = exit.SignalReason + r.conf.Job.ChunksFailedCount = failedChunkCount + + r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s", + r.conf.Job.ExitStatus, r.conf.Job.Signal, r.conf.Job.SignalReason) + + ctx, cancel := context.WithTimeout(ctx, 48*time.Hour) + defer cancel() + + return roko.NewRetrier( + roko.TryForever(), + roko.WithJitter(), + roko.WithStrategy(roko.Constant(1*time.Second)), + ).DoWithContext(ctx, func(retrier *roko.Retrier) error { + response, err := r.apiClient.FinishJob(ctx, r.conf.Job) + if err != nil { + // If the API returns with a 422, that means that we + // succesfully tried to finish the job, but Buildkite + // rejected the finish for some reason. This can + // sometimes mean that Buildkite has cancelled the job + // before we get a chance to send the final API call + // (maybe this agent took too long to kill the + // process). In that case, we don't want to keep trying + // to finish the job forever so we'll just bail out and + // go find some more work to do. + if response != nil && response.StatusCode == 422 { + r.logger.Warn("Buildkite rejected the call to finish the job (%s)", err) + retrier.Break() + } else { + r.logger.Warn("%s (%s)", err, retrier) + } + } + + return err + }) +} + +// jobLogStreamer waits for the process to start, then grabs the job output +// every few seconds and sends it back to Buildkite. +func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { + ctx, setStat, done := status.AddSimpleItem(ctx, "Job Log Streamer") + defer done() + setStat("🏃 Starting...") + + defer func() { + wg.Done() + r.logger.Debug("[JobRunner] Routine that processes the log has finished") + }() + + select { + case <-r.process.Started(): + case <-ctx.Done(): + return + } + + for { + setStat("📨 Sending process output to log streamer") + + // Send the output of the process to the log streamer + // for processing + r.logStreamer.Process(r.output.ReadAndTruncate()) + + setStat("😴 Sleeping for a bit") + + // Sleep for a bit, or until the job is finished + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + return + case <-r.process.Done(): + return + } + } + + // The final output after the process has finished is processed in Run(). +} + func (r *JobRunner) CancelAndStop() error { r.cancelLock.Lock() r.stopped = true diff --git a/api/jobs.go b/api/jobs.go index e9e5ec02f7..7cd2c8b4d9 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -9,20 +9,34 @@ import ( // Job represents a Buildkite Agent API Job type Job struct { - ID string `json:"id,omitempty"` - Endpoint string `json:"endpoint"` - State string `json:"state,omitempty"` - Env map[string]string `json:"env,omitempty"` - Step *pipeline.CommandStep `json:"step,omitempty"` - ChunksMaxSizeBytes int `json:"chunks_max_size_bytes,omitempty"` - Token string `json:"token,omitempty"` - ExitStatus string `json:"exit_status,omitempty"` - Signal string `json:"signal,omitempty"` - SignalReason string `json:"signal_reason,omitempty"` - StartedAt string `json:"started_at,omitempty"` - FinishedAt string `json:"finished_at,omitempty"` - RunnableAt string `json:"runnable_at,omitempty"` - ChunksFailedCount int `json:"chunks_failed_count,omitempty"` + ID string `json:"id,omitempty"` + Endpoint string `json:"endpoint"` + State string `json:"state,omitempty"` + Env map[string]string `json:"env,omitempty"` + Step pipeline.CommandStep `json:"step,omitempty"` + ChunksMaxSizeBytes int `json:"chunks_max_size_bytes,omitempty"` + Token string `json:"token,omitempty"` + ExitStatus string `json:"exit_status,omitempty"` + Signal string `json:"signal,omitempty"` + SignalReason string `json:"signal_reason,omitempty"` + StartedAt string `json:"started_at,omitempty"` + FinishedAt string `json:"finished_at,omitempty"` + RunnableAt string `json:"runnable_at,omitempty"` + ChunksFailedCount int `json:"chunks_failed_count,omitempty"` +} + +func (j Job) ValuesForFields(fields []string) (map[string]string, error) { + o := make(map[string]string, len(fields)) + for _, f := range fields { + switch f { + case "command": + o[f] = j.Env["BUILDKITE_COMMAND"] + default: + return nil, fmt.Errorf("unknown or unsupported field on Job struct for signing/verification: %q", f) + } + } + + return o, nil } type JobState struct { diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 590b2bd5e9..5e2492778b 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -34,6 +34,7 @@ import ( "github.com/mitchellh/go-homedir" "github.com/urfave/cli" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" ) const startDescription = `Usage: @@ -53,6 +54,8 @@ Example: $ buildkite-agent start --token xxx` +var noSignatureBehaviors = []string{"warn", "block"} + // Adding config requires changes in a few different spots // - The AgentStartConfig struct with a cli parameter // - As a flag in the AgentStartCommand (with matching env) @@ -69,6 +72,10 @@ type AgentStartConfig struct { RedactedVars []string `cli:"redacted-vars" normalize:"list"` CancelSignal string `cli:"cancel-signal"` + JobVerificationKeyPath string `cli:"job-verification-key-path" normalize:"filepath"` + JobVerificationNoSignatureBehavior string `cli:"job-verification-no-signature-behavior"` + JobVerificationInvalidSignatureBehavior string `cli:"job-verification-invalid-signature-behavior"` + AcquireJob string `cli:"acquire-job"` DisconnectAfterJob bool `cli:"disconnect-after-job"` DisconnectAfterIdleTimeout int `cli:"disconnect-after-idle-timeout"` @@ -578,6 +585,21 @@ var AgentStartCommand = cli.Command{ EnvVar: "BUILDKITE_TRACING_SERVICE_NAME", Value: "buildkite-agent", }, + cli.StringFlag{ + Name: "job-verification-key-path", + Usage: "Path to a file containing a verification key. Passing this flag enables job verification. For hmac-sha256, the raw file content is used as the shared key", + EnvVar: "BUILDKITE_AGENT_JOB_VERIFICATION_KEY_PATH", + }, + cli.StringFlag{ + Name: "job-verification-no-signature-behavior", + Usage: fmt.Sprintf("The behavior when a job is received without a signature. One of: % #v", noSignatureBehaviors), + EnvVar: "BUILDKITE_AGENT_JOB_VERIFICATION_NO_SIGNATURE_BEHAVIOR", + }, + cli.StringFlag{ + Name: "job-verification-invalid-signature-behavior", + Usage: fmt.Sprintf("The behavior when a job is received, and the signature calculated is different from the one specified. One of: % #v", noSignatureBehaviors), + EnvVar: "BUILDKITE_AGENT_JOB_VERIFICATION_INVALID_SIGNATURE_BEHAVIOR", + }, // API Flags AgentRegisterTokenFlag, @@ -680,6 +702,16 @@ var AgentStartCommand = cli.Command{ os.Exit(1) } + if cfg.JobVerificationKeyPath != "" { + if !slices.Contains(noSignatureBehaviors, cfg.JobVerificationNoSignatureBehavior) { + l.Fatal("Invalid job verification no signature behavior %q. Must be one of: %v", cfg.JobVerificationNoSignatureBehavior, noSignatureBehaviors) + } + + if !slices.Contains(noSignatureBehaviors, cfg.JobVerificationInvalidSignatureBehavior) { + l.Fatal("Invalid job verification invalid signature behavior %q. Must be one of: %v", cfg.JobVerificationInvalidSignatureBehavior, noSignatureBehaviors) + } + } + // Force some settings if on Windows (these aren't supported yet) if runtime.GOOS == "windows" { cfg.NoPTY = true @@ -820,6 +852,7 @@ var AgentStartCommand = cli.Command{ AcquireJob: cfg.AcquireJob, TracingBackend: cfg.TracingBackend, TracingServiceName: cfg.TracingServiceName, + JobVerificationKeyPath: cfg.JobVerificationKeyPath, } if loader.File != nil { diff --git a/internal/pipeline/command_step.go b/internal/pipeline/command_step.go new file mode 100644 index 0000000000..4064b30e94 --- /dev/null +++ b/internal/pipeline/command_step.go @@ -0,0 +1,131 @@ +package pipeline + +import ( + "errors" + "fmt" + "strings" + + "github.com/buildkite/agent/v3/internal/ordered" + "github.com/buildkite/interpolate" +) + +var _ SignedFielder = (*CommandStep)(nil) + +var ( + ErrNoSignature = errors.New("no signature to verify") +) + +// CommandStep models a command step. +// +// Standard caveats apply - see the package comment. +type CommandStep struct { + Command string `yaml:"command"` + Plugins Plugins `yaml:"plugins,omitempty"` + Signature *Signature `yaml:"signature,omitempty"` + + // RemainingFields stores any other top-level mapping items so they at least + // survive an unmarshal-marshal round-trip. + RemainingFields map[string]any `yaml:",inline"` +} + +// MarshalJSON marshals the step to JSON. Special handling is needed because +// yaml.v3 has "inline" but encoding/json has no concept of it. +func (c *CommandStep) MarshalJSON() ([]byte, error) { + return inlineFriendlyMarshalJSON(c) +} + +// unmarshalMap unmarshals a command step from an ordered map. +func (c *CommandStep) unmarshalMap(m *ordered.MapSA) error { + return m.Range(func(k string, v any) error { + switch k { + case "command", "commands": + // command and commands are aliases for the same thing, which can be + // either one big string or a sequence of strings. + // So we need to act as though we are unmarshaling either string or + // []string (but with type []any). + switch x := v.(type) { + case []any: + cmds := make([]string, 0, len(x)) + for _, cx := range x { + cmds = append(cmds, fmt.Sprint(cx)) + } + // Normalise cmds into one single command string. + // This makes signing easier later on - it's easier to hash one + // string consistently than it is to pick apart multiple strings + // in a consistent way in order to hash all of them + // consistently. + c.Command = strings.Join(cmds, "\n") + + case string: + c.Command = x + + default: + // Some weird-looking command that's not a string... + c.Command = fmt.Sprint(x) + } + + case "plugins": + if err := c.Plugins.unmarshalAny(v); err != nil { + return fmt.Errorf("unmarshaling plugins: %w", err) + } + + case "signature": + sig := new(Signature) + if err := sig.unmarshalAny(v); err != nil { + return fmt.Errorf("unmarshaling signature: %w", err) + } + c.Signature = sig + + default: + // Preserve any other key. + if c.RemainingFields == nil { + c.RemainingFields = make(map[string]any) + } + c.RemainingFields[k] = v + } + + return nil + }) +} + +// SignedFields returns the default fields for signing. +func (c *CommandStep) SignedFields() map[string]string { + return map[string]string{ + "command": c.Command, + } +} + +// ValuesForFields returns the contents of fields to sign. +func (c *CommandStep) ValuesForFields(fields []string) (map[string]string, error) { + out := make(map[string]string, len(fields)) + for _, f := range fields { + switch f { + case "command": + out["command"] = c.Command + default: + return nil, fmt.Errorf("unknown or unsupported field for signing %q", f) + } + } + if _, ok := out["command"]; !ok { + return nil, errors.New("command is required for signature verification") + } + return out, nil +} + +func (c *CommandStep) interpolate(env interpolate.Env) error { + cmd, err := interpolate.Interpolate(env, c.Command) + if err != nil { + return err + } + if err := interpolateSlice(env, c.Plugins); err != nil { + return err + } + // NB: Do not interpolate Signature. + if err := interpolateMap(env, c.RemainingFields); err != nil { + return err + } + c.Command = cmd + return nil +} + +func (CommandStep) stepTag() {} diff --git a/internal/pipeline/step.go b/internal/pipeline/step.go index 9e6b36ad19..4a32d59b85 100644 --- a/internal/pipeline/step.go +++ b/internal/pipeline/step.go @@ -2,16 +2,12 @@ package pipeline import ( "encoding/json" - "errors" "fmt" - "strings" "github.com/buildkite/agent/v3/internal/ordered" "github.com/buildkite/interpolate" ) -var _ SignedFielder = (*CommandStep)(nil) - // Step models a step in the pipeline. It will be a pointer to one of: // - CommandStep // - WaitStep @@ -29,121 +25,6 @@ type Step interface { selfInterpolater } -// CommandStep models a command step. -// -// Standard caveats apply - see the package comment. -type CommandStep struct { - Command string `yaml:"command"` - Plugins Plugins `yaml:"plugins,omitempty"` - Signature *Signature `yaml:"signature,omitempty"` - - // RemainingFields stores any other top-level mapping items so they at least - // survive an unmarshal-marshal round-trip. - RemainingFields map[string]any `yaml:",inline"` -} - -// MarshalJSON marshals the step to JSON. Special handling is needed because -// yaml.v3 has "inline" but encoding/json has no concept of it. -func (c *CommandStep) MarshalJSON() ([]byte, error) { - return inlineFriendlyMarshalJSON(c) -} - -// unmarshalMap unmarshals a command step from an ordered map. -func (c *CommandStep) unmarshalMap(m *ordered.MapSA) error { - return m.Range(func(k string, v any) error { - switch k { - case "command", "commands": - // command and commands are aliases for the same thing, which can be - // either one big string or a sequence of strings. - // So we need to act as though we are unmarshaling either string or - // []string (but with type []any). - switch x := v.(type) { - case []any: - cmds := make([]string, 0, len(x)) - for _, cx := range x { - cmds = append(cmds, fmt.Sprint(cx)) - } - // Normalise cmds into one single command string. - // This makes signing easier later on - it's easier to hash one - // string consistently than it is to pick apart multiple strings - // in a consistent way in order to hash all of them - // consistently. - c.Command = strings.Join(cmds, "\n") - - case string: - c.Command = x - - default: - // Some weird-looking command that's not a string... - c.Command = fmt.Sprint(x) - } - - case "plugins": - if err := c.Plugins.unmarshalAny(v); err != nil { - return fmt.Errorf("unmarshaling plugins: %w", err) - } - - case "signature": - sig := new(Signature) - if err := sig.unmarshalAny(v); err != nil { - return fmt.Errorf("unmarshaling signature: %w", err) - } - c.Signature = sig - - default: - // Preserve any other key. - if c.RemainingFields == nil { - c.RemainingFields = make(map[string]any) - } - c.RemainingFields[k] = v - } - - return nil - }) -} - -// SignedFields returns the default fields for signing. -func (c *CommandStep) SignedFields() map[string]string { - return map[string]string{ - "command": c.Command, - } -} - -// ValuesForFields returns the contents of fields to sign. -func (c *CommandStep) ValuesForFields(fields []string) (map[string]string, error) { - out := make(map[string]string, len(fields)) - for _, f := range fields { - switch f { - case "command": - out["command"] = c.Command - default: - return nil, fmt.Errorf("unknown or unsupported field for signing %q", f) - } - } - if _, ok := out["command"]; !ok { - return nil, errors.New("command is required for signature verification") - } - return out, nil -} - -func (c *CommandStep) interpolate(env interpolate.Env) error { - cmd, err := interpolate.Interpolate(env, c.Command) - if err != nil { - return err - } - if err := interpolateSlice(env, c.Plugins); err != nil { - return err - } - // NB: Do not interpolate Signature. - if err := interpolateMap(env, c.RemainingFields); err != nil { - return err - } - c.Command = cmd - return nil -} - -func (CommandStep) stepTag() {} - // WaitStep models a wait step. // // Standard caveats apply - see the package comment.