From 22bf71fd9cabc3c8b95efe132d88025073c09fc0 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 3 Jun 2021 16:25:18 +0000 Subject: [PATCH] Propagate checkin status to agent record (#427) (cherry picked from commit 73fcdb4cbc14f3cff5a612027f81d03401965e36) Co-authored-by: Sean Cunningham --- cmd/fleet/handleCheckin.go | 13 +++++++++-- cmd/fleet/schema.go | 1 + internal/pkg/checkin/bulk.go | 29 +++++++++++++---------- internal/pkg/checkin/bulk_test.go | 38 ++++++++++++++++++++++++++----- internal/pkg/dl/constants.go | 1 + 5 files changed, 62 insertions(+), 20 deletions(-) diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go index f3a7cca3c..b490f500f 100644 --- a/cmd/fleet/handleCheckin.go +++ b/cmd/fleet/handleCheckin.go @@ -176,6 +176,15 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st return err } + log.Debug(). + Str("agentId", id). + Str("reqId", reqId). + Str("status", req.Status). + Str("seqNo", seqno.String()). + RawJSON("meta", rawMeta). + Uint64("bodyCount", readCounter.Count()). + Msg("checkin start long poll") + // Subscribe to actions dispatcher aSub := ct.ad.Subscribe(agent.Id, seqno) defer ct.ad.Unsubscribe(aSub) @@ -197,7 +206,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st defer longPoll.Stop() // Intial update on checkin, and any user fields that might have changed - ct.bc.CheckIn(agent.Id, rawMeta, seqno) + ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno) // Initial fetch for pending actions var ( @@ -234,7 +243,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st log.Trace().Str(EcsHttpRequestId, reqId).Str("agentId", agent.Id).Msg("fire long poll") break LOOP case <-tick.C: - ct.bc.CheckIn(agent.Id, nil, nil) + ct.bc.CheckIn(agent.Id, req.Status, nil, nil) } } } diff --git a/cmd/fleet/schema.go b/cmd/fleet/schema.go index e3dde6ed8..8c28436e7 100644 --- a/cmd/fleet/schema.go +++ b/cmd/fleet/schema.go @@ -71,6 +71,7 @@ type EnrollResponse struct { } type CheckinRequest struct { + Status string `json:"status"` AckToken string `json:"ack_token,omitempty"` Events []Event `json:"events"` LocalMeta json.RawMessage `json:"local_metadata"` diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 8f20d0294..ec9bdc937 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -40,8 +40,9 @@ type extraT struct { // There will be 10's of thousands of items // in the map at any point. type pendingT struct { - ts string - extra *extraT + ts string + status string + extra *extraT } type Bulk struct { @@ -93,7 +94,7 @@ func (bc *Bulk) timestamp() string { // WARNING: Bulk will take ownership of fields, // so do not use after passing in. -func (bc *Bulk) CheckIn(id string, meta []byte, seqno sqn.SeqNo) error { +func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo) error { // Separate out the extra data to minimize // the memory footprint of the 90% case of just @@ -109,8 +110,9 @@ func (bc *Bulk) CheckIn(id string, meta []byte, seqno sqn.SeqNo) error { bc.mut.Lock() bc.pending[id] = pendingT{ - ts: bc.timestamp(), - extra: extra, + ts: bc.timestamp(), + status: status, + extra: extra, } bc.mut.Unlock() @@ -155,7 +157,7 @@ func (bc *Bulk) flush(ctx context.Context) error { updates := make([]bulk.MultiOp, 0, len(pending)) - simpleCache := make(map[string][]byte) + simpleCache := make(map[pendingT][]byte) nowTimestamp := start.UTC().Format(time.RFC3339) @@ -168,23 +170,26 @@ func (bc *Bulk) flush(ctx context.Context) error { // JSON body containing just the timestamp updates. var body []byte if pendingData.extra == nil { + var ok bool - body, ok = simpleCache[pendingData.ts] + body, ok = simpleCache[pendingData] if !ok { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, - dl.FieldUpdatedAt: nowTimestamp, + dl.FieldLastCheckin: pendingData.ts, + dl.FieldUpdatedAt: nowTimestamp, + dl.FieldLastCheckinStatus: pendingData.status, } if body, err = fields.Marshal(); err != nil { return err } - simpleCache[pendingData.ts] = body + simpleCache[pendingData] = body } } else { fields := bulk.UpdateFields{ - dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp - dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp + dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp + dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp + dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status } // Update local metadata if provided diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index fc881e16a..897450242 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -44,53 +44,74 @@ func TestBulkSimple(t *testing.T) { bc := NewBulk(&mockBulk) cases := []struct { - desc string - id string - meta []byte - seqno sqn.SeqNo + desc string + id string + status string + meta []byte + seqno sqn.SeqNo }{ { "Simple case", "simpleId", + "online", nil, nil, }, { "Singled field case", "singleFieldId", + "online", []byte(`{"hey":"now"}`), nil, }, { "Multi field case", "multiFieldId", + "online", []byte(`{"hey":"now","brown":"cow"}`), nil, }, { "Multi field nested case", "multiFieldNestedId", + "online", []byte(`{"hey":"now","wee":{"little":"doggie"}}`), nil, }, { "Simple case with seqNo", "simpleseqno", + "online", nil, sqn.SeqNo{1, 2, 3, 4}, }, { "Field case with seqNo", "simpleseqno", + "online", []byte(`{"uncle":"fester"}`), sqn.SeqNo{5, 6, 7, 8}, }, + { + "Unusual status", + "singleFieldId", + "unusual", + nil, + nil, + }, + { + "Empty status", + "singleFieldId", + "", + nil, + nil, + }, } for _, c := range cases { t.Run(c.desc, func(t *testing.T) { - if err := bc.CheckIn(c.id, c.meta, c.seqno); err != nil { + if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno); err != nil { t.Fatal(err) } @@ -117,6 +138,7 @@ func TestBulkSimple(t *testing.T) { type updateT struct { LastCheckin string `json:"last_checkin"` + Status string `json:"last_checkin_status"` UpdatedAt string `json:"updated_at"` Meta json.RawMessage `json:"local_metadata"` SeqNo sqn.SeqNo `json:"action_seq_no"` @@ -145,6 +167,10 @@ func TestBulkSimple(t *testing.T) { t.Error("meta doesn't match up") } + if c.status != sub.Status { + t.Error("status mismatch") + } + }) } } @@ -179,7 +205,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) { for i := 0; i < b.N; i++ { for _, id := range ids { - err := bc.CheckIn(id, nil, nil) + err := bc.CheckIn(id, "", nil, nil) if err != nil { b.Fatal(err) } diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 7d0d6df99..80811717f 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -32,6 +32,7 @@ const ( FieldRevisionIdx = "revision_idx" FieldCoordinatorIdx = "coordinator_idx" FieldLastCheckin = "last_checkin" + FieldLastCheckinStatus = "last_checkin_status" FieldLocalMetadata = "local_metadata" FieldPolicyRevisionIdx = "policy_revision_idx" FieldPolicyCoordinatorIdx = "policy_coordinator_idx"