Skip to content

Commit

Permalink
sampling: refactor to read/write model.APMEvents (#5814)
Browse files Browse the repository at this point in the history
Update the tail-based sampling package to read and write
model.APMEvents, rather than model.Transactions and model.Spans.
This is in preparation for moving "metadata" up to APMEvent.

(cherry picked from commit 96d24d4)
  • Loading branch information
axw authored and mergify-bot committed Jul 29, 2021
1 parent 9b6985e commit ed281eb
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 202 deletions.
5 changes: 0 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/codec.go

This file was deleted.

22 changes: 6 additions & 16 deletions x-pack/apm-server/sampling/eventstorage/jsoncodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,12 @@ import (
// JSONCodec is an implementation of Codec, using JSON encoding.
type JSONCodec struct{}

// DecodeSpan decodes data as JSON into span.
func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error {
return jsoniter.ConfigFastest.Unmarshal(data, span)
// DecodeEvent decodes data as JSON into event.
//
// Decoding goes through jsoniter's ConfigFastest configuration rather than
// the standard library — presumably chosen for speed on the read path; note
// that EncodeEvent uses encoding/json, so the two libraries are assumed to
// produce interchangeable JSON.
func (JSONCodec) DecodeEvent(data []byte, event *model.APMEvent) error {
return jsoniter.ConfigFastest.Unmarshal(data, event)
}

// DecodeTransaction decodes data as JSON into tx.
func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error {
return jsoniter.ConfigFastest.Unmarshal(data, tx)
}

// EncodeSpan encodes span as JSON.
func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) {
return json.Marshal(span)
}

// EncodeTransaction encodes tx as JSON.
func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) {
return json.Marshal(tx)
// EncodeEvent encodes event as JSON.
//
// NOTE(review): encoding uses encoding/json while DecodeEvent uses
// jsoniter.ConfigFastest; the asymmetry looks intentional (fast reads,
// standard writes) — confirm the two round-trip identically.
func (JSONCodec) EncodeEvent(event *model.APMEvent) ([]byte, error) {
return json.Marshal(event)
}
52 changes: 15 additions & 37 deletions x-pack/apm-server/sampling/eventstorage/sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,14 @@ func (s *ShardedReadWriter) Flush() error {
return result
}

// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer.
func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
return s.getWriter(traceID).ReadEvents(traceID, out)
// ReadTraceEvents calls Writer.ReadTraceEvents on the sharded, locked
// Writer selected by traceID.
func (s *ShardedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
	w := s.getWriter(traceID)
	return w.ReadTraceEvents(traceID, out)
}

// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error {
return s.getWriter(tx.TraceID).WriteTransaction(tx)
}

// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteSpan(span *model.Span) error {
return s.getWriter(span.TraceID).WriteSpan(span)
// WriteTraceEvent calls Writer.WriteTraceEvent on the sharded, locked
// Writer selected by traceID.
func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
	w := s.getWriter(traceID)
	return w.WriteTraceEvent(traceID, id, event)
}

// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer.
Expand All @@ -76,14 +71,9 @@ func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return s.getWriter(traceID).IsTraceSampled(traceID)
}

// DeleteTransaction calls Writer.DeleteTransaction, using a sharded, locked, Writer.
func (s *ShardedReadWriter) DeleteTransaction(tx *model.Transaction) error {
return s.getWriter(tx.TraceID).DeleteTransaction(tx)
}

// DeleteSpan calls Writer.DeleteSpan, using a sharded, locked, Writer.
func (s *ShardedReadWriter) DeleteSpan(span *model.Span) error {
return s.getWriter(span.TraceID).DeleteSpan(span)
// DeleteTraceEvent calls Writer.DeleteTraceEvent on the sharded, locked
// Writer selected by traceID.
func (s *ShardedReadWriter) DeleteTraceEvent(traceID, id string) error {
	w := s.getWriter(traceID)
	return w.DeleteTraceEvent(traceID, id)
}

// getWriter returns an event storage writer for the given trace ID.
Expand Down Expand Up @@ -114,22 +104,16 @@ func (rw *lockedReadWriter) Flush() error {
return rw.rw.Flush()
}

// ReadEvents reads events with the given trace ID into out, holding the
// writer's mutex for the duration of the delegated read.
func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.ReadEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error {
func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTransaction(tx)
return rw.rw.ReadTraceEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteSpan(s *model.Span) error {
func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteSpan(s)
return rw.rw.WriteTraceEvent(traceID, id, event)
}

func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
Expand All @@ -144,14 +128,8 @@ func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return rw.rw.IsTraceSampled(traceID)
}

func (rw *lockedReadWriter) DeleteTransaction(tx *model.Transaction) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.DeleteTransaction(tx)
}

func (rw *lockedReadWriter) DeleteSpan(span *model.Span) error {
func (rw *lockedReadWriter) DeleteTraceEvent(traceID, id string) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.DeleteSpan(span)
return rw.rw.DeleteTraceEvent(traceID, id)
}
20 changes: 13 additions & 7 deletions x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) {
defer sharded.Close()

b.RunParallel(func(pb *testing.PB) {
traceUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()}
traceID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{TraceID: traceID, ID: traceID},
}
for pb.Next() {
if err := sharded.WriteTransaction(transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -41,13 +43,17 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) {

// Use a single trace ID, causing all events to go through
// the same sharded writer, contending for a single lock.
traceUUID := uuid.Must(uuid.NewV4())
traceID := uuid.Must(uuid.NewV4()).String()

b.RunParallel(func(pb *testing.PB) {
transactionUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()}
transactionID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{
TraceID: traceID, ID: transactionID,
},
}
for pb.Next() {
if err := sharded.WriteTransaction(transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand Down
77 changes: 21 additions & 56 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const (
// over time, to avoid misinterpreting historical data.
entryMetaTraceSampled = 's'
entryMetaTraceUnsampled = 'u'
entryMetaTransaction = 'T'
entryMetaSpan = 'S'
entryMetaTraceEvent = 'e'
)

// ErrNotFound is returned by the Storage.IsTraceSampled method,
Expand All @@ -36,10 +35,8 @@ type Storage struct {

// Codec provides methods for encoding and decoding events.
type Codec interface {
DecodeSpan([]byte, *model.Span) error
DecodeTransaction([]byte, *model.Transaction) error
EncodeSpan(*model.Span) ([]byte, error)
EncodeTransaction(*model.Transaction) ([]byte, error)
DecodeEvent([]byte, *model.APMEvent) error
EncodeEvent(*model.APMEvent) ([]byte, error)
}

// New returns a new Storage using db and codec.
Expand Down Expand Up @@ -133,34 +130,17 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
return item.UserMeta() == entryMetaTraceSampled, nil
}

// WriteTransaction writes tx to storage.
// WriteTraceEvent writes a trace event to storage.
//
// WriteTransaction may return before the write is committed to storage.
// WriteTraceEvent may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteTransaction(tx *model.Transaction) error {
key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
data, err := rw.s.codec.EncodeTransaction(tx)
func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent) error {
key := append(append([]byte(traceID), ':'), id...)
data, err := rw.s.codec.EncodeEvent(event)
if err != nil {
return err
}
return rw.writeEvent(key[:], data, entryMetaTransaction)
}

// WriteSpan writes span to storage.
//
// WriteSpan may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteSpan(span *model.Span) error {
key := append(append([]byte(span.TraceID), ':'), span.ID...)
data, err := rw.s.codec.EncodeSpan(span)
if err != nil {
return err
}
return rw.writeEvent(key[:], data, entryMetaSpan)
}

func (rw *ReadWriter) writeEvent(key, value []byte, meta byte) error {
return rw.writeEntry(badger.NewEntry(key, value).WithMeta(meta).WithTTL(rw.s.ttl))
return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent).WithTTL(rw.s.ttl))
}

func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
Expand All @@ -175,25 +155,18 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
return rw.txn.SetEntry(e)
}

// DeleteTransaction deletes the transaction from storage.
func (rw *ReadWriter) DeleteTransaction(tx *model.Transaction) error {
key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
return rw.txn.Delete(key)
}

// DeleteSpan deletes the span from storage.
func (rw *ReadWriter) DeleteSpan(span *model.Span) error {
key := append(append([]byte(span.TraceID), ':'), span.ID...)
// DeleteTraceEvent deletes the trace event from storage.
//
// The event is addressed by the same "traceID:id" composite key that
// WriteTraceEvent uses, so only the exact event written under that pair
// is removed from the transaction.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
key := append(append([]byte(traceID), ':'), id...)
return rw.txn.Delete(key)
}

// ReadEvents reads events with the given trace ID from storage into a batch.
// ReadTraceEvents reads trace events with the given trace ID from storage into out.
//
// ReadEvents may implicitly commit the current transaction when the number
// of pending writes exceeds a threshold. This is due to how Badger internally
// iterates over uncommitted writes, where it will sort keys for each new
// iterator.
func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error {
// ReadTraceEvents may implicitly commit the current transaction when the number of
// pending writes exceeds a threshold. This is due to how Badger internally iterates
// over uncommitted writes, where it will sort keys for each new iterator.
func (rw *ReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
opts := badger.DefaultIteratorOptions
rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':')
opts.Prefix = rw.readKeyBuf
Expand All @@ -215,22 +188,14 @@ func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error {
continue
}
switch item.UserMeta() {
case entryMetaTransaction:
var event model.Transaction
if err := item.Value(func(data []byte) error {
return rw.s.codec.DecodeTransaction(data, &event)
}); err != nil {
return err
}
*out = append(*out, model.APMEvent{Transaction: &event})
case entryMetaSpan:
var event model.Span
case entryMetaTraceEvent:
var event model.APMEvent
if err := item.Value(func(data []byte) error {
return rw.s.codec.DecodeSpan(data, &event)
return rw.s.codec.DecodeEvent(data, &event)
}); err != nil {
return err
}
*out = append(*out, model.APMEvent{Span: &event})
*out = append(*out, event)
default:
// Unknown entry meta: ignore.
continue
Expand Down
33 changes: 16 additions & 17 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ func BenchmarkWriteTransaction(b *testing.B) {
readWriter := store.NewReadWriter()
defer readWriter.Close()

traceID := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
transactionID := []byte{1, 2, 3, 4, 5, 6, 7, 8}
transaction := &model.Transaction{
TraceID: hex.EncodeToString(traceID),
ID: hex.EncodeToString(transactionID),
traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
transaction := &model.APMEvent{
Transaction: &model.Transaction{TraceID: traceID, ID: transactionID},
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := readWriter.WriteTransaction(transaction); err != nil {
if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -53,7 +52,7 @@ func BenchmarkWriteTransaction(b *testing.B) {
}

func BenchmarkReadEvents(b *testing.B) {
traceUUID := uuid.Must(uuid.NewV4())
traceID := uuid.Must(uuid.NewV4()).String()

test := func(b *testing.B, codec eventstorage.Codec) {
// Test with varying numbers of events in the trace.
Expand All @@ -67,12 +66,14 @@ func BenchmarkReadEvents(b *testing.B) {
defer readWriter.Close()

for i := 0; i < count; i++ {
transactionUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{
TraceID: traceUUID.String(),
ID: transactionUUID.String(),
transactionID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{
TraceID: traceID,
ID: transactionID,
},
}
if err := readWriter.WriteTransaction(transaction); err != nil {
if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -85,7 +86,7 @@ func BenchmarkReadEvents(b *testing.B) {
var batch model.Batch
for i := 0; i < b.N; i++ {
batch = batch[:0]
if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil {
if err := readWriter.ReadTraceEvents(traceID, &batch); err != nil {
b.Fatal(err)
}
if len(batch) != count {
Expand Down Expand Up @@ -156,7 +157,5 @@ func BenchmarkIsTraceSampled(b *testing.B) {

type nopCodec struct{}

func (nopCodec) DecodeSpan(data []byte, span *model.Span) error { return nil }
func (nopCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { return nil }
func (nopCodec) EncodeSpan(*model.Span) ([]byte, error) { return nil, nil }
func (nopCodec) EncodeTransaction(*model.Transaction) ([]byte, error) { return nil, nil }
// DecodeEvent is a no-op decode used by benchmarks to measure storage
// overhead without codec cost; event is left untouched.
func (nopCodec) DecodeEvent(data []byte, event *model.APMEvent) error { return nil }

// EncodeEvent is a no-op encode used by benchmarks; it always returns a
// nil payload and nil error.
func (nopCodec) EncodeEvent(*model.APMEvent) ([]byte, error) { return nil, nil }
Loading

0 comments on commit ed281eb

Please sign in to comment.