From bfbfdedd5687e84abe98643a5963d31629d7c10a Mon Sep 17 00:00:00 2001 From: Chris Doherty Date: Wed, 5 Apr 2023 20:26:42 -0500 Subject: [PATCH] Add agent business logic Signed-off-by: Chris Doherty --- internal/agent/agent.go | 85 +++++ internal/agent/agent_test.go | 462 ++++++++++++++++++++++++ internal/agent/event/action.go | 53 +++ internal/agent/event/error.go | 14 + internal/agent/event/event.go | 22 ++ internal/agent/event/fake.go | 12 + internal/agent/event/mocks.go | 177 +++++++++ internal/agent/event/workflow.go | 17 + internal/agent/event/zz_from_package.go | 16 + internal/agent/failure/reason.go | 44 +++ internal/agent/mocks.go | 160 ++++++++ internal/agent/run.go | 105 ++++++ internal/agent/runtime.go | 20 + internal/agent/runtime/fake.go | 28 ++ internal/agent/runtime/runtime.go | 7 + internal/agent/transport.go | 15 + internal/agent/transport/fake.go | 35 ++ internal/agent/transport/transport.go | 4 + internal/agent/workflow/handler.go | 15 + internal/agent/workflow/workflow.go | 33 ++ 20 files changed, 1324 insertions(+) create mode 100644 internal/agent/agent.go create mode 100644 internal/agent/agent_test.go create mode 100644 internal/agent/event/action.go create mode 100644 internal/agent/event/error.go create mode 100644 internal/agent/event/event.go create mode 100644 internal/agent/event/fake.go create mode 100644 internal/agent/event/mocks.go create mode 100644 internal/agent/event/workflow.go create mode 100644 internal/agent/event/zz_from_package.go create mode 100644 internal/agent/failure/reason.go create mode 100644 internal/agent/mocks.go create mode 100644 internal/agent/run.go create mode 100644 internal/agent/runtime.go create mode 100644 internal/agent/runtime/fake.go create mode 100644 internal/agent/runtime/runtime.go create mode 100644 internal/agent/transport.go create mode 100644 internal/agent/transport/fake.go create mode 100644 internal/agent/transport/transport.go create mode 100644 internal/agent/workflow/handler.go create mode 100644 internal/agent/workflow/workflow.go diff --git a/internal/agent/agent.go b/internal/agent/agent.go new file mode 100644 index 000000000..273bdeeb3 --- /dev/null +++ b/internal/agent/agent.go @@ -0,0 +1,85 @@ +package agent + +import ( + "context" + "errors" + + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// Agent is the core data structure for handling workflow execution on target nodes. It leverages +// a Transport and a ContainerRuntime to retrieve workflows and execute actions. +// +// The agent runs a single workflow at a time. Concurrent requests to run workflows will have the +// second workflow rejected with an event.WorkflowRejected event. +type Agent struct { + Log logr.Logger + + // ID is the unique identifier for the agent. It is used by the transport to identify workflows + // scheduled for this agent. + ID string + + // Transport is the transport used by the agent for communicating workflows and events. + Transport Transport + + // Runtime is the container runtime used to execute workflow actions. + Runtime ContainerRuntime + + sem chan struct{} +} + +// Start finalizes the Agent configuration and starts the configured Transport so it is ready +// to receive workflows. On receiving a workflow, it will leverage the configured Runtime to +// execute workflow actions. +func (agent *Agent) Start(ctx context.Context) error { + if agent.ID == "" { + return errors.New("ID field must be set before calling Start()") + } + + if agent.Transport == nil { + return errors.New("Transport field must be set before calling Start()") + } + + if agent.Runtime == nil { + //nolint:stylecheck // Specifying field on data structure + return errors.New("Runtime field must be set before calling Start()") + } + + agent.Log = agent.Log.WithValues("agent_id", agent.ID) + + // Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time. + agent.sem = make(chan struct{}, 1) + agent.sem <- struct{}{} + + agent.Log.Info("Starting agent") + return agent.Transport.Start(ctx, agent.ID, agent) +} + +// HandleWorkflow satisfies transport. +func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error { + // sem isn't protected by a synchronization data structure so this is technically invoking + // undefined behavior. Its also a misuse of the data structure so we're making a best effort + // without adding complexity. + if agent.sem == nil { + return errors.New("Agent must have Start() called before calling HandleWorkflow()") + } + + select { + case <-agent.sem: + // Replenish the semaphore on exit so we can pick up another workflow. + defer func() { agent.sem <- struct{}{} }() + return agent.run(ctx, wflw, events) + + default: + reject := event.WorkflowRejected{ + ID: wflw.ID, + Message: "workflow already in progress", + } + if err := events.RecordEvent(ctx, reject); err != nil { + agent.Log.Info("Failed to record event", logEventKey, reject) + } + return nil + } +} diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go new file mode 100644 index 000000000..77de42e5d --- /dev/null +++ b/internal/agent/agent_test.go @@ -0,0 +1,462 @@ +package agent_test + +import ( + "context" + "strings" + "testing" + + "github.com/go-logr/logr" + "github.com/go-logr/zapr" + "github.com/google/go-cmp/cmp" + "github.com/tinkerbell/tink/internal/agent" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/failure" + "github.com/tinkerbell/tink/internal/agent/runtime" + "github.com/tinkerbell/tink/internal/agent/transport" + "github.com/tinkerbell/tink/internal/agent/workflow" + "go.uber.org/zap" +) + +func TestAgent_InvalidStart(t *testing.T) { + cases := []struct { + Name string + Agent agent.Agent + Error string + }{ + { + Name: "MissingAgentID", + Agent: agent.Agent{ + Log: logr.Discard(), + Transport: transport.Noop(), + Runtime: runtime.Noop(), + }, + Error: "ID field must be set before calling Start()", + }, + { + Name: "MissingRuntime", + Agent: agent.Agent{ + Log: logr.Discard(), + ID: "1234", + Transport: transport.Noop(), + }, + Error: "Runtime field must be set before calling Start()", + }, + { + Name: "MissingTransport", + Agent: agent.Agent{ + Log: logr.Discard(), + ID: "1234", + Runtime: runtime.Noop(), + }, + Error: "Transport field must be set before calling Start()", + }, + { + Name: "InitializedCorrectly", + Agent: agent.Agent{ + Log: logr.Discard(), + ID: "1234", + Transport: transport.Noop(), + Runtime: runtime.Noop(), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + err := tc.Agent.Start(context.Background()) + switch { + case tc.Error != "" && err == nil: + t.Fatalf("Expected error '%v' but received none", tc.Error) + case tc.Error != "" && err != nil && !strings.Contains(err.Error(), tc.Error): + t.Fatalf("Expected: %v\n;\nReceived: %v", tc.Error, err) + case tc.Error == "" && err != nil: + t.Fatalf("Received unexpected error: %v", err) + } + }) + } +} + +func TestAgent_ConcurrentWorkflows(t *testing.T) { + // The goal of this test is to ensure the agent rejects concurrent workflows. We have to + // build a valid agent because it will also reject calls to HandleWorkflow without first + // starting the Agent. + + logger := zapr.NewLogger(zap.Must(zap.NewDevelopment())) + + recorder := event.RecorderMock{ + RecordEventFunc: func(contextMoqParam context.Context, event event.Event) error { + return nil + }, + } + + wrkflow := workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1234", + Name: "name", + Image: "image", + }, + }, + } + + trnport := agent.TransportMock{ + StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error { + return handler.HandleWorkflow(ctx, wrkflow, &recorder) + }, + } + + started := make(chan struct{}) + + rntime := agent.ContainerRuntimeMock{ + RunFunc: func(ctx context.Context, action workflow.Action) error { + started <- struct{}{} + <-ctx.Done() + return nil + }, + } + + agnt := agent.Agent{ + Log: logger, + Transport: &trnport, + Runtime: &rntime, + ID: "1234", + } + + // Build a cancellable context so we can tear the goroutine down. + + errs := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { errs <- agnt.Start(ctx) }() + + // Await either an error or the mock runtime to tell us its stated. + select { + case err := <-errs: + t.Fatalf("Unexpected error: %v", err) + case <-started: + } + + // Attempt to fire off another workflow. + err := agnt.HandleWorkflow(context.Background(), wrkflow, &recorder) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Ensure the latest event recorded is a event.WorkflowRejected. + calls := recorder.RecordEventCalls() + if len(calls) == 0 { + t.Fatal("No calls have been made to the event.Recorder") + } + + lastCall := calls[len(calls)-1] + ev, ok := lastCall.Event.(event.WorkflowRejected) + if !ok { + t.Fatalf("Expected event of type event.WorkflowRejected; received %T", ev) + } + + expectEvent := event.WorkflowRejected{ + ID: wrkflow.ID, + Message: "workflow already in progress", + } + if !cmp.Equal(expectEvent, ev) { + t.Fatalf("Received unexpected event:\n%v", cmp.Diff(expectEvent, ev)) + } +} + +func TestAgent_HandlingWorkflows(t *testing.T) { + logger := zapr.NewLogger(zap.Must(zap.NewDevelopment())) + + type ReasonAndMessage struct { + Reason, Message string + } + + cases := []struct { + Name string + Workflow workflow.Workflow + Errors map[string]ReasonAndMessage + Events []event.Event + }{ + { + Name: "SuccessfulWorkflow", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + { + ID: "2", + Name: "action_2", + Image: "image_2", + }, + }, + }, + Errors: map[string]ReasonAndMessage{}, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionSucceeded{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "2", + }, + event.ActionSucceeded{ + WorkflowID: "1234", + ActionID: "2", + }, + }, + }, + { + Name: "LastActionFails", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + { + ID: "2", + Name: "action_2", + Image: "image_2", + }, + }, + }, + Errors: map[string]ReasonAndMessage{ + "2": { + Reason: "TestReason", + Message: "test message", + }, + }, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionSucceeded{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "2", + }, + event.ActionFailed{ + WorkflowID: "1234", + ActionID: "2", + Reason: "TestReason", + Message: "test message", + }, + }, + }, + { + Name: "FirstActionFails", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + { + ID: "2", + Name: "action_2", + Image: "image_2", + }, + }, + }, + Errors: map[string]ReasonAndMessage{ + "1": { + Reason: "TestReason", + Message: "test message", + }, + }, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionFailed{ + WorkflowID: "1234", + ActionID: "1", + Reason: "TestReason", + Message: "test message", + }, + }, + }, + { + Name: "MiddleActionFails", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + { + ID: "2", + Name: "action_2", + Image: "image_2", + }, + { + ID: "3", + Name: "action_3", + Image: "image_3", + }, + }, + }, + Errors: map[string]ReasonAndMessage{ + "2": { + Reason: "TestReason", + Message: "test message", + }, + }, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionSucceeded{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "2", + }, + event.ActionFailed{ + WorkflowID: "1234", + ActionID: "2", + Reason: "TestReason", + Message: "test message", + }, + }, + }, + { + Name: "InvalidReason", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + }, + }, + Errors: map[string]ReasonAndMessage{ + "1": { + Reason: "this is an invalid reason format", + Message: "test message", + }, + }, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionFailed{ + WorkflowID: "1234", + ActionID: "1", + Reason: "InvalidReason", + Message: "test message", + }, + }, + }, + { + Name: "InvalidMessage", + Workflow: workflow.Workflow{ + ID: "1234", + Actions: []workflow.Action{ + { + ID: "1", + Name: "action_1", + Image: "image_1", + }, + }, + }, + Errors: map[string]ReasonAndMessage{ + "1": { + Reason: "TestReason", + Message: `invalid +message`, + }, + }, + Events: []event.Event{ + event.ActionStarted{ + WorkflowID: "1234", + ActionID: "1", + }, + event.ActionFailed{ + WorkflowID: "1234", + ActionID: "1", + Reason: "TestReason", + Message: `invalid \nmessage`, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + recorder := event.RecorderMock{ + RecordEventFunc: func(contextMoqParam context.Context, event event.Event) error { + return nil + }, + } + + trnport := agent.TransportMock{ + StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error { + return handler.HandleWorkflow(ctx, tc.Workflow, &recorder) + }, + } + + rntime := agent.ContainerRuntimeMock{ + RunFunc: func(ctx context.Context, action workflow.Action) error { + if res, ok := tc.Errors[action.ID]; ok { + return failure.WithReason(res.Message, res.Reason) + } + return nil + }, + } + + agnt := agent.Agent{ + Log: logger, + Transport: &trnport, + Runtime: &rntime, + ID: "1234", + } + + err := agnt.Start(context.Background()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + calls := recorder.RecordEventCalls() + if len(calls) != len(tc.Events) { + t.Fatalf("Expected %v events; Received %v\n%+v", len(tc.Events), len(calls), calls) + } + + var receivedEvents []event.Event + for _, call := range calls { + receivedEvents = append(receivedEvents, call.Event) + } + + if !cmp.Equal(tc.Events, receivedEvents) { + t.Fatalf("Did not received expected event set:\n%v", cmp.Diff(tc.Events, receivedEvents)) + } + }) + } +} diff --git a/internal/agent/event/action.go b/internal/agent/event/action.go new file mode 100644 index 000000000..3f2df4f9a --- /dev/null +++ b/internal/agent/event/action.go @@ -0,0 +1,53 @@ +package event + +import "fmt" + +const ( + ActionStartedName Name = "ActionStarted" + ActionSucceededName Name = "ActionSucceeded" + ActionFailedName Name = "ActionFailed" +) + +// ActionStarted occurs when an action begins running. +type ActionStarted struct { + ActionID string + WorkflowID string +} + +func (ActionStarted) GetName() Name { + return ActionStartedName +} + +func (e ActionStarted) String() string { + return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID) +} + +// ActionSucceeded occurs when an action successfully completes. +type ActionSucceeded struct { + ActionID string + WorkflowID string +} + +func (ActionSucceeded) GetName() Name { + return ActionSucceededName +} + +func (e ActionSucceeded) String() string { + return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID) +} + +// ActionFailed occurs when an action fails to complete. +type ActionFailed struct { + ActionID string + WorkflowID string + Reason string + Message string +} + +func (ActionFailed) GetName() Name { + return ActionFailedName +} + +func (e ActionFailed) String() string { + return fmt.Sprintf("workflow=%v action=%v reason=%v", e.WorkflowID, e.ActionID, e.Reason) +} diff --git a/internal/agent/event/error.go b/internal/agent/event/error.go new file mode 100644 index 000000000..6aad1836b --- /dev/null +++ b/internal/agent/event/error.go @@ -0,0 +1,14 @@ +package event + +import ( + "fmt" +) + +// IncompatibleError indicates an event was received that. +type IncompatibleError struct { + Event Event +} + +func (e IncompatibleError) Error() string { + return fmt.Sprintf("incompatible event: %v", e.Event.GetName()) +} diff --git a/internal/agent/event/event.go b/internal/agent/event/event.go new file mode 100644 index 000000000..8f53fe60f --- /dev/null +++ b/internal/agent/event/event.go @@ -0,0 +1,22 @@ +// Package event describes the event set and an interface for recording events. Events are +// generated as workflows execute. +package event + +import "context" + +// Name is a unique name identifying an event. +type Name string + +// Event is a recordable event. +type Event interface { + // GetName retrieves the event name. + GetName() Name + + // Force events to reside in this package - see zz_known.go. + isEventFromThisPackage() +} + +// Recorder records events generated from running a Workflow. +type Recorder interface { + RecordEvent(context.Context, Event) error +} diff --git a/internal/agent/event/fake.go b/internal/agent/event/fake.go new file mode 100644 index 000000000..4fc02d879 --- /dev/null +++ b/internal/agent/event/fake.go @@ -0,0 +1,12 @@ +package event + +import "context" + +// NoopRecorder retrieves a nooping fake recorder. +func NoopRecorder() *RecorderMock { + return &RecorderMock{ + RecordEventFunc: func(context.Context, Event) error { + return nil + }, + } +} diff --git a/internal/agent/event/mocks.go b/internal/agent/event/mocks.go new file mode 100644 index 000000000..10688a68a --- /dev/null +++ b/internal/agent/event/mocks.go @@ -0,0 +1,177 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package event + +import ( + "context" + "sync" +) + +// Ensure, that RecorderMock does implement Recorder. +// If this is not the case, regenerate this file with moq. +var _ Recorder = &RecorderMock{} + +// RecorderMock is a mock implementation of Recorder. +// +// func TestSomethingThatUsesRecorder(t *testing.T) { +// +// // make and configure a mocked Recorder +// mockedRecorder := &RecorderMock{ +// RecordEventFunc: func(contextMoqParam context.Context, event Event) error { +// panic("mock out the RecordEvent method") +// }, +// } +// +// // use mockedRecorder in code that requires Recorder +// // and then make assertions. +// +// } +type RecorderMock struct { + // RecordEventFunc mocks the RecordEvent method. + RecordEventFunc func(contextMoqParam context.Context, event Event) error + + // calls tracks calls to the methods. + calls struct { + // RecordEvent holds details about calls to the RecordEvent method. + RecordEvent []struct { + // ContextMoqParam is the contextMoqParam argument value. + ContextMoqParam context.Context + // Event is the event argument value. + Event Event + } + } + lockRecordEvent sync.RWMutex +} + +// RecordEvent calls RecordEventFunc. +func (mock *RecorderMock) RecordEvent(contextMoqParam context.Context, event Event) error { + if mock.RecordEventFunc == nil { + panic("RecorderMock.RecordEventFunc: method is nil but Recorder.RecordEvent was just called") + } + callInfo := struct { + ContextMoqParam context.Context + Event Event + }{ + ContextMoqParam: contextMoqParam, + Event: event, + } + mock.lockRecordEvent.Lock() + mock.calls.RecordEvent = append(mock.calls.RecordEvent, callInfo) + mock.lockRecordEvent.Unlock() + return mock.RecordEventFunc(contextMoqParam, event) +} + +// RecordEventCalls gets all the calls that were made to RecordEvent. +// Check the length with: +// +// len(mockedRecorder.RecordEventCalls()) +func (mock *RecorderMock) RecordEventCalls() []struct { + ContextMoqParam context.Context + Event Event +} { + var calls []struct { + ContextMoqParam context.Context + Event Event + } + mock.lockRecordEvent.RLock() + calls = mock.calls.RecordEvent + mock.lockRecordEvent.RUnlock() + return calls +} + +// Ensure, that EventMock does implement Event. +// If this is not the case, regenerate this file with moq. +var _ Event = &EventMock{} + +// EventMock is a mock implementation of Event. +// +// func TestSomethingThatUsesEvent(t *testing.T) { +// +// // make and configure a mocked Event +// mockedEvent := &EventMock{ +// GetNameFunc: func() Name { +// panic("mock out the GetName method") +// }, +// isEventFromThisPackageFunc: func() { +// panic("mock out the isEventFromThisPackage method") +// }, +// } +// +// // use mockedEvent in code that requires Event +// // and then make assertions. +// +// } +type EventMock struct { + // GetNameFunc mocks the GetName method. + GetNameFunc func() Name + + // isEventFromThisPackageFunc mocks the isEventFromThisPackage method. + isEventFromThisPackageFunc func() + + // calls tracks calls to the methods. + calls struct { + // GetName holds details about calls to the GetName method. + GetName []struct { + } + // isEventFromThisPackage holds details about calls to the isEventFromThisPackage method. + isEventFromThisPackage []struct { + } + } + lockGetName sync.RWMutex + lockisEventFromThisPackage sync.RWMutex +} + +// GetName calls GetNameFunc. +func (mock *EventMock) GetName() Name { + if mock.GetNameFunc == nil { + panic("EventMock.GetNameFunc: method is nil but Event.GetName was just called") + } + callInfo := struct { + }{} + mock.lockGetName.Lock() + mock.calls.GetName = append(mock.calls.GetName, callInfo) + mock.lockGetName.Unlock() + return mock.GetNameFunc() +} + +// GetNameCalls gets all the calls that were made to GetName. +// Check the length with: +// +// len(mockedEvent.GetNameCalls()) +func (mock *EventMock) GetNameCalls() []struct { +} { + var calls []struct { + } + mock.lockGetName.RLock() + calls = mock.calls.GetName + mock.lockGetName.RUnlock() + return calls +} + +// isEventFromThisPackage calls isEventFromThisPackageFunc. +func (mock *EventMock) isEventFromThisPackage() { + if mock.isEventFromThisPackageFunc == nil { + panic("EventMock.isEventFromThisPackageFunc: method is nil but Event.isEventFromThisPackage was just called") + } + callInfo := struct { + }{} + mock.lockisEventFromThisPackage.Lock() + mock.calls.isEventFromThisPackage = append(mock.calls.isEventFromThisPackage, callInfo) + mock.lockisEventFromThisPackage.Unlock() + mock.isEventFromThisPackageFunc() +} + +// isEventFromThisPackageCalls gets all the calls that were made to isEventFromThisPackage. +// Check the length with: +// +// len(mockedEvent.isEventFromThisPackageCalls()) +func (mock *EventMock) isEventFromThisPackageCalls() []struct { +} { + var calls []struct { + } + mock.lockisEventFromThisPackage.RLock() + calls = mock.calls.isEventFromThisPackage + mock.lockisEventFromThisPackage.RUnlock() + return calls +} diff --git a/internal/agent/event/workflow.go b/internal/agent/event/workflow.go new file mode 100644 index 000000000..1eb53b3fa --- /dev/null +++ b/internal/agent/event/workflow.go @@ -0,0 +1,17 @@ +package event + +const WorkflowRejectedName Name = "WorkflowRejected" + +// WorkflowRejected is generated when a workflow is being rejected by the agent. +type WorkflowRejected struct { + ID string + Message string +} + +func (WorkflowRejected) GetName() Name { + return WorkflowRejectedName +} + +func (e WorkflowRejected) String() string { + return e.Message +} diff --git a/internal/agent/event/zz_from_package.go b/internal/agent/event/zz_from_package.go new file mode 100644 index 000000000..b6cd10291 --- /dev/null +++ b/internal/agent/event/zz_from_package.go @@ -0,0 +1,16 @@ +package event + +// We want to force events to reside in this package so its clear what events are usable +// with by agent code. We achieve this using a compile time check that ensures all events +// implement an unexported method on the Event interface which is the interface passed around +// by event handling code. +// +// This source file should not contain methods other than the isEventFromThisPackage(). +// +// This code is hand written. + +func (ActionStarted) isEventFromThisPackage() {} +func (ActionSucceeded) isEventFromThisPackage() {} +func (ActionFailed) isEventFromThisPackage() {} + +func (WorkflowRejected) isEventFromThisPackage() {} diff --git a/internal/agent/failure/reason.go b/internal/agent/failure/reason.go new file mode 100644 index 000000000..a302f8c65 --- /dev/null +++ b/internal/agent/failure/reason.go @@ -0,0 +1,44 @@ +package failure + +import "errors" + +// Reason extracts a failure reason from err. err has a reason if it satisfies the failure reason +// interface: +// +// interface { +// FailureReason() string +// } +// +// If err does not have a reason or FailureReason() returns an empty string, ReasonUnknown is +// returned. +func Reason(err error) (string, bool) { + fr, ok := err.(interface { + FailureReason() string + }) + + if !ok || fr.FailureReason() == "" { + return "", false + } + + return fr.FailureReason(), true +} + +// WrapWithReason decorates err with reason. The reason can be extracted using Reason(). +func WrapWithReason(err error, reason string) error { + return withReason{err, reason} +} + +// WithReason creates a new error using message and wraps it with reason. The reason can be +// extracted using Reason(). +func WithReason(message, reason string) error { + return WrapWithReason(errors.New(message), reason) +} + +type withReason struct { + error + reason string +} + +func (e withReason) FailureReason() string { + return e.reason +} diff --git a/internal/agent/mocks.go b/internal/agent/mocks.go new file mode 100644 index 000000000..f7b8ddb6d --- /dev/null +++ b/internal/agent/mocks.go @@ -0,0 +1,160 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package agent + +import ( + "context" + "github.com/tinkerbell/tink/internal/agent/workflow" + "sync" +) + +// Ensure, that TransportMock does implement Transport. +// If this is not the case, regenerate this file with moq. +var _ Transport = &TransportMock{} + +// TransportMock is a mock implementation of Transport. +// +// func TestSomethingThatUsesTransport(t *testing.T) { +// +// // make and configure a mocked Transport +// mockedTransport := &TransportMock{ +// StartFunc: func(contextMoqParam context.Context, agentID string, handler workflow.Handler) error { +// panic("mock out the Start method") +// }, +// } +// +// // use mockedTransport in code that requires Transport +// // and then make assertions. +// +// } +type TransportMock struct { + // StartFunc mocks the Start method. + StartFunc func(contextMoqParam context.Context, agentID string, handler workflow.Handler) error + + // calls tracks calls to the methods. + calls struct { + // Start holds details about calls to the Start method. + Start []struct { + // ContextMoqParam is the contextMoqParam argument value. + ContextMoqParam context.Context + // AgentID is the agentID argument value. + AgentID string + // Handler is the handler argument value. + Handler workflow.Handler + } + } + lockStart sync.RWMutex +} + +// Start calls StartFunc. +func (mock *TransportMock) Start(contextMoqParam context.Context, agentID string, handler workflow.Handler) error { + if mock.StartFunc == nil { + panic("TransportMock.StartFunc: method is nil but Transport.Start was just called") + } + callInfo := struct { + ContextMoqParam context.Context + AgentID string + Handler workflow.Handler + }{ + ContextMoqParam: contextMoqParam, + AgentID: agentID, + Handler: handler, + } + mock.lockStart.Lock() + mock.calls.Start = append(mock.calls.Start, callInfo) + mock.lockStart.Unlock() + return mock.StartFunc(contextMoqParam, agentID, handler) +} + +// StartCalls gets all the calls that were made to Start. +// Check the length with: +// +// len(mockedTransport.StartCalls()) +func (mock *TransportMock) StartCalls() []struct { + ContextMoqParam context.Context + AgentID string + Handler workflow.Handler +} { + var calls []struct { + ContextMoqParam context.Context + AgentID string + Handler workflow.Handler + } + mock.lockStart.RLock() + calls = mock.calls.Start + mock.lockStart.RUnlock() + return calls +} + +// Ensure, that ContainerRuntimeMock does implement ContainerRuntime. +// If this is not the case, regenerate this file with moq. +var _ ContainerRuntime = &ContainerRuntimeMock{} + +// ContainerRuntimeMock is a mock implementation of ContainerRuntime. +// +// func TestSomethingThatUsesContainerRuntime(t *testing.T) { +// +// // make and configure a mocked ContainerRuntime +// mockedContainerRuntime := &ContainerRuntimeMock{ +// RunFunc: func(contextMoqParam context.Context, action workflow.Action) error { +// panic("mock out the Run method") +// }, +// } +// +// // use mockedContainerRuntime in code that requires ContainerRuntime +// // and then make assertions. +// +// } +type ContainerRuntimeMock struct { + // RunFunc mocks the Run method. + RunFunc func(contextMoqParam context.Context, action workflow.Action) error + + // calls tracks calls to the methods. + calls struct { + // Run holds details about calls to the Run method. + Run []struct { + // ContextMoqParam is the contextMoqParam argument value. + ContextMoqParam context.Context + // Action is the action argument value. + Action workflow.Action + } + } + lockRun sync.RWMutex +} + +// Run calls RunFunc. +func (mock *ContainerRuntimeMock) Run(contextMoqParam context.Context, action workflow.Action) error { + if mock.RunFunc == nil { + panic("ContainerRuntimeMock.RunFunc: method is nil but ContainerRuntime.Run was just called") + } + callInfo := struct { + ContextMoqParam context.Context + Action workflow.Action + }{ + ContextMoqParam: contextMoqParam, + Action: action, + } + mock.lockRun.Lock() + mock.calls.Run = append(mock.calls.Run, callInfo) + mock.lockRun.Unlock() + return mock.RunFunc(contextMoqParam, action) +} + +// RunCalls gets all the calls that were made to Run. +// Check the length with: +// +// len(mockedContainerRuntime.RunCalls()) +func (mock *ContainerRuntimeMock) RunCalls() []struct { + ContextMoqParam context.Context + Action workflow.Action +} { + var calls []struct { + ContextMoqParam context.Context + Action workflow.Action + } + mock.lockRun.RLock() + calls = mock.calls.Run + mock.lockRun.RUnlock() + return calls +} diff --git a/internal/agent/run.go b/internal/agent/run.go new file mode 100644 index 000000000..f486f4a84 --- /dev/null +++ b/internal/agent/run.go @@ -0,0 +1,105 @@ +package agent + +import ( + "context" + "regexp" + "strings" + "time" + + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/failure" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// ReasonRuntimeError is the default reason used when no reason is provided by the runtime. +const ReasonRuntimeError = "RuntimeError" + +// ReasonInvalid indicates a reason provided by the runtime was invalid. +const ReasonInvalid = "InvalidReason" + +// Consistent logging keys. +const ( + logEventKey = "event" + logErrorKey = "error" + logReasonKey = "reason" +) + +// validReasonRegex defines the regex for a valid action failure reason. +var validReasonRegex = regexp.MustCompile(`^[a-zA-Z]+$`) + +// run executes the workflow using the runtime configured on the Agent. +func (agent *Agent) run(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error { + logger := agent.Log.WithValues("workflow", wflw) + logger.Info("Starting workflow") + + for _, action := range wflw.Actions { + logger := logger.WithValues("action_id", action.ID, "action_name", action.Name) + + start := time.Now() + logger.Info("Starting action") + + agent.recordNonTerminatingEvent(ctx, events, event.ActionStarted{ + ActionID: action.ID, + WorkflowID: wflw.ID, + }) + + if err := agent.Runtime.Run(ctx, action); err != nil { + reason := ReasonRuntimeError + if r, ok := failure.Reason(err); ok { + reason = r + if !validReasonRegex.MatchString(reason) { + logger.Info( + "Received invalid reason for action failure; using InvalidReason instead", + logReasonKey, reason, + ) + reason = ReasonInvalid + } + } + + // We consider newlines in the failure message invalid because it upsets formatting. + // The failure message is vital to easy debugability so we force the string into + // something we're happy with and communicate that. + message := strings.ReplaceAll(err.Error(), "\n", `\n`) + + logger.Info("Failed to run action; terminating workflow", + logErrorKey, err, + logReasonKey, reason, + "duration", time.Since(start).String(), + ) + agent.recordTerminatingEvent(ctx, events, event.ActionFailed{ + ActionID: action.ID, + WorkflowID: wflw.ID, + Reason: reason, + Message: message, + }) + return nil + } + + agent.recordNonTerminatingEvent(ctx, events, event.ActionSucceeded{ + ActionID: action.ID, + WorkflowID: wflw.ID, + }) + + logger.Info("Finished action", "duration", time.Since(start).String()) + } + + logger.Info("Finished workflow") + + return nil +} + +func (agent *Agent) recordNonTerminatingEvent(ctx context.Context, events event.Recorder, e event.Event) { + if err := events.RecordEvent(ctx, e); err != nil { + agent.Log.Info( + "Failed to record event; continuing workflow", + logEventKey, e, + logErrorKey, err, + ) + } +} + +func (agent *Agent) recordTerminatingEvent(ctx context.Context, events event.Recorder, e event.Event) { + if err := events.RecordEvent(ctx, e); err != nil { + agent.Log.Info("Failed to record event", logEventKey, e, logErrorKey, err) + } +} diff --git a/internal/agent/runtime.go b/internal/agent/runtime.go new file mode 100644 index 000000000..eea77536f --- /dev/null +++ b/internal/agent/runtime.go @@ -0,0 +1,20 @@ +package agent + +import ( + "context" + + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// ContainerRuntime is a runtime capable of executing workflow actions. +type ContainerRuntime interface { + // Run executes the action. The runtime should mount the following files for the action + // implementation to communicate a reason and message in the event of failure: + // + // /tinkerbell/failure-reason + // /tinkerbell/failure-message + // + // The reason and message should be communicataed via the returned error. The message should + // be the error message and the reason should be provided as defined in failure.Reason(). + Run(context.Context, workflow.Action) error +} diff --git a/internal/agent/runtime/fake.go b/internal/agent/runtime/fake.go new file mode 100644 index 000000000..2798331db --- /dev/null +++ b/internal/agent/runtime/fake.go @@ -0,0 +1,28 @@ +package runtime + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +var _ agent.ContainerRuntime = Fake{} + +func Noop() Fake { + return Fake{ + Log: logr.Discard(), + } +} + +// Fake is a runtime that always succeeds. It does not literally execute any actions. +type Fake struct { + Log logr.Logger +} + +// Run satisfies agent.ContainerRuntime. +func (f Fake) Run(_ context.Context, a workflow.Action) error { + f.Log.Info("Starting fake container", "action", a) + return nil +} diff --git a/internal/agent/runtime/runtime.go b/internal/agent/runtime/runtime.go new file mode 100644 index 000000000..364326f0b --- /dev/null +++ b/internal/agent/runtime/runtime.go @@ -0,0 +1,7 @@ +// Package runtime contains runtime implementations that can execute workflow actions. They are +// responsible for extracting workflow failure reason and messages from the the action +// file system at the following locations: +// +// /tinkerbell/failure-reason +// /tinkerbell/failure-message +package runtime diff --git a/internal/agent/transport.go b/internal/agent/transport.go new file mode 100644 index 000000000..79a033a22 --- /dev/null +++ b/internal/agent/transport.go @@ -0,0 +1,15 @@ +package agent + +import ( + "context" + + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// Transport is a transport mechanism for communicating workflows to the agent. +type Transport interface { + // Start is a blocking call that starts the transport and begins retrieving workflows for the + // given agentID. The transport should pass workflows to the Handler. If the transport + // needs to cancel a workflow it should cancel the context passed to the Handler. + Start(_ context.Context, agentID string, _ workflow.Handler) error +} diff --git a/internal/agent/transport/fake.go b/internal/agent/transport/fake.go new file mode 100644 index 000000000..8ea0e5f35 --- /dev/null +++ b/internal/agent/transport/fake.go @@ -0,0 +1,35 @@ +package transport + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +func Noop() Fake { + return Fake{ + Log: logr.Discard(), + } +} + +type Fake struct { + Log logr.Logger + Workflows []workflow.Workflow +} + +func (f Fake) Start(ctx context.Context, _ string, runner workflow.Handler) error { + f.Log.Info("Starting fake transport") + for _, w := range f.Workflows { + if err := runner.HandleWorkflow(ctx, w, f); err != nil { + f.Log.Error(err, "Running workflow", "workflow", w) + } + } + return nil +} + +func (f Fake) RecordEvent(_ context.Context, e event.Event) error { + f.Log.Info("Recording event", "event", e.GetName()) + return nil +} diff --git a/internal/agent/transport/transport.go b/internal/agent/transport/transport.go new file mode 100644 index 000000000..5f9e821c3 --- /dev/null +++ b/internal/agent/transport/transport.go @@ -0,0 +1,4 @@ +// Package transport contains data structures that implement agent transport capabilities. +// transport implementations are responsible for connecting to the Tink server and retrieving +// workflows for the agent to run. +package transport diff --git a/internal/agent/workflow/handler.go b/internal/agent/workflow/handler.go new file mode 100644 index 000000000..e63f7c8e6 --- /dev/null +++ b/internal/agent/workflow/handler.go @@ -0,0 +1,15 @@ +package workflow + +import ( + "context" + + "github.com/tinkerbell/tink/internal/agent/event" +) + +// Handler is responsible for handling workflow execution. +type Handler interface { + // HandleWorkflow begins executing the given workflow. The event recorder can be used to + // indicate the progress of a workflow. If the given context becomes cancelled, the workflow + // handler should stop workflow execution. + HandleWorkflow(context.Context, Workflow, event.Recorder) error +} diff --git a/internal/agent/workflow/workflow.go b/internal/agent/workflow/workflow.go new file mode 100644 index 000000000..3082962a0 --- /dev/null +++ b/internal/agent/workflow/workflow.go @@ -0,0 +1,33 @@ +// Package workflow contains workflow domain objects. The domain objects will be moved to +// /internal/workflow at a later date when they are required/we transition to the new codebase. +package workflow + +// Workflow represents a runnable workflow for the Handler. +type Workflow struct { + // Do we need a workflow name? Does that even come down in the proto definition? + ID string + Actions []Action +} + +func (w Workflow) String() string { + return w.ID +} + +// Action represents an individually runnable action. +type Action struct { + ID string + Name string + Image string + Cmd string + Args []string + Env map[string]string + Volumes []string + NetworkNamespace string +} + +func (a Action) String() string { + // We should consider normalizing the action name and combining it with the ID. It would + // make human identification easier. Alternatively, we could have a dedicated method for + // retrieving names. + return a.ID +}