Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sammpling: refactor to read/write model.APMEvents #5814

Merged
merged 1 commit into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/codec.go

This file was deleted.

22 changes: 6 additions & 16 deletions x-pack/apm-server/sampling/eventstorage/jsoncodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,12 @@ import (
// JSONCodec is an implementation of Codec, using JSON encoding.
type JSONCodec struct{}

// DecodeSpan decodes data as JSON into span.
func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error {
return jsoniter.ConfigFastest.Unmarshal(data, span)
// DecodeEvent decodes data as JSON into event.
func (JSONCodec) DecodeEvent(data []byte, event *model.APMEvent) error {
return jsoniter.ConfigFastest.Unmarshal(data, event)
}

// DecodeTransaction decodes data as JSON into tx.
func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error {
return jsoniter.ConfigFastest.Unmarshal(data, tx)
}

// EncodeSpan encodes span as JSON.
func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) {
return json.Marshal(span)
}

// EncodeTransaction encodes tx as JSON.
func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) {
return json.Marshal(tx)
// EncodeEvent encodes event as JSON.
func (JSONCodec) EncodeEvent(event *model.APMEvent) ([]byte, error) {
return json.Marshal(event)
}
52 changes: 15 additions & 37 deletions x-pack/apm-server/sampling/eventstorage/sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,14 @@ func (s *ShardedReadWriter) Flush() error {
return result
}

// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer.
func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
return s.getWriter(traceID).ReadEvents(traceID, out)
// ReadTraceEvents calls Writer.ReadTraceEvents, using a sharded, locked, Writer.
func (s *ShardedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
return s.getWriter(traceID).ReadTraceEvents(traceID, out)
}

// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error {
return s.getWriter(tx.TraceID).WriteTransaction(tx)
}

// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteSpan(span *model.Span) error {
return s.getWriter(span.TraceID).WriteSpan(span)
// WriteTraceEvent calls Writer.WriteTraceEvent, using a sharded, locked, Writer.
func (s *ShardedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
return s.getWriter(traceID).WriteTraceEvent(traceID, id, event)
}

// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer.
Expand All @@ -76,14 +71,9 @@ func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return s.getWriter(traceID).IsTraceSampled(traceID)
}

// DeleteTransaction calls Writer.DeleteTransaction, using a sharded, locked, Writer.
func (s *ShardedReadWriter) DeleteTransaction(tx *model.Transaction) error {
return s.getWriter(tx.TraceID).DeleteTransaction(tx)
}

// DeleteSpan calls Writer.DeleteSpan, using a sharded, locked, Writer.
func (s *ShardedReadWriter) DeleteSpan(span *model.Span) error {
return s.getWriter(span.TraceID).DeleteSpan(span)
// DeleteTraceEvent calls Writer.DeleteTraceEvent, using a sharded, locked, Writer.
func (s *ShardedReadWriter) DeleteTraceEvent(traceID, id string) error {
return s.getWriter(traceID).DeleteTraceEvent(traceID, id)
}

// getWriter returns an event storage writer for the given trace ID.
Expand Down Expand Up @@ -114,22 +104,16 @@ func (rw *lockedReadWriter) Flush() error {
return rw.rw.Flush()
}

func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.ReadEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error {
func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTransaction(tx)
return rw.rw.ReadTraceEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteSpan(s *model.Span) error {
func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *model.APMEvent) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteSpan(s)
return rw.rw.WriteTraceEvent(traceID, id, event)
}

func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
Expand All @@ -144,14 +128,8 @@ func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return rw.rw.IsTraceSampled(traceID)
}

func (rw *lockedReadWriter) DeleteTransaction(tx *model.Transaction) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.DeleteTransaction(tx)
}

func (rw *lockedReadWriter) DeleteSpan(span *model.Span) error {
func (rw *lockedReadWriter) DeleteTraceEvent(traceID, id string) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.DeleteSpan(span)
return rw.rw.DeleteTraceEvent(traceID, id)
}
20 changes: 13 additions & 7 deletions x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ func BenchmarkShardedWriteTransactionUncontended(b *testing.B) {
defer sharded.Close()

b.RunParallel(func(pb *testing.PB) {
traceUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()}
traceID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{TraceID: traceID, ID: traceID},
}
for pb.Next() {
if err := sharded.WriteTransaction(transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, traceID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -41,13 +43,17 @@ func BenchmarkShardedWriteTransactionContended(b *testing.B) {

// Use a single trace ID, causing all events to go through
// the same sharded writer, contending for a single lock.
traceUUID := uuid.Must(uuid.NewV4())
traceID := uuid.Must(uuid.NewV4()).String()

b.RunParallel(func(pb *testing.PB) {
transactionUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()}
transactionID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{
TraceID: traceID, ID: transactionID,
},
}
for pb.Next() {
if err := sharded.WriteTransaction(transaction); err != nil {
if err := sharded.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand Down
77 changes: 21 additions & 56 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const (
// over time, to avoid misinterpreting historical data.
entryMetaTraceSampled = 's'
entryMetaTraceUnsampled = 'u'
entryMetaTransaction = 'T'
entryMetaSpan = 'S'
entryMetaTraceEvent = 'e'
)

// ErrNotFound is returned by by the Storage.IsTraceSampled method,
Expand All @@ -36,10 +35,8 @@ type Storage struct {

// Codec provides methods for encoding and decoding events.
type Codec interface {
DecodeSpan([]byte, *model.Span) error
DecodeTransaction([]byte, *model.Transaction) error
EncodeSpan(*model.Span) ([]byte, error)
EncodeTransaction(*model.Transaction) ([]byte, error)
DecodeEvent([]byte, *model.APMEvent) error
EncodeEvent(*model.APMEvent) ([]byte, error)
}

// New returns a new Storage using db and codec.
Expand Down Expand Up @@ -133,34 +130,17 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
return item.UserMeta() == entryMetaTraceSampled, nil
}

// WriteTransaction writes tx to storage.
// WriteTraceEvent writes a trace event to storage.
//
// WriteTransaction may return before the write is committed to storage.
// WriteTraceEvent may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteTransaction(tx *model.Transaction) error {
key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
data, err := rw.s.codec.EncodeTransaction(tx)
func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *model.APMEvent) error {
key := append(append([]byte(traceID), ':'), id...)
data, err := rw.s.codec.EncodeEvent(event)
if err != nil {
return err
}
return rw.writeEvent(key[:], data, entryMetaTransaction)
}

// WriteSpan writes span to storage.
//
// WriteSpan may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteSpan(span *model.Span) error {
key := append(append([]byte(span.TraceID), ':'), span.ID...)
data, err := rw.s.codec.EncodeSpan(span)
if err != nil {
return err
}
return rw.writeEvent(key[:], data, entryMetaSpan)
}

func (rw *ReadWriter) writeEvent(key, value []byte, meta byte) error {
return rw.writeEntry(badger.NewEntry(key, value).WithMeta(meta).WithTTL(rw.s.ttl))
return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent).WithTTL(rw.s.ttl))
}

func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
Expand All @@ -175,25 +155,18 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
return rw.txn.SetEntry(e)
}

// DeleteTransaction deletes the transaction from storage.
func (rw *ReadWriter) DeleteTransaction(tx *model.Transaction) error {
key := append(append([]byte(tx.TraceID), ':'), tx.ID...)
return rw.txn.Delete(key)
}

// DeleteSpan deletes the span from storage.
func (rw *ReadWriter) DeleteSpan(span *model.Span) error {
key := append(append([]byte(span.TraceID), ':'), span.ID...)
// DeleteTraceEvent deletes the trace event from storage.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
key := append(append([]byte(traceID), ':'), id...)
return rw.txn.Delete(key)
}

// ReadEvents reads events with the given trace ID from storage into a batch.
// ReadTraceEvents reads trace events with the given trace ID from storage into out.
//
// ReadEvents may implicitly commit the current transaction when the number
// of pending writes exceeds a threshold. This is due to how Badger internally
// iterates over uncommitted writes, where it will sort keys for each new
// iterator.
func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error {
// ReadTraceEvents may implicitly commit the current transaction when the number of
// pending writes exceeds a threshold. This is due to how Badger internally iterates
// over uncommitted writes, where it will sort keys for each new iterator.
func (rw *ReadWriter) ReadTraceEvents(traceID string, out *model.Batch) error {
opts := badger.DefaultIteratorOptions
rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':')
opts.Prefix = rw.readKeyBuf
Expand All @@ -215,22 +188,14 @@ func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error {
continue
}
switch item.UserMeta() {
case entryMetaTransaction:
var event model.Transaction
if err := item.Value(func(data []byte) error {
return rw.s.codec.DecodeTransaction(data, &event)
}); err != nil {
return err
}
*out = append(*out, model.APMEvent{Transaction: &event})
case entryMetaSpan:
var event model.Span
case entryMetaTraceEvent:
var event model.APMEvent
if err := item.Value(func(data []byte) error {
return rw.s.codec.DecodeSpan(data, &event)
return rw.s.codec.DecodeEvent(data, &event)
}); err != nil {
return err
}
*out = append(*out, model.APMEvent{Span: &event})
*out = append(*out, event)
default:
// Unknown entry meta: ignore.
continue
Expand Down
33 changes: 16 additions & 17 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ func BenchmarkWriteTransaction(b *testing.B) {
readWriter := store.NewReadWriter()
defer readWriter.Close()

traceID := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
transactionID := []byte{1, 2, 3, 4, 5, 6, 7, 8}
transaction := &model.Transaction{
TraceID: hex.EncodeToString(traceID),
ID: hex.EncodeToString(transactionID),
traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
transaction := &model.APMEvent{
Transaction: &model.Transaction{TraceID: traceID, ID: transactionID},
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := readWriter.WriteTransaction(transaction); err != nil {
if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -53,7 +52,7 @@ func BenchmarkWriteTransaction(b *testing.B) {
}

func BenchmarkReadEvents(b *testing.B) {
traceUUID := uuid.Must(uuid.NewV4())
traceID := uuid.Must(uuid.NewV4()).String()

test := func(b *testing.B, codec eventstorage.Codec) {
// Test with varying numbers of events in the trace.
Expand All @@ -67,12 +66,14 @@ func BenchmarkReadEvents(b *testing.B) {
defer readWriter.Close()

for i := 0; i < count; i++ {
transactionUUID := uuid.Must(uuid.NewV4())
transaction := &model.Transaction{
TraceID: traceUUID.String(),
ID: transactionUUID.String(),
transactionID := uuid.Must(uuid.NewV4()).String()
transaction := &model.APMEvent{
Transaction: &model.Transaction{
TraceID: traceID,
ID: transactionID,
},
}
if err := readWriter.WriteTransaction(transaction); err != nil {
if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction); err != nil {
b.Fatal(err)
}
}
Expand All @@ -85,7 +86,7 @@ func BenchmarkReadEvents(b *testing.B) {
var batch model.Batch
for i := 0; i < b.N; i++ {
batch = batch[:0]
if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil {
if err := readWriter.ReadTraceEvents(traceID, &batch); err != nil {
b.Fatal(err)
}
if len(batch) != count {
Expand Down Expand Up @@ -156,7 +157,5 @@ func BenchmarkIsTraceSampled(b *testing.B) {

type nopCodec struct{}

func (nopCodec) DecodeSpan(data []byte, span *model.Span) error { return nil }
func (nopCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { return nil }
func (nopCodec) EncodeSpan(*model.Span) ([]byte, error) { return nil, nil }
func (nopCodec) EncodeTransaction(*model.Transaction) ([]byte, error) { return nil, nil }
func (nopCodec) DecodeEvent(data []byte, event *model.APMEvent) error { return nil }
func (nopCodec) EncodeEvent(*model.APMEvent) ([]byte, error) { return nil, nil }
Loading