Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow upgrade action to signal retry #1887

Merged
merged 12 commits into from
Oct 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Storing checkin message in last_checkin_message {pull}1932[1932]
- Allow upgrade actions to signal that they will be retried. {pull}1887[1887]
42 changes: 31 additions & 11 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,17 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
// The unenroll and upgrade acks might overwrite it later
setResult(n, http.StatusOK)

if ev.Error == "" {
if action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
} else if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent, ev); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
}

if ev.Error == "" && action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
}
}

// Process policy acks
Expand Down Expand Up @@ -503,13 +503,33 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return nil
}

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, event Event) error {
now := time.Now().UTC().Format(time.RFC3339)
doc := bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradeStatus: nil,
dl.FieldUpgradedAt: now,
}
if event.Error != "" {
// unmarshal event payload
var pl struct {
Retry bool `json:"retry"`
Attempt int `json:"retry_attempt"`
}
err := json.Unmarshal(event.Payload, &pl)
if err != nil {
zlog.Error().Err(err).Msg("unable to unmarshal upgrade event payload")
}

// If the payload indicates a retry, change the upgrade status to retrying.
if pl.Retry {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as retrying")
doc[dl.FieldUpgradeStatus] = "retrying" // TODO should we also change FieldUpgradedAt and FieldUpgradeStated at?
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
} else {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as failed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will we handle stale agents (older versions that do not send a retry flag in the ack)? Is it expected that they will not retry?
I'd say yes; I just want to double-check that this matches your intentions.

Copy link
Contributor

@michalpristas michalpristas Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FieldUpgradeStartedAt and FieldUpgradedAt are used on the Fleet UI side to set the status to Upgrading.
I wonder whether we should set FieldUpgradedAt in the case of a failed retry, since the agent has not actually been upgraded so far.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, a pre v8.6 agent that reports an error will set FieldUpgradeStatus to failed. However I'm not sure if it is able to send that ack as I don't think it could actually ack a failure.

If it sends a successful upgrade ack FieldUpgradeStatus will be set to nil and FieldUpgradedAt is set to the current time (keeping the current behaviour)

@juliaElastic should we be setting FieldUpgradedAt and clearing FieldUpgradeStartedAt in failure cases, or should we keep these attributes to their original values?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently there is no Failed agent status. So the agent can either remain in Updating or move back to Healthy status. The error should be captured in action results similar to other action failures.
So I think the agent can move back to Healthy state if the upgrade failed (supposing nothing changed for the agent) - this means setting FieldUpgradeStartedAt to nil and leaving FieldUpgradedAt at the previous value.

doc[dl.FieldUpgradeStatus] = "failed" // TODO should we also change FieldUpgradedAt and FieldUpgradeStated at?
}
}

body, err := doc.Marshal()
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,61 @@ func TestHandleAckEvents(t *testing.T) {
},
err: &HTTPError{Status: http.StatusNotFound},
},
{
name: "upgrade action failed",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with no payload",
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
{
name: "upgrade action retrying",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with payload",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -492,3 +547,84 @@ func TestInvalidateAPIKeys(t *testing.T) {
bulker.AssertExpectations(t)
}
}

// TestAckHandleUpgrade calls AckT.handleUpgrade directly with three kinds of
// event: one without an error, one whose error payload signals a retry, and
// one whose error payload signals that no retry will follow. For the two
// error cases the mocked bulker only accepts an Update whose body carries the
// expected "upgrade_status" value under "doc".
func TestAckHandleUpgrade(t *testing.T) {
	// hasUpgradeStatus builds the predicate handed to mock.MatchedBy: it
	// decodes the update body and compares doc.upgrade_status against want.
	hasUpgradeStatus := func(t *testing.T, want string) func([]byte) bool {
		return func(p []byte) bool {
			var body struct {
				Doc struct {
					Status string `json:"upgrade_status"`
				} `json:"doc"`
			}
			if err := json.Unmarshal(p, &body); err != nil {
				t.Fatal(err)
			}
			return body.Doc.Status == want
		}
	}

	cases := []struct {
		name   string
		event  Event
		bulker func(t *testing.T) *ftesting.MockBulk
	}{
		{
			name:  "ok",
			event: Event{},
			bulker: func(t *testing.T) *ftesting.MockBulk {
				// Any single Update call is accepted; the body is not inspected.
				mockBulk := ftesting.NewMockBulk()
				mockBulk.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
				return mockBulk
			},
		},
		{
			name: "retry signaled",
			event: Event{
				Error:   "upgrade error",
				Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
			},
			bulker: func(t *testing.T) *ftesting.MockBulk {
				mockBulk := ftesting.NewMockBulk()
				mockBulk.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(hasUpgradeStatus(t, "retrying")), mock.Anything).Return(nil).Once()
				return mockBulk
			},
		},
		{
			name: "no more retries",
			event: Event{
				Error:   "upgrade error",
				Payload: json.RawMessage(`{"retry":false}`),
			},
			bulker: func(t *testing.T) *ftesting.MockBulk {
				mockBulk := ftesting.NewMockBulk()
				mockBulk.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(hasUpgradeStatus(t, "failed")), mock.Anything).Return(nil).Once()
				return mockBulk
			},
		},
	}

	// Fixtures shared by every sub-test.
	serverCfg := &config.Server{
		Limits: config.ServerLimits{},
	}
	testAgent := &model.Agent{
		ESDocument: model.ESDocument{Id: "ab12dcd8-bde0-4045-92dc-c4b27668d735"},
		Agent:      &model.AgentMetadata{Version: "8.0.0"},
	}
	ctx := context.Background()
	testCache, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
	if err != nil {
		t.Fatal(err)
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			zlog := testlog.SetLogger(t)
			mockBulk := c.bulker(t)
			ackT := NewAckT(serverCfg, mockBulk, testCache)

			assert.NoError(t, ackT.handleUpgrade(ctx, zlog, testAgent, c.event))
			mockBulk.AssertExpectations(t)
		})
	}
}