diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go
index 9cabb9372..5817d2dfc 100644
--- a/cmd/fleet/handleCheckin.go
+++ b/cmd/fleet/handleCheckin.go
@@ -164,7 +164,16 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
 		return err
 	}
 
-// Subsribe to actions dispatcher
+	log.Debug().
+		Str("agentId", id).
+		Str("reqId", reqId).
+		Str("status", req.Status).
+		Str("seqNo", seqno.String()).
+		RawJSON("meta", rawMeta).
+		Uint64("bodyCount", readCounter.Count()).
+		Msg("checkin start long poll")
+
+	// Subscribe to actions dispatcher
 	aSub := ct.ad.Subscribe(agent.Id, seqno)
 	defer ct.ad.Unsubscribe(aSub)
 	actCh := aSub.Ch()
@@ -185,7 +194,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
 	defer longPoll.Stop()
 
 	// Intial update on checkin, and any user fields that might have changed
-	ct.bc.CheckIn(agent.Id, fields, seqno)
+	ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno)
 
 	// Initial fetch for pending actions
 	var (
@@ -222,7 +231,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
 			log.Trace().Msg("fire long poll")
 			break LOOP
 		case <-tick.C:
-			ct.bc.CheckIn(agent.Id, nil, seqno)
+			ct.bc.CheckIn(agent.Id, req.Status, nil, nil)
 		}
 	}
 }
diff --git a/cmd/fleet/schema.go b/cmd/fleet/schema.go
index 7ad4d29ff..600be37f4 100644
--- a/cmd/fleet/schema.go
+++ b/cmd/fleet/schema.go
@@ -76,6 +76,7 @@ type EnrollResponse struct {
 }
 
 type CheckinRequest struct {
+	Status    string          `json:"status"`
 	AckToken  string          `json:"ack_token,omitempty"`
 	Events    []Event         `json:"events"`
 	LocalMeta json.RawMessage `json:"local_metadata"`
diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go
new file mode 100644
index 
000000000..ec9bdc937 --- /dev/null +++ b/internal/pkg/checkin/bulk.go @@ -0,0 +1,238 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package checkin + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + + "github.com/rs/zerolog/log" +) + +const defaultFlushInterval = 10 * time.Second + +type optionsT struct { + flushInterval time.Duration +} + +type Opt func(*optionsT) + +func WithFlushInterval(d time.Duration) Opt { + return func(opt *optionsT) { + opt.flushInterval = d + } +} + +type extraT struct { + meta []byte + seqNo sqn.SeqNo +} + +// Minimize the size of this structure. +// There will be 10's of thousands of items +// in the map at any point. +type pendingT struct { + ts string + status string + extra *extraT +} + +type Bulk struct { + opts optionsT + bulker bulk.Bulk + mut sync.Mutex + pending map[string]pendingT + + ts string + unix int64 +} + +func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk { + parsedOpts := parseOpts(opts...) + + return &Bulk{ + opts: parsedOpts, + bulker: bulker, + pending: make(map[string]pendingT), + } +} + +func parseOpts(opts ...Opt) optionsT { + + outOpts := optionsT{ + flushInterval: defaultFlushInterval, + } + + for _, f := range opts { + f(&outOpts) + } + + return outOpts +} + +// Generate and cache timestamp on seconds change. +// Avoid thousands of formats of an identical string. +func (bc *Bulk) timestamp() string { + + // WARNING: Expects mutex locked. 
+ now := time.Now() + if now.Unix() != bc.unix { + bc.unix = now.Unix() + bc.ts = now.UTC().Format(time.RFC3339) + } + + return bc.ts +} + +// WARNING: Bulk will take ownership of fields, +// so do not use after passing in. +func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo) error { + + // Separate out the extra data to minimize + // the memory footprint of the 90% case of just + // updating the timestamp. + var extra *extraT + if meta != nil || seqno.IsSet() { + extra = &extraT{ + meta: meta, + seqNo: seqno, + } + } + + bc.mut.Lock() + + bc.pending[id] = pendingT{ + ts: bc.timestamp(), + status: status, + extra: extra, + } + + bc.mut.Unlock() + return nil +} + +func (bc *Bulk) Run(ctx context.Context) error { + + tick := time.NewTicker(bc.opts.flushInterval) + defer tick.Stop() + + var err error +LOOP: + for { + select { + case <-tick.C: + if err = bc.flush(ctx); err != nil { + log.Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") + err = nil + } + + case <-ctx.Done(): + err = ctx.Err() + break LOOP + } + } + + return err +} + +func (bc *Bulk) flush(ctx context.Context) error { + start := time.Now() + + bc.mut.Lock() + pending := bc.pending + bc.pending = make(map[string]pendingT, len(pending)) + bc.mut.Unlock() + + if len(pending) == 0 { + return nil + } + + updates := make([]bulk.MultiOp, 0, len(pending)) + + simpleCache := make(map[pendingT][]byte) + + nowTimestamp := start.UTC().Format(time.RFC3339) + + var err error + var needRefresh bool + for id, pendingData := range pending { + + // In the simple case, there are no fields and no seqNo. + // When that is true, we can reuse an already generated + // JSON body containing just the timestamp updates. 
+		var body []byte
+		if pendingData.extra == nil {
+
+			var ok bool
+			body, ok = simpleCache[pendingData]
+			if !ok {
+				fields := bulk.UpdateFields{
+					dl.FieldLastCheckin:       pendingData.ts,
+					dl.FieldUpdatedAt:         nowTimestamp,
+					dl.FieldLastCheckinStatus: pendingData.status,
+				}
+				if body, err = fields.Marshal(); err != nil {
+					return err
+				}
+				simpleCache[pendingData] = body
+			}
+		} else {
+
+			fields := bulk.UpdateFields{
+				dl.FieldLastCheckin:       pendingData.ts,     // Set the checkin timestamp
+				dl.FieldUpdatedAt:         nowTimestamp,       // Set "updated_at" to the current timestamp
+				dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
+			}
+
+			// Update local metadata if provided
+			if pendingData.extra.meta != nil {
+				// Surprise: The json encoder compacts this raw JSON during
+				// the encode process, so there may be unexpected memory overhead:
+				// https://github.com/golang/go/blob/go1.16.3/src/encoding/json/encode.go#L499
+				fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
+			}
+
+			// If seqNo changed, set the field appropriately
+			if pendingData.extra.seqNo.IsSet() {
+				fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
+
+				// Only refresh if seqNo changed; dropping metadata not important.
+				needRefresh = true
+			}
+
+			if body, err = fields.Marshal(); err != nil {
+				return err
+			}
+		}
+
+		updates = append(updates, bulk.MultiOp{
+			Id:    id,
+			Body:  body,
+			Index: dl.FleetAgents,
+		})
+	}
+
+	var opts []bulk.Opt
+	if needRefresh {
+		opts = append(opts, bulk.WithRefresh())
+	}
+
+	_, err = bc.bulker.MUpdate(ctx, updates, opts...)
+
+	log.Trace().
+		Err(err).
+		Dur("rtt", time.Since(start)).
+		Int("cnt", len(updates)).
+		Bool("refresh", needRefresh).
+		Msg("Flush updates")
+
+	return err
+}
diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go
new file mode 100644
index 000000000..897450242
--- /dev/null
+++ b/internal/pkg/checkin/bulk_test.go
@@ -0,0 +1,235 @@
+// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package checkin + +import ( + "bytes" + "context" + "encoding/json" + "testing" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + "github.com/google/go-cmp/cmp" + + tst "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/rs/xid" + "github.com/rs/zerolog" +) + +type CustomBulk struct { + tst.MockBulk + + ops []bulk.MultiOp +} + +func (m *CustomBulk) MUpdate(ctx context.Context, ops []bulk.MultiOp, opts ...bulk.Opt) ([]bulk.BulkIndexerResponseItem, error) { + m.ops = append(m.ops, ops...) + return nil, nil +} + +// Test simple, +// Test with fields +// Test with seq no + +func TestBulkSimple(t *testing.T) { + start := time.Now() + + var mockBulk CustomBulk + + bc := NewBulk(&mockBulk) + + cases := []struct { + desc string + id string + status string + meta []byte + seqno sqn.SeqNo + }{ + { + "Simple case", + "simpleId", + "online", + nil, + nil, + }, + { + "Singled field case", + "singleFieldId", + "online", + []byte(`{"hey":"now"}`), + nil, + }, + { + "Multi field case", + "multiFieldId", + "online", + []byte(`{"hey":"now","brown":"cow"}`), + nil, + }, + { + "Multi field nested case", + "multiFieldNestedId", + "online", + []byte(`{"hey":"now","wee":{"little":"doggie"}}`), + nil, + }, + { + "Simple case with seqNo", + "simpleseqno", + "online", + nil, + sqn.SeqNo{1, 2, 3, 4}, + }, + { + "Field case with seqNo", + "simpleseqno", + "online", + []byte(`{"uncle":"fester"}`), + sqn.SeqNo{5, 6, 7, 8}, + }, + { + "Unusual status", + "singleFieldId", + "unusual", + nil, + nil, + }, + { + "Empty status", + "singleFieldId", + "", + nil, + nil, + }, + } + + for _, c := 
range cases { + t.Run(c.desc, func(t *testing.T) { + + if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno); err != nil { + t.Fatal(err) + } + + if err := bc.flush(context.Background()); err != nil { + t.Fatal(err) + } + + if len(mockBulk.ops) != 1 { + t.Fatal("Expected one op") + } + + op := mockBulk.ops[0] + + mockBulk.ops = nil + + // deserialize the response + if op.Id != c.id { + t.Error("Wrong id") + } + + if op.Index != dl.FleetAgents { + t.Error("Wrong index") + } + + type updateT struct { + LastCheckin string `json:"last_checkin"` + Status string `json:"last_checkin_status"` + UpdatedAt string `json:"updated_at"` + Meta json.RawMessage `json:"local_metadata"` + SeqNo sqn.SeqNo `json:"action_seq_no"` + } + + m := make(map[string]updateT) + if err := json.Unmarshal(op.Body, &m); err != nil { + t.Error(err) + } + + sub, ok := m["doc"] + if !ok { + t.Fatal("expected doc") + } + + validateTimestamp(t, start.Truncate(time.Second), sub.LastCheckin) + validateTimestamp(t, start.Truncate(time.Second), sub.UpdatedAt) + + if c.seqno != nil { + if cdiff := cmp.Diff(c.seqno, sub.SeqNo); cdiff != "" { + t.Error(cdiff) + } + } + + if c.meta != nil && bytes.Compare(c.meta, sub.Meta) != 0 { + t.Error("meta doesn't match up") + } + + if c.status != sub.Status { + t.Error("status mismatch") + } + + }) + } +} + +func validateTimestamp(t *testing.T, start time.Time, ts string) { + + if t1, err := time.Parse(time.RFC3339, ts); err != nil { + t.Error("expected rfc3999") + } else if start.After(t1) { + t.Error("timestamp in the past") + } +} + +func benchmarkBulk(n int, flush bool, b *testing.B) { + b.ReportAllocs() + + l := zerolog.GlobalLevel() + defer zerolog.SetGlobalLevel(l) + + zerolog.SetGlobalLevel(zerolog.ErrorLevel) + + var mockBulk tst.MockBulk + + bc := NewBulk(mockBulk) + + ids := make([]string, 0, n) + for i := 0; i < n; i++ { + id := xid.New().String() + ids = append(ids, id) + } + + for i := 0; i < b.N; i++ { + + for _, id := range ids { + err := bc.CheckIn(id, 
"", nil, nil) + if err != nil { + b.Fatal(err) + } + } + + if flush { + err := bc.flush(context.Background()) + if err != nil { + b.Fatal(err) + } + } + } +} + +func BenchmarkBulk_1(b *testing.B) { benchmarkBulk(1, false, b) } +func BenchmarkBulk_64(b *testing.B) { benchmarkBulk(64, false, b) } +func BenchmarkBulk_8192(b *testing.B) { benchmarkBulk(8192, false, b) } +func BenchmarkBulk_37268(b *testing.B) { benchmarkBulk(37268, false, b) } +func BenchmarkBulk_131072(b *testing.B) { benchmarkBulk(131072, false, b) } +func BenchmarkBulk_262144(b *testing.B) { benchmarkBulk(262144, false, b) } + +func BenchmarkBulkFlush_1(b *testing.B) { benchmarkBulk(1, true, b) } +func BenchmarkBulkFlush_64(b *testing.B) { benchmarkBulk(64, true, b) } +func BenchmarkBulkFlush_8192(b *testing.B) { benchmarkBulk(8192, true, b) } +func BenchmarkBulkFlush_37268(b *testing.B) { benchmarkBulk(37268, true, b) } +func BenchmarkBulkFlush_131072(b *testing.B) { benchmarkBulk(131072, true, b) } +func BenchmarkBulkFlush_262144(b *testing.B) { benchmarkBulk(262144, true, b) } diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index f2ad891f5..b25f33c2c 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -31,6 +31,12 @@ const ( FieldPolicyId = "policy_id" FieldRevisionIdx = "revision_idx" FieldCoordinatorIdx = "coordinator_idx" +<<<<<<< HEAD +======= + FieldLastCheckin = "last_checkin" + FieldLastCheckinStatus = "last_checkin_status" + FieldLocalMetadata = "local_metadata" +>>>>>>> 73fcdb4 (Propagate checkin status to agent record) FieldPolicyRevisionIdx = "policy_revision_idx" FieldPolicyCoordinatorIdx = "policy_coordinator_idx" FieldDefaultApiKey = "default_api_key"