Skip to content

Commit

Permalink
Change servicegraphprocessor to avoid allocating hex strings for map …
Browse files Browse the repository at this point in the history
…key (open-telemetry#16248)

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored and shalper2 committed Dec 6, 2022
1 parent 18a6da4 commit 905f5c9
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 75 deletions.
4 changes: 2 additions & 2 deletions processor/servicegraphprocessor/internal/store/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (

// Edge is an Edge between two nodes in the graph
type Edge struct {
key string
key Key

TraceID pcommon.TraceID
ConnectionType ConnectionType
Expand All @@ -48,7 +48,7 @@ type Edge struct {
expiration time.Time
}

func newEdge(key string, ttl time.Duration) *Edge {
func newEdge(key Key, ttl time.Duration) *Edge {
return &Edge{
key: key,
Dimensions: make(map[string]string),
Expand Down
25 changes: 0 additions & 25 deletions processor/servicegraphprocessor/internal/store/interface.go

This file was deleted.

31 changes: 21 additions & 10 deletions processor/servicegraphprocessor/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,29 @@ import (
"errors"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

var (
ErrTooManyItems = errors.New("too many items")
)

var _ Store = (*store)(nil)
type Callback func(e *Edge)

type Key struct {
tid pcommon.TraceID
sid pcommon.SpanID
}

func NewKey(tid pcommon.TraceID, sid pcommon.SpanID) Key {
return Key{tid: tid, sid: sid}
}

type store struct {
type Store struct {
l *list.List
mtx sync.Mutex
m map[string]*list.Element
m map[Key]*list.Element

onComplete Callback
onExpire Callback
Expand All @@ -42,10 +53,10 @@ type store struct {
// NewStore creates a Store to build service graphs. The store caches edges, each representing a
// request between two services. Once an edge is complete its metrics can be collected. Edges that
// have not found their pair are deleted after ttl time.
func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) Store {
s := &store{
func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) *Store {
s := &Store{
l: list.New(),
m: make(map[string]*list.Element),
m: make(map[Key]*list.Element),

onComplete: onComplete,
onExpire: onExpire,
Expand All @@ -58,14 +69,14 @@ func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) St
}

// len is only used for testing.
func (s *store) len() int {
func (s *Store) len() int {
return s.l.Len()
}

// UpsertEdge fetches an Edge from the store and updates it using the given callback. If the Edge
// doesn't exist yet, it creates a new one with the default TTL.
// If the Edge is complete after applying the callback, it's completed and removed.
func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error) {
func (s *Store) UpsertEdge(key Key, update Callback) (isNew bool, err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand Down Expand Up @@ -103,7 +114,7 @@ func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error)
}

// Expire evicts all expired items in the store.
func (s *store) Expire() {
func (s *Store) Expire() {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -116,7 +127,7 @@ func (s *store) Expire() {
// Returns true if the head was evicted.
//
// Must be called holding lock.
func (s *store) tryEvictHead() bool {
func (s *Store) tryEvictHead() bool {
head := s.l.Front()
if head == nil {
return false // list is empty
Expand Down
47 changes: 20 additions & 27 deletions processor/servicegraphprocessor/internal/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,29 @@
package store // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store"

import (
"fmt"
"encoding/hex"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
)

const clientService = "client"

func TestStoreUpsertEdge(t *testing.T) {
const keyStr = "key"
key := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3}))

var onCompletedCount int
var onExpireCount int

storeInterface := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount))

s := storeInterface.(*store)
s := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount))
assert.Equal(t, 0, s.len())

// Insert first half of an edge
isNew, err := s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err := s.UpsertEdge(key, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
Expand All @@ -51,7 +50,7 @@ func TestStoreUpsertEdge(t *testing.T) {
assert.Equal(t, 0, onExpireCount)

// Insert the second half of an edge
isNew, err = s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err = s.UpsertEdge(key, func(e *Edge) {
assert.Equal(t, clientService, e.ClientService)
e.ServerService = "server"
})
Expand All @@ -64,7 +63,7 @@ func TestStoreUpsertEdge(t *testing.T) {
assert.Equal(t, 0, onExpireCount)

// Insert an edge that will immediately expire
isNew, err = s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err = s.UpsertEdge(key, func(e *Edge) {
e.ClientService = clientService
e.expiration = time.UnixMicro(0)
})
Expand All @@ -81,27 +80,27 @@ func TestStoreUpsertEdge(t *testing.T) {
}

func TestStoreUpsertEdge_errTooManyItems(t *testing.T) {
key1 := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3}))
key2 := NewKey(pcommon.TraceID([16]byte{4, 5, 6}), pcommon.SpanID([8]byte{1, 2, 3}))
var onCallbackCounter int

storeInterface := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter))

s := storeInterface.(*store)
s := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter))
assert.Equal(t, 0, s.len())

isNew, err := s.UpsertEdge("key-1", func(e *Edge) {
isNew, err := s.UpsertEdge(key1, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
require.Equal(t, true, isNew)
assert.Equal(t, 1, s.len())

_, err = s.UpsertEdge("key-2", func(e *Edge) {
_, err = s.UpsertEdge(key2, func(e *Edge) {
e.ClientService = clientService
})
require.ErrorIs(t, err, ErrTooManyItems)
assert.Equal(t, 1, s.len())

isNew, err = s.UpsertEdge("key-1", func(e *Edge) {
isNew, err = s.UpsertEdge(key1, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
Expand All @@ -114,9 +113,9 @@ func TestStoreUpsertEdge_errTooManyItems(t *testing.T) {
func TestStoreExpire(t *testing.T) {
const testSize = 100

keys := map[string]bool{}
keys := map[Key]struct{}{}
for i := 0; i < testSize; i++ {
keys[fmt.Sprintf("key-%d", i)] = true
keys[NewKey(pcommon.TraceID([16]byte{byte(i)}), pcommon.SpanID([8]byte{1, 2, 3}))] = struct{}{}
}

var onCompletedCount int
Expand All @@ -127,8 +126,7 @@ func TestStoreExpire(t *testing.T) {
assert.Contains(t, keys, e.key)
}
// New edges are immediately expired
storeInterface := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount))
s := storeInterface.(*store)
s := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount))

for key := range keys {
isNew, err := s.UpsertEdge(key, noopCallback)
Expand All @@ -142,7 +140,7 @@ func TestStoreExpire(t *testing.T) {
assert.Equal(t, testSize, onExpireCount)
}

func TestStore_concurrency(t *testing.T) {
func TestStoreConcurrency(t *testing.T) {
s := NewStore(10*time.Millisecond, 100000, noopCallback, noopCallback)

end := make(chan struct{})
Expand All @@ -158,16 +156,11 @@ func TestStore_concurrency(t *testing.T) {
}
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

go accessor(func() {
key := make([]rune, 6)
for i := range key {
key[i] = letters[rand.Intn(len(letters))]
}
key := NewKey(pcommon.TraceID([16]byte{byte(rand.Intn(32))}), pcommon.SpanID([8]byte{1, 2, 3}))

_, err := s.UpsertEdge(string(key), func(e *Edge) {
e.ClientService = string(key)
_, err := s.UpsertEdge(key, func(e *Edge) {
e.ClientService = hex.EncodeToString(key.tid[:])
})
assert.NoError(t, err)
})
Expand Down
14 changes: 3 additions & 11 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type processor struct {
nextConsumer consumer.Traces
metricsExporter consumer.Metrics

store store.Store
store *store.Store

startTime time.Time

Expand Down Expand Up @@ -196,7 +196,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err
fallthrough
case ptrace.SpanKindClient:
traceID := span.TraceID()
key := buildEdgeKey(traceID.HexString(), span.SpanID().HexString())
key := store.NewKey(traceID, span.SpanID())
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = traceID
e.ConnectionType = connectionType
Expand All @@ -219,7 +219,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err
fallthrough
case ptrace.SpanKindServer:
traceID := span.TraceID()
key := buildEdgeKey(traceID.HexString(), span.ParentSpanID().HexString())
key := store.NewKey(traceID, span.ParentSpanID())
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = traceID
e.ConnectionType = connectionType
Expand Down Expand Up @@ -491,14 +491,6 @@ func (p *processor) cleanCache() {
}
}

func buildEdgeKey(k1, k2 string) string {
var b strings.Builder
b.WriteString(k1)
b.WriteString("-")
b.WriteString(k2)
return b.String()
}

// durationToMillis converts the given duration to the number of milliseconds it represents.
// Note that this can return sub-millisecond (i.e. < 1ms) values as well.
func durationToMillis(d time.Duration) float64 {
Expand Down

0 comments on commit 905f5c9

Please sign in to comment.