diff --git a/processor/servicegraphprocessor/internal/store/edge.go b/processor/servicegraphprocessor/internal/store/edge.go index 06fba27f85b5..584e566d3343 100644 --- a/processor/servicegraphprocessor/internal/store/edge.go +++ b/processor/servicegraphprocessor/internal/store/edge.go @@ -30,7 +30,7 @@ const ( // Edge is an Edge between two nodes in the graph type Edge struct { - key string + key Key TraceID pcommon.TraceID ConnectionType ConnectionType @@ -48,7 +48,7 @@ type Edge struct { expiration time.Time } -func newEdge(key string, ttl time.Duration) *Edge { +func newEdge(key Key, ttl time.Duration) *Edge { return &Edge{ key: key, Dimensions: make(map[string]string), diff --git a/processor/servicegraphprocessor/internal/store/interface.go b/processor/servicegraphprocessor/internal/store/interface.go deleted file mode 100644 index e493d64035bf..000000000000 --- a/processor/servicegraphprocessor/internal/store/interface.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package store // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store" - -type Callback func(e *Edge) - -// Store is an interface for building service graphs. -type Store interface { - // UpsertEdge inserts or updates an edge. - UpsertEdge(key string, update Callback) (isNew bool, err error) - // Expire evicts expired edges from the store. - Expire() -} diff --git a/processor/servicegraphprocessor/internal/store/store.go b/processor/servicegraphprocessor/internal/store/store.go index 12280fea659b..84303a81a983 100644 --- a/processor/servicegraphprocessor/internal/store/store.go +++ b/processor/servicegraphprocessor/internal/store/store.go @@ -19,18 +19,29 @@ import ( "errors" "sync" "time" + + "go.opentelemetry.io/collector/pdata/pcommon" ) var ( ErrTooManyItems = errors.New("too many items") ) -var _ Store = (*store)(nil) +type Callback func(e *Edge) + +type Key struct { + tid pcommon.TraceID + sid pcommon.SpanID +} + +func NewKey(tid pcommon.TraceID, sid pcommon.SpanID) Key { + return Key{tid: tid, sid: sid} +} -type store struct { +type Store struct { l *list.List mtx sync.Mutex - m map[string]*list.Element + m map[Key]*list.Element onComplete Callback onExpire Callback @@ -42,10 +53,10 @@ type store struct { // NewStore creates a Store to build service graphs. The store caches edges, each representing a // request between two services. Once an edge is complete its metrics can be collected. Edges that // have not found their pair are deleted after ttl time. -func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) Store { - s := &store{ +func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) *Store { + s := &Store{ l: list.New(), - m: make(map[string]*list.Element), + m: make(map[Key]*list.Element), onComplete: onComplete, onExpire: onExpire, @@ -58,14 +69,14 @@ func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) St } // len is only used for testing. -func (s *store) len() int { +func (s *Store) len() int { return s.l.Len() } // UpsertEdge fetches an Edge from the store and updates it using the given callback. If the Edge // doesn't exist yet, it creates a new one with the default TTL. // If the Edge is complete after applying the callback, it's completed and removed. -func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error) { +func (s *Store) UpsertEdge(key Key, update Callback) (isNew bool, err error) { s.mtx.Lock() defer s.mtx.Unlock() @@ -103,7 +114,7 @@ func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error) } // Expire evicts all expired items in the store. -func (s *store) Expire() { +func (s *Store) Expire() { s.mtx.Lock() defer s.mtx.Unlock() @@ -116,7 +127,7 @@ func (s *store) Expire() { // Returns true if the head was evicted. // // Must be called holding lock. -func (s *store) tryEvictHead() bool { +func (s *Store) tryEvictHead() bool { head := s.l.Front() if head == nil { return false // list is empty diff --git a/processor/servicegraphprocessor/internal/store/store_test.go b/processor/servicegraphprocessor/internal/store/store_test.go index eb15e07c965c..6e6306f7ca39 100644 --- a/processor/servicegraphprocessor/internal/store/store_test.go +++ b/processor/servicegraphprocessor/internal/store/store_test.go @@ -15,30 +15,29 @@ package store // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store" import ( - "fmt" + "encoding/hex" "math/rand" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" ) const clientService = "client" func TestStoreUpsertEdge(t *testing.T) { - const keyStr = "key" + key := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3})) var onCompletedCount int var onExpireCount int - storeInterface := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount)) - - s := storeInterface.(*store) + s := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount)) assert.Equal(t, 0, s.len()) // Insert first half of an edge - isNew, err := s.UpsertEdge(keyStr, func(e *Edge) { + isNew, err := s.UpsertEdge(key, func(e *Edge) { e.ClientService = clientService }) require.NoError(t, err) @@ -51,7 +50,7 @@ func TestStoreUpsertEdge(t *testing.T) { assert.Equal(t, 0, onExpireCount) // Insert the second half of an edge - isNew, err = s.UpsertEdge(keyStr, func(e *Edge) { + isNew, err = s.UpsertEdge(key, func(e *Edge) { assert.Equal(t, clientService, e.ClientService) e.ServerService = "server" }) @@ -64,7 +63,7 @@ func TestStoreUpsertEdge(t *testing.T) { assert.Equal(t, 0, onExpireCount) // Insert an edge that will immediately expire - isNew, err = s.UpsertEdge(keyStr, func(e *Edge) { + isNew, err = s.UpsertEdge(key, func(e *Edge) { e.ClientService = clientService e.expiration = time.UnixMicro(0) }) @@ -81,27 +80,27 @@ func TestStoreUpsertEdge(t *testing.T) { } func TestStoreUpsertEdge_errTooManyItems(t *testing.T) { + key1 := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3})) + key2 := NewKey(pcommon.TraceID([16]byte{4, 5, 6}), pcommon.SpanID([8]byte{1, 2, 3})) var onCallbackCounter int - storeInterface := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter)) - - s := storeInterface.(*store) + s := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter)) assert.Equal(t, 0, s.len()) - isNew, err := s.UpsertEdge("key-1", func(e *Edge) { + isNew, err := s.UpsertEdge(key1, func(e *Edge) { e.ClientService = clientService }) require.NoError(t, err) require.Equal(t, true, isNew) assert.Equal(t, 1, s.len()) - _, err = s.UpsertEdge("key-2", func(e *Edge) { + _, err = s.UpsertEdge(key2, func(e *Edge) { e.ClientService = clientService }) require.ErrorIs(t, err, ErrTooManyItems) assert.Equal(t, 1, s.len()) - isNew, err = s.UpsertEdge("key-1", func(e *Edge) { + isNew, err = s.UpsertEdge(key1, func(e *Edge) { e.ClientService = clientService }) require.NoError(t, err) @@ -114,9 +113,9 @@ func TestStoreUpsertEdge_errTooManyItems(t *testing.T) { func TestStoreExpire(t *testing.T) { const testSize = 100 - keys := map[string]bool{} + keys := map[Key]struct{}{} for i := 0; i < testSize; i++ { - keys[fmt.Sprintf("key-%d", i)] = true + keys[NewKey(pcommon.TraceID([16]byte{byte(i)}), pcommon.SpanID([8]byte{1, 2, 3}))] = struct{}{} } var onCompletedCount int @@ -127,8 +126,7 @@ func TestStoreExpire(t *testing.T) { assert.Contains(t, keys, e.key) } // New edges are immediately expired - storeInterface := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount)) - s := storeInterface.(*store) + s := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount)) for key := range keys { isNew, err := s.UpsertEdge(key, noopCallback) @@ -142,7 +140,7 @@ func TestStoreExpire(t *testing.T) { assert.Equal(t, testSize, onExpireCount) } -func TestStore_concurrency(t *testing.T) { +func TestStoreConcurrency(t *testing.T) { s := NewStore(10*time.Millisecond, 100000, noopCallback, noopCallback) end := make(chan struct{}) @@ -158,16 +156,11 @@ func TestStore_concurrency(t *testing.T) { } } - var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - go accessor(func() { - key := make([]rune, 6) - for i := range key { - key[i] = letters[rand.Intn(len(letters))] - } + key := NewKey(pcommon.TraceID([16]byte{byte(rand.Intn(32))}), pcommon.SpanID([8]byte{1, 2, 3})) - _, err := s.UpsertEdge(string(key), func(e *Edge) { - e.ClientService = string(key) + _, err := s.UpsertEdge(key, func(e *Edge) { + e.ClientService = hex.EncodeToString(key.tid[:]) }) assert.NoError(t, err) }) diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index 385bd118d50e..7f361ae21fd9 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -58,7 +58,7 @@ type processor struct { nextConsumer consumer.Traces metricsExporter consumer.Metrics - store store.Store + store *store.Store startTime time.Time @@ -196,7 +196,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err fallthrough case ptrace.SpanKindClient: traceID := span.TraceID() - key := buildEdgeKey(traceID.HexString(), span.SpanID().HexString()) + key := store.NewKey(traceID, span.SpanID()) isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) { e.TraceID = traceID e.ConnectionType = connectionType @@ -219,7 +219,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err fallthrough case ptrace.SpanKindServer: traceID := span.TraceID() - key := buildEdgeKey(traceID.HexString(), span.ParentSpanID().HexString()) + key := store.NewKey(traceID, span.ParentSpanID()) isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) { e.TraceID = traceID e.ConnectionType = connectionType @@ -491,14 +491,6 @@ func (p *processor) cleanCache() { } } -func buildEdgeKey(k1, k2 string) string { - var b strings.Builder - b.WriteString(k1) - b.WriteString("-") - b.WriteString(k2) - return b.String() -} - // durationToMillis converts the given duration to the number of milliseconds it represents. // Note that this can return sub-millisecond (i.e. < 1ms) values as well. func durationToMillis(d time.Duration) float64 {