From ed281eb1e3bbf0845a4a5ee4946b9ec4e302fdf1 Mon Sep 17 00:00:00 2001
From: Andrew Wilkins
Date: Thu, 29 Jul 2021 10:59:38 +0800
Subject: [PATCH] sampling: refactor to read/write model.APMEvents (#5814)

Update the tail-based sampling package to read and write
model.APMEvents, rather than model.Transactions and model.Spans.
This is in preparation for moving "metadata" up to APMEvent.

(cherry picked from commit 96d24d4f9ced74ae153699d5eeb40cd6d0561f96)
---
 .../apm-server/sampling/eventstorage/codec.go |  5 --
 .../sampling/eventstorage/jsoncodec.go        | 22 ++----
 .../sampling/eventstorage/sharded.go          | 52 ++++---------
 .../eventstorage/sharded_bench_test.go        | 20 +++--
 .../sampling/eventstorage/storage.go          | 77 +++++--------------
 .../eventstorage/storage_bench_test.go        | 33 ++++----
 .../sampling/eventstorage/storage_test.go     | 68 +++++++---------
 x-pack/apm-server/sampling/processor.go       | 42 +++++-----
 x-pack/apm-server/sampling/processor_test.go  | 12 +--
 9 files changed, 129 insertions(+), 202 deletions(-)
 delete mode 100644 x-pack/apm-server/sampling/eventstorage/codec.go

diff --git a/x-pack/apm-server/sampling/eventstorage/codec.go b/x-pack/apm-server/sampling/eventstorage/codec.go
deleted file mode 100644
index 64078d0743f..00000000000
--- a/x-pack/apm-server/sampling/eventstorage/codec.go
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
-// or more contributor license agreements. Licensed under the Elastic License;
-// you may not use this file except in compliance with the Elastic License.
-
-package eventstorage
diff --git a/x-pack/apm-server/sampling/eventstorage/jsoncodec.go b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go
index 4e040db1d2d..77d7f26b8d1 100644
--- a/x-pack/apm-server/sampling/eventstorage/jsoncodec.go
+++ b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go
@@ -17,22 +17,12 @@ import (
 // JSONCodec is an implementation of Codec, using JSON encoding.
 type JSONCodec struct{}
 
-// DecodeSpan decodes data as JSON into span.
-func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error {
-	return jsoniter.ConfigFastest.Unmarshal(data, span)
+// DecodeEvent decodes data as JSON into event.
+func (JSONCodec) DecodeEvent(data []byte, event *model.APMEvent) error {
+	return jsoniter.ConfigFastest.Unmarshal(data, event)
 }
 
-// DecodeTransaction decodes data as JSON into tx.
-func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error {
-	return jsoniter.ConfigFastest.Unmarshal(data, tx)
-}
-
-// EncodeSpan encodes span as JSON.
-func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) {
-	return json.Marshal(span)
-}
-
-// EncodeTransaction encodes tx as JSON.
-func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) {
-	return json.Marshal(tx)
+// EncodeEvent encodes event as JSON.
+func (JSONCodec) EncodeEvent(event *model.APMEvent) ([]byte, error) {
+	return json.Marshal(event)
 }
diff --git a/x-pack/apm-server/sampling/eventstorage/sharded.go b/x-pack/apm-server/sampling/eventstorage/sharded.go
index afc433a5bb4..f45f6c6f6a3 100644
--- a/x-pack/apm-server/sampling/eventstorage/sharded.go
+++ b/x-pack/apm-server/sampling/eventstorage/sharded.go
@@ -51,19 +51,14 @@ func (s *ShardedReadWriter) Flush() error {
 	return result
 }
 
-// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer.
-func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error { - return s.getWriter(traceID).ReadEvents(traceID, out) +// ReadTraceEvents calls Writer.ReadTraceEvents, using a sharded, locked, Writer. +func (s *ShardedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error { + return s.getWriter(traceID).ReadTraceEvents(traceID, out) } -// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer. -func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error { - return s.getWriter(tx.TraceID).WriteTransaction(tx) -} - -// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer. -func (s *ShardedReadWriter) WriteSpan(span *model.Span) error { - return s.getWriter(span.TraceID).WriteSpan(span) +// WriteTraceEvent calls Writer.WriteTraceEvent, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error { + return s.getWriter(traceID).WriteTraceEvent(traceID, id, event) } // WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer. @@ -76,14 +71,9 @@ func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) { return s.getWriter(traceID).IsTraceSampled(traceID) } -// DeleteTransaction calls Writer.DeleteTransaction, using a sharded, locked, Writer. -func (s *ShardedReadWriter) DeleteTransaction(tx *model.Transaction) error { - return s.getWriter(tx.TraceID).DeleteTransaction(tx) -} - -// DeleteSpan calls Writer.DeleteSpan, using a sharded, locked, Writer. -func (s *ShardedReadWriter) DeleteSpan(span *model.Span) error { - return s.getWriter(span.TraceID).DeleteSpan(span) +// DeleteTraceEvent calls Writer.DeleteTraceEvent, using a sharded, locked, Writer. +func (s *ShardedReadWriter) DeleteTraceEvent(traceID, id string) error { + return s.getWriter(traceID).DeleteTraceEvent(traceID, id) } // getWriter returns an event storage writer for the given trace ID. 
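// [Editor's note — illustration, not part of the patch] getWriter is
// unchanged by this refactor; every method above routes through it, so all
// events for one trace land on the same lockedReadWriter. A minimal sketch
// of that routing, assuming a hash-of-trace-ID modulo shard-count scheme
// (the hash function used by the real implementation may differ):
//
//	import "hash/fnv"
//
//	func (s *ShardedReadWriter) getWriter(traceID string) *lockedReadWriter {
//		h := fnv.New64a()
//		h.Write([]byte(traceID))
//		// A given trace ID always hashes to the same shard, so per-trace
//		// operations contend on one shard lock rather than a global lock.
//		return &s.readWriters[h.Sum64()%uint64(len(s.readWriters))]
//	}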
@@ -114,22 +104,16 @@ func (rw *lockedReadWriter) Flush() error { return rw.rw.Flush() } -func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error { - rw.mu.Lock() - defer rw.mu.Unlock() - return rw.rw.ReadEvents(traceID, out) -} - -func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error { +func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.WriteTransaction(tx) + return rw.rw.ReadTraceEvents(traceID, out) } -func (rw *lockedReadWriter) WriteSpan(s *model.Span) error { +func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.WriteSpan(s) + return rw.rw.WriteTraceEvent(traceID, id, event) } func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { @@ -144,14 +128,8 @@ func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { return rw.rw.IsTraceSampled(traceID) } -func (rw *lockedReadWriter) DeleteTransaction(tx *model.Transaction) error { - rw.mu.Lock() - defer rw.mu.Unlock() - return rw.rw.DeleteTransaction(tx) -} - -func (rw *lockedReadWriter) DeleteSpan(span *model.Span) error { +func (rw *lockedReadWriter) DeleteTraceEvent(traceID, id string) error { rw.mu.Lock() defer rw.mu.Unlock() - return rw.rw.DeleteSpan(span) + return rw.rw.DeleteTraceEvent(traceID, id) } diff --git a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go index fd39b0dde15..3b6d82b9dba 100644 --- a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go @@ -22,10 +22,12 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { defer sharded.Close() b.RunParallel(func(pb *testing.PB) { - traceUUID := uuid.Must(uuid.NewV4()) - transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()} + traceID := uuid.Must(uuid.NewV4()).String() + transaction := &model.APMEvent{ + Transaction: &model.Transaction{TraceID: traceID, ID: traceID}, + } for pb.Next() { - if err := sharded.WriteTransaction(transaction); err != nil { + if err := sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil { b.Fatal(err) } } @@ -41,13 +43,17 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) { // Use a single trace ID, causing all events to go through // the same sharded writer, contending for a single lock. - traceUUID := uuid.Must(uuid.NewV4()) + traceID := uuid.Must(uuid.NewV4()).String() b.RunParallel(func(pb *testing.PB) { - transactionUUID := uuid.Must(uuid.NewV4()) - transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()} + transactionID := uuid.Must(uuid.NewV4()).String() + transaction := &model.APMEvent{ + Transaction: &model.Transaction{ + TraceID: traceID, ID: transactionID, + }, + } for pb.Next() { - if err := sharded.WriteTransaction(transaction); err != nil { + if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil { b.Fatal(err) } } diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index 1f7f8159dce..bcb8949d5d7 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -18,8 +18,7 @@ const ( // over time, to avoid misinterpreting historical data. 
 	entryMetaTraceSampled   = 's'
 	entryMetaTraceUnsampled = 'u'
-	entryMetaTransaction    = 'T'
-	entryMetaSpan           = 'S'
+	entryMetaTraceEvent = 'e'
 )
 
 // ErrNotFound is returned by the Storage.IsTraceSampled method,
@@ -36,10 +35,8 @@ type Storage struct {
 
 // Codec provides methods for encoding and decoding events.
 type Codec interface {
-	DecodeSpan([]byte, *model.Span) error
-	DecodeTransaction([]byte, *model.Transaction) error
-	EncodeSpan(*model.Span) ([]byte, error)
-	EncodeTransaction(*model.Transaction) ([]byte, error)
+	DecodeEvent([]byte, *model.APMEvent) error
+	EncodeEvent(*model.APMEvent) ([]byte, error)
 }
 
 // New returns a new Storage using db and codec.
@@ -133,34 +130,17 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
 	return item.UserMeta() == entryMetaTraceSampled, nil
 }
 
-// WriteTransaction writes tx to storage.
+// WriteTraceEvent writes a trace event to storage.
 //
-// WriteTransaction may return before the write is committed to storage.
+// WriteTraceEvent may return before the write is committed to storage.
 // Call Flush to ensure the write is committed.
-func (rw *ReadWriter) WriteTransaction(tx *model.Transaction) error {
-	key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
-	data, err := rw.s.codec.EncodeTransaction(tx)
+func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent) error {
+	key := append(append([]byte(traceID), ':'), id...)
+	data, err := rw.s.codec.EncodeEvent(event)
 	if err != nil {
 		return err
 	}
-	return rw.writeEvent(key[:], data, entryMetaTransaction)
-}
-
-// WriteSpan writes span to storage.
-//
-// WriteSpan may return before the write is committed to storage.
-// Call Flush to ensure the write is committed.
-func (rw *ReadWriter) WriteSpan(span *model.Span) error {
-	key := append(append([]byte(span.TraceID), ':'), span.ID...)
-	data, err := rw.s.codec.EncodeSpan(span)
-	if err != nil {
-		return err
-	}
-	return rw.writeEvent(key[:], data, entryMetaSpan)
-}
-
-func (rw *ReadWriter) writeEvent(key, value []byte, meta byte) error {
-	return rw.writeEntry(badger.NewEntry(key, value).WithMeta(meta).WithTTL(rw.s.ttl))
+	return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent).WithTTL(rw.s.ttl))
 }
 
 func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
@@ -175,25 +155,18 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
 	return rw.txn.SetEntry(e)
 }
 
-// DeleteTransaction deletes the transaction from storage.
-func (rw *ReadWriter) DeleteTransaction(tx *model.Transaction) error {
-	key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
-	return rw.txn.Delete(key)
-}
-
-// DeleteSpan deletes the span from storage.
-func (rw *ReadWriter) DeleteSpan(span *model.Span) error {
-	key := append(append([]byte(span.TraceID), ':'), span.ID...)
+// DeleteTraceEvent deletes the trace event from storage.
+func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
+	key := append(append([]byte(traceID), ':'), id...)
 	return rw.txn.Delete(key)
 }
 
-// ReadEvents reads events with the given trace ID from storage into a batch.
+// ReadTraceEvents reads trace events with the given trace ID from storage into out.
 //
-// ReadEvents may implicitly commit the current transaction when the number
-// of pending writes exceeds a threshold. This is due to how Badger internally
-// iterates over uncommitted writes, where it will sort keys for each new
-// iterator.
-func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { +// ReadTraceEvents may implicitly commit the current transaction when the number of +// pending writes exceeds a threshold. This is due to how Badger internally iterates +// over uncommitted writes, where it will sort keys for each new iterator. +func (rw *ReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error { opts := badger.DefaultIteratorOptions rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':') opts.Prefix = rw.readKeyBuf @@ -215,22 +188,14 @@ func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { continue } switch item.UserMeta() { - case entryMetaTransaction: - var event model.Transaction - if err := item.Value(func(data []byte) error { - return rw.s.codec.DecodeTransaction(data, &event) - }); err != nil { - return err - } - *out = append(*out, model.APMEvent{Transaction: &event}) - case entryMetaSpan: - var event model.Span + case entryMetaTraceEvent: + var event model.APMEvent if err := item.Value(func(data []byte) error { - return rw.s.codec.DecodeSpan(data, &event) + return rw.s.codec.DecodeEvent(data, &event) }); err != nil { return err } - *out = append(*out, model.APMEvent{Span: &event}) + *out = append(*out, event) default: // Unknown entry meta: ignore. continue diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index 3a8c5834798..d505d3a5f49 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -25,16 +25,15 @@ func BenchmarkWriteTransaction(b *testing.B) { readWriter := store.NewReadWriter() defer readWriter.Close() - traceID := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} - transactionID := []byte{1, 2, 3, 4, 5, 6, 7, 8} - transaction := &model.Transaction{ - TraceID: hex.EncodeToString(traceID), - ID: hex.EncodeToString(transactionID), + traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8}) + transaction := &model.APMEvent{ + Transaction: &model.Transaction{TraceID: traceID, ID: transactionID}, } b.ResetTimer() for i := 0; i < b.N; i++ { - if err := readWriter.WriteTransaction(transaction); err != nil { + if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil { b.Fatal(err) } } @@ -53,7 +52,7 @@ func BenchmarkWriteTransaction(b *testing.B) { } func BenchmarkReadEvents(b *testing.B) { - traceUUID := uuid.Must(uuid.NewV4()) + traceID := uuid.Must(uuid.NewV4()).String() test := func(b *testing.B, codec eventstorage.Codec) { // Test with varying numbers of events in the trace. 
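// [Editor's note — illustration, not part of the patch] The storage.go
// changes above keep the existing "traceID:id" key layout and single-byte
// entry meta, but collapse the separate 'T' (transaction) and 'S' (span)
// meta values into one 'e' (entryMetaTraceEvent). A hypothetical picture of
// what Badger holds for a single trace after this change:
//
//	key                        meta  value
//	<traceID>:<transactionID>  'e'   {"transaction":{...}}  (JSON model.APMEvent)
//	<traceID>:<spanID>         'e'   {"span":{...}}         (JSON model.APMEvent)
//	<traceID>                  's'   trace sampling decision (no colon in key)
//
// ReadTraceEvents iterates only the "<traceID>:" prefix, so entries keyed
// without the colon separator are never decoded as events.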
@@ -67,12 +66,14 @@ func BenchmarkReadEvents(b *testing.B) { defer readWriter.Close() for i := 0; i < count; i++ { - transactionUUID := uuid.Must(uuid.NewV4()) - transaction := &model.Transaction{ - TraceID: traceUUID.String(), - ID: transactionUUID.String(), + transactionID := uuid.Must(uuid.NewV4()).String() + transaction := &model.APMEvent{ + Transaction: &model.Transaction{ + TraceID: traceID, + ID: transactionID, + }, } - if err := readWriter.WriteTransaction(transaction); err != nil { + if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil { b.Fatal(err) } } @@ -85,7 +86,7 @@ func BenchmarkReadEvents(b *testing.B) { var batch model.Batch for i := 0; i < b.N; i++ { batch = batch[:0] - if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil { + if err := readWriter.ReadTraceEvents(traceID, &batch); err != nil { b.Fatal(err) } if len(batch) != count { @@ -156,7 +157,5 @@ func BenchmarkIsTraceSampled(b *testing.B) { type nopCodec struct{} -func (nopCodec) DecodeSpan(data []byte, span *model.Span) error { return nil } -func (nopCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { return nil } -func (nopCodec) EncodeSpan(*model.Span) ([]byte, error) { return nil, nil } -func (nopCodec) EncodeTransaction(*model.Transaction) ([]byte, error) { return nil, nil } +func (nopCodec) DecodeEvent(data []byte, event *model.APMEvent) error { return nil } +func (nopCodec) EncodeEvent(*model.APMEvent) ([]byte, error) { return nil, nil } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go index 88b4a3d923d..c8638c72020 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -23,7 +23,7 @@ func TestWriteEvents(t *testing.T) { // - 1 transaction and 1 span // - 1 transaction and 100 spans // - // The latter test will cause ReadEvents to implicitly call flush. + // The latter test will cause ReadTraceEvents to implicitly call flush. t.Run("no_flush", func(t *testing.T) { testWriteEvents(t, 1) }) @@ -40,30 +40,28 @@ func testWriteEvents(t *testing.T, numSpans int) { defer readWriter.Close() beforeWrite := time.Now() - traceUUID := uuid.Must(uuid.NewV4()) - transactionUUID := uuid.Must(uuid.NewV4()) - transaction := &model.Transaction{ - TraceID: traceUUID.String(), - ID: transactionUUID.String(), + traceID := uuid.Must(uuid.NewV4()).String() + transactionID := uuid.Must(uuid.NewV4()).String() + transaction := model.APMEvent{ + Transaction: &model.Transaction{TraceID: traceID, ID: transactionID}, } - assert.NoError(t, readWriter.WriteTransaction(transaction)) + assert.NoError(t, readWriter.WriteTraceEvent(traceID, transactionID, &transaction)) var spanEvents []model.APMEvent for i := 0; i < numSpans; i++ { - spanUUID := uuid.Must(uuid.NewV4()) - span := &model.Span{ - TraceID: traceUUID.String(), - ID: spanUUID.String(), + spanID := uuid.Must(uuid.NewV4()).String() + span := model.APMEvent{ + Span: &model.Span{TraceID: traceID, ID: spanID}, } - assert.NoError(t, readWriter.WriteSpan(span)) - spanEvents = append(spanEvents, model.APMEvent{Span: span}) + assert.NoError(t, readWriter.WriteTraceEvent(traceID, spanID, &span)) + spanEvents = append(spanEvents, span) } afterWrite := time.Now() // We can read our writes without flushing. 
 	var batch model.Batch
-	assert.NoError(t, readWriter.ReadEvents(traceUUID.String(), &batch))
-	assert.ElementsMatch(t, append(spanEvents, model.APMEvent{Transaction: transaction}), batch)
+	assert.NoError(t, readWriter.ReadTraceEvents(traceID, &batch))
+	assert.ElementsMatch(t, append(spanEvents, transaction), batch)
 
 	// Flush in order for the writes to be visible to other readers.
 	assert.NoError(t, readWriter.Flush())
@@ -71,7 +69,7 @@ func testWriteEvents(t *testing.T, numSpans int) {
 	var recorded []model.APMEvent
 	assert.NoError(t, db.View(func(txn *badger.Txn) error {
 		iter := txn.NewIterator(badger.IteratorOptions{
-			Prefix: []byte(traceUUID.String()),
+			Prefix: []byte(traceID),
 		})
 		defer iter.Close()
 		for iter.Rewind(); iter.Valid(); iter.Next() {
@@ -92,22 +90,12 @@ func testWriteEvents(t *testing.T, numSpans int) {
 				return !expiryTime.After(upperBound)
 			}, "expiry time %s is after %s", expiryTime, upperBound)
 
-			var value interface{}
-			switch meta := item.UserMeta(); meta {
-			case 'T':
-				tx := &model.Transaction{}
-				recorded = append(recorded, model.APMEvent{Transaction: tx})
-				value = tx
-			case 'S':
-				span := &model.Span{}
-				recorded = append(recorded, model.APMEvent{Span: span})
-				value = span
-			default:
-				t.Fatalf("invalid meta %q", meta)
-			}
+			var event model.APMEvent
+			require.Equal(t, "e", string(item.UserMeta()))
 			assert.NoError(t, item.Value(func(data []byte) error {
-				return json.Unmarshal(data, value)
+				return json.Unmarshal(data, &event)
 			}))
+			recorded = append(recorded, event)
 		}
 		return nil
 	}))
@@ -164,7 +152,7 @@ func TestWriteTraceSampled(t *testing.T) {
 	}, sampled)
 }
 
-func TestReadEvents(t *testing.T) {
+func TestReadTraceEvents(t *testing.T) {
 	db := newBadgerDB(t, badgerOptions)
 	ttl := time.Minute
 	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
 
 	traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
 	require.NoError(t, db.Update(func(txn *badger.Txn) error {
 		key := append(traceID[:], ":12345678"...)
-		value := []byte(`{"name":"transaction"}`)
-		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('T')); err != nil {
+		value := []byte(`{"transaction":{"name":"transaction"}}`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('e')); err != nil {
 			return err
 		}
 
 		key = append(traceID[:], ":87654321"...)
-		value = []byte(`{"name":"span"}`)
-		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('S')); err != nil {
+		value = []byte(`{"span":{"name":"span"}}`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('e')); err != nil {
 			return err
 		}
 
@@ -187,7 +175,7 @@ func TestReadEvents(t *testing.T) {
 		// preceding colon, causing it to be ignored.
 		key = append(traceID[:], "nocolon"...)
value = []byte(`not-json`) - if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('S')); err != nil { + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('e')); err != nil { return err } @@ -204,14 +192,14 @@ func TestReadEvents(t *testing.T) { defer reader.Close() var events model.Batch - assert.NoError(t, reader.ReadEvents(string(traceID[:]), &events)) + assert.NoError(t, reader.ReadTraceEvents(string(traceID[:]), &events)) assert.Equal(t, model.Batch{ {Transaction: &model.Transaction{Name: "transaction"}}, {Span: &model.Span{Name: "span"}}, }, events) } -func TestReadEventsDecodeError(t *testing.T) { +func TestReadTraceEventsDecodeError(t *testing.T) { db := newBadgerDB(t, badgerOptions) ttl := time.Minute store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) @@ -220,7 +208,7 @@ func TestReadEventsDecodeError(t *testing.T) { require.NoError(t, db.Update(func(txn *badger.Txn) error { key := append(traceID[:], ":12345678"...) value := []byte(`wat`) - if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('T')); err != nil { + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('e')); err != nil { return err } return nil @@ -230,7 +218,7 @@ func TestReadEventsDecodeError(t *testing.T) { defer reader.Close() var events model.Batch - err := reader.ReadEvents(string(traceID[:]), &events) + err := reader.ReadTraceEvents(string(traceID[:]), &events) assert.Error(t, err) } diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index f473b937b02..9922fc0851d 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -154,19 +154,19 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error } events := *batch for i := 0; i < len(events); i++ { - event := events[i] + event := &events[i] var report, stored bool if event.Transaction != nil { var err error atomic.AddInt64(&p.eventMetrics.processed, 1) - report, stored, err = p.processTransaction(event.Transaction) + report, stored, err = p.processTransaction(event) if err != nil { return err } } else if event.Span != nil { var err error atomic.AddInt64(&p.eventMetrics.processed, 1) - report, stored, err = p.processSpan(event.Span) + report, stored, err = p.processSpan(event) if err != nil { return err } @@ -201,14 +201,14 @@ func (p *Processor) updateProcessorMetrics(report, stored bool) { } } -func (p *Processor) processTransaction(tx *model.Transaction) (report, stored bool, _ error) { - if !tx.Sampled { +func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bool, _ error) { + if !event.Transaction.Sampled { // (Head-based) unsampled transactions are passed through // by the tail sampler. return true, false, nil } - traceSampled, err := p.storage.IsTraceSampled(tx.TraceID) + traceSampled, err := p.storage.IsTraceSampled(event.Transaction.TraceID) switch err { case nil: // Tail-sampling decision has been made: report the transaction @@ -222,14 +222,16 @@ func (p *Processor) processTransaction(tx *model.Transaction) (report, stored bo return false, false, err } - if tx.ParentID != "" { + if event.Transaction.ParentID != "" { // Non-root transaction: write to local storage while we wait // for a sampling decision. - return false, true, p.storage.WriteTransaction(tx) + return false, true, p.storage.WriteTraceEvent( + event.Transaction.TraceID, event.Transaction.ID, event, + ) } // Root transaction: apply reservoir sampling. 
- reservoirSampled, err := p.groups.sampleTrace(tx) + reservoirSampled, err := p.groups.sampleTrace(event.Transaction) if err == errTooManyTraceGroups { // Too many trace groups, drop the transaction. p.tooManyGroupsLogger.Warn(` @@ -249,21 +251,25 @@ sampling policies without service name specified. // This is a local optimisation only. To avoid creating network // traffic and load on Elasticsearch for uninteresting root // transactions, we do not propagate this to other APM Servers. - return false, false, p.storage.WriteTraceSampled(tx.TraceID, false) + return false, false, p.storage.WriteTraceSampled(event.Transaction.TraceID, false) } // The root transaction was admitted to the sampling reservoir, so we // can proceed to write the transaction to storage; we may index it later, // after finalising the sampling decision. - return false, true, p.storage.WriteTransaction(tx) + return false, true, p.storage.WriteTraceEvent( + event.Transaction.TraceID, event.Transaction.ID, event, + ) } -func (p *Processor) processSpan(span *model.Span) (report, stored bool, _ error) { - traceSampled, err := p.storage.IsTraceSampled(span.TraceID) +func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ error) { + traceSampled, err := p.storage.IsTraceSampled(event.Span.TraceID) if err != nil { if err == eventstorage.ErrNotFound { - // Tail-sampling decision has not yet been made, write span to local storage. - return false, true, p.storage.WriteSpan(span) + // Tail-sampling decision has not yet been made, write event to local storage. + return false, true, p.storage.WriteTraceEvent( + event.Span.TraceID, event.Span.ID, event, + ) } return false, false, err } @@ -445,7 +451,7 @@ func (p *Processor) Run() error { return err } var events model.Batch - if err := p.storage.ReadEvents(traceID, &events); err != nil { + if err := p.storage.ReadTraceEvents(traceID, &events); err != nil { return err } if n := len(events); n > 0 { @@ -459,11 +465,11 @@ func (p *Processor) Run() error { // at-most-once, not guaranteed. 
for _, event := range events { if event.Transaction != nil { - if err := p.storage.DeleteTransaction(event.Transaction); err != nil { + if err := p.storage.DeleteTraceEvent(event.Transaction.TraceID, event.Transaction.ID); err != nil { return errors.Wrap(err, "failed to delete transaction from local storage") } } else if event.Span != nil { - if err := p.storage.DeleteSpan(event.Span); err != nil { + if err := p.storage.DeleteTraceEvent(event.Span.TraceID, event.Span.ID); err != nil { return errors.Wrap(err, "failed to delete span from local storage") } } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 0464af40554..221ff75a9a4 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -127,11 +127,11 @@ func TestProcessAlreadyTailSampled(t *testing.T) { defer reader.Close() var batch model.Batch - err := reader.ReadEvents(traceID1, &batch) + err := reader.ReadTraceEvents(traceID1, &batch) assert.NoError(t, err) assert.Zero(t, batch) - err = reader.ReadEvents(traceID2, &batch) + err = reader.ReadTraceEvents(traceID2, &batch) assert.NoError(t, err) assert.Equal(t, model.Batch{ {Transaction: transaction2}, @@ -238,7 +238,7 @@ func TestProcessLocalTailSampling(t *testing.T) { assert.False(t, sampled) var batch model.Batch - err = reader.ReadEvents(sampledTraceID, &batch) + err = reader.ReadTraceEvents(sampledTraceID, &batch) assert.NoError(t, err) assert.Equal(t, sampledTraceEvents, batch) @@ -246,7 +246,7 @@ func TestProcessLocalTailSampling(t *testing.T) { // available in storage until the TTL expires, as they're // written there first. batch = batch[:0] - err = reader.ReadEvents(unsampledTraceID, &batch) + err = reader.ReadTraceEvents(unsampledTraceID, &batch) assert.NoError(t, err) assert.Equal(t, unsampledTraceEvents, batch) }) @@ -454,12 +454,12 @@ func TestProcessRemoteTailSampling(t *testing.T) { assert.True(t, sampled) var batch model.Batch - err = reader.ReadEvents(traceID1, &batch) + err = reader.ReadTraceEvents(traceID1, &batch) assert.NoError(t, err) assert.Zero(t, batch) // events are deleted from local storage batch = model.Batch{} - err = reader.ReadEvents(traceID2, &batch) + err = reader.ReadTraceEvents(traceID2, &batch) assert.NoError(t, err) assert.Empty(t, batch) })
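Editor's note: taken together, the refactor shrinks the storage surface to one
write, one read, and one delete method covering all event types. Below is a
minimal end-to-end sketch of the new API, assembled from the tests above; the
Badger setup (badger.Open with DefaultOptions), the import paths, and the
Badger major version are assumptions, and error handling is collapsed into
log.Fatal for brevity.

package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger/v2"

	"github.com/elastic/apm-server/model"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

func main() {
	// Open a throwaway Badger database to back the event storage.
	db, err := badger.Open(badger.DefaultOptions("/tmp/eventstorage-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The TTL bounds how long undecided trace events are retained.
	store := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute)
	rw := store.NewReadWriter()
	defer rw.Close()

	// After the refactor a single method stores any event type; the caller
	// supplies the trace and event IDs rather than the storage layer
	// extracting them from type-specific fields.
	event := model.APMEvent{
		Transaction: &model.Transaction{TraceID: "trace-1", ID: "tx-1"},
	}
	if err := rw.WriteTraceEvent("trace-1", "tx-1", &event); err != nil {
		log.Fatal(err)
	}
	// Flush makes the buffered write visible to other readers.
	if err := rw.Flush(); err != nil {
		log.Fatal(err)
	}

	var batch model.Batch
	if err := rw.ReadTraceEvents("trace-1", &batch); err != nil {
		log.Fatal(err)
	}
	log.Printf("read %d event(s) for trace-1", len(batch))

	// Once a sampling decision has been indexed, the event can be deleted.
	if err := rw.DeleteTraceEvent("trace-1", "tx-1"); err != nil {
		log.Fatal(err)
	}
}

Passing traceID and id explicitly is what lets the WriteTraceEvent and
DeleteTraceEvent pair replace the four type-specific methods: the storage
layer no longer needs to know which field of the APMEvent is populated.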