-
Notifications
You must be signed in to change notification settings - Fork 135
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Chris Doherty <[email protected]>
- Loading branch information
1 parent
f017f99
commit bfbfded
Showing
20 changed files
with
1,324 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package agent | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/tinkerbell/tink/internal/agent/event" | ||
"github.com/tinkerbell/tink/internal/agent/workflow" | ||
) | ||
|
||
// Agent is the core data structure for handling workflow execution on target nodes. It leverages | ||
// a Transport and a ContainerRuntime to retrieve workflows and execute actions. | ||
// | ||
// The agent runs a single workflow at a time. Concurrent requests to run workflows will have the | ||
// second workflow rejected with an event.WorkflowRejected event. | ||
type Agent struct { | ||
Log logr.Logger | ||
|
||
// ID is the unique identifier for the agent. It is used by the transport to identify workflows | ||
// scheduled for this agent. | ||
ID string | ||
|
||
// Transport is the transport used by the agent for communicating workflows and events. | ||
Transport Transport | ||
|
||
// Runtime is the container runtime used to execute workflow actions. | ||
Runtime ContainerRuntime | ||
|
||
sem chan struct{} | ||
} | ||
|
||
// Start finalizes the Agent configuration and starts the configured Transport so it is ready | ||
// to receive workflows. On receiving a workflow, it will leverage the configured Runtime to | ||
// execute workflow actions. | ||
func (agent *Agent) Start(ctx context.Context) error { | ||
if agent.ID == "" { | ||
return errors.New("ID field must be set before calling Start()") | ||
} | ||
|
||
if agent.Transport == nil { | ||
return errors.New("Transport field must be set before calling Start()") | ||
} | ||
|
||
if agent.Runtime == nil { | ||
//nolint:stylecheck // Specifying field on data structure | ||
return errors.New("Runtime field must be set before calling Start()") | ||
} | ||
|
||
agent.Log = agent.Log.WithValues("agent_id", agent.ID) | ||
|
||
// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time. | ||
agent.sem = make(chan struct{}, 1) | ||
agent.sem <- struct{}{} | ||
|
||
agent.Log.Info("Starting agent") | ||
return agent.Transport.Start(ctx, agent.ID, agent) | ||
} | ||
|
||
// HandleWorkflow satisfies transport. | ||
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error { | ||
// sem isn't protected by a synchronization data structure so this is technically invoking | ||
// undefined behavior. Its also a misuse of the data structure so we're making a best effort | ||
// without adding complexity. | ||
if agent.sem == nil { | ||
return errors.New("Agent must have Start() called before calling HandleWorkflow()") | ||
} | ||
|
||
select { | ||
case <-agent.sem: | ||
// Replenish the semaphore on exit so we can pick up another workflow. | ||
defer func() { agent.sem <- struct{}{} }() | ||
return agent.run(ctx, wflw, events) | ||
|
||
default: | ||
reject := event.WorkflowRejected{ | ||
ID: wflw.ID, | ||
Message: "workflow already in progress", | ||
} | ||
if err := events.RecordEvent(ctx, reject); err != nil { | ||
agent.Log.Info("Failed to record event", logEventKey, reject) | ||
} | ||
return nil | ||
} | ||
} |
Oops, something went wrong.