Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow upgrade action to signal retry #1887

Merged
merged 12 commits into from
Oct 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
- Fleet Server now allows setting global labels on APM instrumentation. {pull}1649[1649]
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Allow upgrade actions to signal that they will be retried. {pull}1887[1887]
38 changes: 27 additions & 11 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,17 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
// The unenroll and upgrade acks might overwrite it later
setResult(n, http.StatusOK)

if ev.Error == "" {
if action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
} else if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent, ev); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
}

if ev.Error == "" && action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
}
}

// Process policy acks
Expand Down Expand Up @@ -505,14 +505,30 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return nil
}

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, event Event) error {
now := time.Now().UTC().Format(time.RFC3339)
doc := bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradedAt: now,
dl.FieldUpgradeStatus: "completed",
}
if event.Error != "" {
// unmarshal event payload
var pl struct {
Retry bool `json:"retry"`
Attempt int `json:"retry_attempt"`
}
err := json.Unmarshal(event.Payload, &pl)
if err != nil {
zlog.Error().Err(err).Msg("unable to unmarshal upgrade event payload")
}

// if the payload indicates a retry, mark change the upgrade status to retrying.
if pl.Retry {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as retrying")
doc[dl.FieldUpgradeStatus] = "retrying" // TODO should we also change FieldUpgradedAt and FieldUpgradeStated at?
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
}
}

body, err := doc.Marshal()
if err != nil {
Expand Down
65 changes: 65 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,71 @@ func TestHandleAckEvents(t *testing.T) {
},
err: &HTTPError{Status: http.StatusNotFound},
},
{
name: "upgrade action failed",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with no payload",
},
},
res: newAckResponse(true, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
{
name: "upgrade action retrying",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with payload",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
},
res: newAckResponse(true, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(func(t *testing.T, body []byte) bool {
t.Helper()
var pl struct {
Retry bool `json:"retry"`
Attempt int `json:"retry_attempt"`
}
if err := json.Unmarshal(body, &pl); err != nil {
t.Fatal(err)
}
return pl.Retry && pl.Attempt == 1
}), mock.Anything).Return(nil).Once()
return m
},
},
}

for _, tc := range tests {
Expand Down