From ab35d465adfa86a22ee7cb8306d084d902a70908 Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Thu, 29 Sep 2022 14:33:59 +0200 Subject: [PATCH] Storing checkin message in last_checkin_message (#1932) * Storing checkin message in last_checkin_message * added changelog * fixed tests (cherry picked from commit 129ea1c18ade972ec22b1c2474c61fde4632e269) --- CHANGELOG.next.asciidoc | 1 + internal/pkg/api/handleCheckin.go | 4 ++-- internal/pkg/api/schema.go | 1 + internal/pkg/checkin/bulk.go | 30 +++++++++++++++++------------- internal/pkg/checkin/bulk_test.go | 25 +++++++++++++++++-------- internal/pkg/dl/constants.go | 1 + internal/pkg/model/schema.go | 3 +++ model/schema.json | 4 ++++ 8 files changed, 46 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b98d908e0..4d33e02f9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -23,3 +23,4 @@ - Fleet Server now allows setting global labels on APM instrumentation. {pull}1649[1649] - Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681] - Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668] +- Storing checkin message in last_checkin_message {pull}1932[1932] \ No newline at end of file diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 96446c9be..62bd80fd9 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -209,7 +209,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r defer longPoll.Stop() // Initial update on checkin, and any user fields that might have changed - err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver) + err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, seqno, ver) if err != nil { zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") } @@ -249,7 +249,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r zlog.Trace().Msg("fire long poll") break LOOP case <-tick.C: - err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver) + err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, nil, ver) if err != nil { zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") } diff --git a/internal/pkg/api/schema.go b/internal/pkg/api/schema.go index 481632b71..fdca0648a 100644 --- a/internal/pkg/api/schema.go +++ b/internal/pkg/api/schema.go @@ -79,6 +79,7 @@ type EnrollResponse struct { type CheckinRequest struct { Status string `json:"status"` + Message string `json:"message"` AckToken string `json:"ack_token,omitempty"` LocalMeta json.RawMessage `json:"local_metadata"` } diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 3d510c8f0..7d80123e2 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -42,9 +42,10 @@ type extraT struct { // There will be 10's of thousands of items // in the map at any point. type pendingT struct { - ts string - status string - extra *extraT + ts string + status string + message string + extra *extraT } // Bulk will batch pending checkins and update elasticsearch at a set interval. @@ -98,7 +99,7 @@ func (bc *Bulk) timestamp() string { // CheckIn will add the agent (identified by id) to the pending set. // The pending agents are sent to elasticsearch as a bulk update at each flush interval. // WARNING: Bulk will take ownership of fields, so do not use after passing in. -func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error { +func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, seqno sqn.SeqNo, newVer string) error { // Separate out the extra data to minimize // the memory footprint of the 90% case of just // updating the timestamp. @@ -114,9 +115,10 @@ func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, bc.mut.Lock() bc.pending[id] = pendingT{ - ts: bc.timestamp(), - status: status, - extra: extra, + ts: bc.timestamp(), + status: status, + message: message, + extra: extra, } bc.mut.Unlock() @@ -180,9 +182,10 @@ func (bc *Bulk) flush(ctx context.Context) error { body, ok = simpleCache[pendingData] if !ok { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, - dl.FieldUpdatedAt: nowTimestamp, - dl.FieldLastCheckinStatus: pendingData.status, + dl.FieldLastCheckin: pendingData.ts, + dl.FieldUpdatedAt: nowTimestamp, + dl.FieldLastCheckinStatus: pendingData.status, + dl.FieldLastCheckinMessage: pendingData.message, } if body, err = fields.Marshal(); err != nil { return err @@ -192,9 +195,10 @@ func (bc *Bulk) flush(ctx context.Context) error { } else { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp - dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp - dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status + dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp + dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp + dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status + dl.FieldLastCheckinMessage: pendingData.message, // Set the status message } // If the agent version is not empty it needs to be updated diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index 4254c1c89..0ba931996 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b } type bulkcase struct { - desc string - id string - status string - meta []byte - seqno sqn.SeqNo - ver string + desc string + id string + status string + message string + meta []byte + seqno sqn.SeqNo + ver string } func TestBulkSimple(t *testing.T) { @@ -96,6 +97,7 @@ func TestBulkSimple(t *testing.T) { "Simple case", "simpleId", "online", + "message", nil, nil, "", @@ -104,6 +106,7 @@ func TestBulkSimple(t *testing.T) { "Singled field case", "singleFieldId", "online", + "message", []byte(`{"hey":"now"}`), nil, "", @@ -112,6 +115,7 @@ func TestBulkSimple(t *testing.T) { "Multi field case", "multiFieldId", "online", + "message", []byte(`{"hey":"now","brown":"cow"}`), nil, ver, @@ -120,6 +124,7 @@ func TestBulkSimple(t *testing.T) { "Multi field nested case", "multiFieldNestedId", "online", + "message", []byte(`{"hey":"now","wee":{"little":"doggie"}}`), nil, "", @@ -128,6 +133,7 @@ func TestBulkSimple(t *testing.T) { "Simple case with seqNo", "simpleseqno", "online", + "message", nil, sqn.SeqNo{1, 2, 3, 4}, ver, @@ -136,6 +142,7 @@ func TestBulkSimple(t *testing.T) { "Field case with seqNo", "simpleseqno", "online", + "message", []byte(`{"uncle":"fester"}`), sqn.SeqNo{5, 6, 7, 8}, ver, @@ -144,6 +151,7 @@ func TestBulkSimple(t *testing.T) { "Unusual status", "singleFieldId", "unusual", + "message", nil, nil, "", @@ -152,6 +160,7 @@ func TestBulkSimple(t *testing.T) { "Empty status", "singleFieldId", "", + "message", nil, nil, "", @@ -165,7 +174,7 @@ func TestBulkSimple(t *testing.T) { mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() bc := NewBulk(mockBulk) - if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil { + if err := bc.CheckIn(c.id, c.status, c.message, c.meta, c.seqno, c.ver); err != nil { t.Fatal(err) } @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) { for i := 0; i < b.N; i++ { for _, id := range ids { - err := bc.CheckIn(id, "", nil, nil, "") + err := bc.CheckIn(id, "", "", nil, nil, "") if err != nil { b.Fatal(err) } diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 419f92874..f55bb7123 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -33,6 +33,7 @@ const ( FieldCoordinatorIdx = "coordinator_idx" FieldLastCheckin = "last_checkin" FieldLastCheckinStatus = "last_checkin_status" + FieldLastCheckinMessage = "last_checkin_message" FieldLocalMetadata = "local_metadata" FieldPolicyCoordinatorIdx = "policy_coordinator_idx" FieldPolicyID = "policy_id" diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 5d3c844f1..0dc0ac947 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -139,6 +139,9 @@ type Agent struct { // Date/time the Elastic Agent checked in last time LastCheckin string `json:"last_checkin,omitempty"` + // Last checkin message + LastCheckinMessage string `json:"last_checkin_message,omitempty"` + // Last checkin status LastCheckinStatus string `json:"last_checkin_status,omitempty"` diff --git a/model/schema.json b/model/schema.json index 423cfe3fc..78debe5c9 100644 --- a/model/schema.json +++ b/model/schema.json @@ -516,6 +516,10 @@ "description": "Last checkin status", "type": "string" }, + "last_checkin_message": { + "description": "Last checkin message", + "type": "string" + }, "default_api_key_id": { "description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string"