Skip to content

Commit

Permalink
Report results of running rollout commands to the cloud
Browse files Browse the repository at this point in the history
Signed-off-by: Douglas Camata <[email protected]>
  • Loading branch information
douglascamata committed Jan 31, 2022
1 parent 911f9c9 commit f7911b9
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 50 deletions.
12 changes: 12 additions & 0 deletions api/agent/director.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ service Director {
// Retrieve Directives from the CEPC
rpc Retrieve(Identity) returns (stream Directive) {}

// Reports the result of a command execution to the cloud
rpc ReportCommandResult(CommandResult) returns (CommandResultResponse) {}

rpc RetrieveSnapshot(Identity) returns (stream RawSnapshotChunk) {}
}

Expand Down Expand Up @@ -116,4 +119,13 @@ message RolloutCommand {
ABORT = 2;
}
Action action = 3;
string command_id = 4;
}

message CommandResult {
string command_id = 1;
bool success = 2;
string message = 3;
}

message CommandResultResponse {}
1 change: 1 addition & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const cloudConnectTokenKey = "CLOUD_CONNECT_TOKEN"
type Comm interface {
Close() error
Report(context.Context, *agent.Snapshot, string) error
ReportCommandResult(context.Context, *agent.CommandResult) error
Directives() <-chan *agent.Directive
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (c *RPCComm) retrieveLoop(ctx context.Context) {

func (c *RPCComm) retrieve(ctx context.Context) error {
stream, err := c.client.Retrieve(ctx, c.agentID)

if err != nil {
return err
}
Expand All @@ -142,6 +143,14 @@ func (c *RPCComm) Close() error {
return c.conn.Close()
}

func (c *RPCComm) ReportCommandResult(ctx context.Context, result *agent.CommandResult) error {
_, err := c.client.ReportCommandResult(ctx, result, grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("ReportCommandResult error: %w", err)
}
return nil
}

func (c *RPCComm) Report(ctx context.Context, report *agent.Snapshot, apiKey string) error {
select {
case c.rptWake <- struct{}{}:
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type MockClient struct {
LastMetadata metadata.MD
}

func (m *MockClient) ReportCommandResult(ctx context.Context, in *agent.CommandResult, opts ...grpc.CallOption) (*agent.CommandResultResponse, error) {
panic("implement me")
}

func (m *MockClient) Close() error {
return nil
}
Expand Down
25 changes: 22 additions & 3 deletions pkg/agent/directive_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (dh *BasicDirectiveHandler) HandleDirective(ctx context.Context, a *Agent,
}

if command.RolloutCommand != nil {
dh.handleRolloutCommand(ctx, command.RolloutCommand, dh.rolloutsGetterFactory)
dh.handleRolloutCommand(ctx, command.RolloutCommand, a)
}
}

a.SetLastDirectiveID(ctx, directive.ID)
}

func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSchema *agentapi.RolloutCommand, rolloutsGetterFactory rolloutsGetterFactory) {
func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSchema *agentapi.RolloutCommand, a *Agent) {
if dh.rolloutsGetterFactory == nil {
dlog.Warn(ctx, "Received rollout command but does not know how to talk to Argo Rollouts.")
return
Expand All @@ -64,6 +64,7 @@ func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSc
rolloutName := cmdSchema.GetName()
namespace := cmdSchema.GetNamespace()
action := int32(cmdSchema.GetAction())
commandID := cmdSchema.GetCommandId()

if rolloutName == "" {
dlog.Warn(ctx, "Rollout command received without a rollout name.")
Expand All @@ -75,13 +76,31 @@ func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSc
return
}

if commandID == "" {
dlog.Warn(ctx, "Rollout command received without a command ID.")
return
}

cmd := &rolloutCommand{
rolloutName: rolloutName,
namespace: namespace,
action: rolloutAction(agentapi.RolloutCommand_Action_name[action]),
}
err := cmd.RunWithClientFactory(ctx, rolloutsGetterFactory)
err := cmd.RunWithClientFactory(ctx, dh.rolloutsGetterFactory)
if err != nil {
dlog.Errorf(ctx, "error running rollout command %s: %s", cmd, err)
}
dh.reportCommandResult(ctx, commandID, cmd, err, a)
}

func (dh *BasicDirectiveHandler) reportCommandResult(ctx context.Context, commandID string, cmd *rolloutCommand, cmdError error, a *Agent) {
result := &agentapi.CommandResult{CommandId: commandID, Success: true}
if cmdError != nil {
result.Success = false
result.Message = cmdError.Error()
}
err := a.comm.ReportCommandResult(ctx, result)
if err != nil {
dlog.Errorf(ctx, "error reporting result of rollout command %s: %s", cmd, cmdError)
}
}
Loading

0 comments on commit f7911b9

Please sign in to comment.