Skip to content

Commit

Permalink
feat: Implement badger db for sampling store
Browse files Browse the repository at this point in the history
Signed-off-by: slayer321 <[email protected]>
  • Loading branch information
slayer321 committed Oct 11, 2023
1 parent 982651d commit 2f24d91
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 1 deletion.
7 changes: 7 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore"
samplingStore "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -170,6 +172,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return depStore.NewDependencyStore(sr), nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return samplingStore.NewSamplingStore(f.store), nil

Check warning on line 177 in plugin/storage/badger/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/factory.go#L176-L177

Added lines #L176 - L177 were not covered by tests
}

// Close Implements io.Closer and closes the underlying storage
func (f *Factory) Close() error {
close(f.maintenanceDone)
Expand Down
176 changes: 176 additions & 0 deletions plugin/storage/badger/samplingstore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package samplingstore

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
jaegermodel "github.com/jaegertracing/jaeger/model"
)

const (
throughputKeyPrefix byte = 0x08
probabilitiesKeyPrefix byte = 0x09
)

type SamplingStore struct {
store *badger.DB
}

func NewSamplingStore(db *badger.DB) *SamplingStore {
return &SamplingStore{
store: db,
}
}

func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
fmt.Println("Inside badger samplingstore InsertThroughput")
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createThroughputEntry(throughput, startTime)
if err != nil {
return err
}

Check warning on line 37 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L36-L37

Added lines #L36 - L37 were not covered by tests
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
// Write the entries
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
fmt.Println("Writing entry to badger")
if err != nil {
// Most likely primary key conflict, but let the caller check this
return err
}

Check warning on line 47 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L45-L47

Added lines #L45 - L47 were not covered by tests
}

return nil
})

return nil
}

func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
var retSlice []*model.Throughput
fmt.Println("Inside badger samplingstore GetThroughput")

prefix := []byte{throughputKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
startTime := k[1:9]
fmt.Printf("key=%s\n", k)
val, err := item.ValueCopy(val)
if err != nil {
return err
}

Check warning on line 76 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L75-L76

Added lines #L75 - L76 were not covered by tests
t, err := initalStartTime(startTime)
if err != nil {
return err
}

Check warning on line 80 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L79-L80

Added lines #L79 - L80 were not covered by tests
throughputs, err := decodeValue(val)
if err != nil {
return err
}

Check warning on line 84 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L83-L84

Added lines #L83 - L84 were not covered by tests

if t.After(start) && (t.Before(end) || t.Equal(end)) {
retSlice = append(retSlice, throughputs...)
}
return nil

Check failure on line 89 in plugin/storage/badger/samplingstore/storage.go

View workflow job for this annotation

GitHub Actions / unit-tests

SA4004: the surrounding loop is unconditionally terminated (staticcheck)
}
return nil

Check warning on line 91 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L91

Added line #L91 was not covered by tests
})
if err != nil {
return nil, err
}

Check warning on line 95 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L94-L95

Added lines #L94 - L95 were not covered by tests

return retSlice, nil
}

func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS) error {
return nil

Check warning on line 103 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}

// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities.
func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
return nil, nil

Check warning on line 108 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}

func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createThroughputKV(throughput, startTime)
if err != nil {
return nil, err
}

Check warning on line 115 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L114-L115

Added lines #L114 - L115 were not covered by tests

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entry {
return &badger.Entry{
Key: key,
Value: value,
}
}

func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) {

key := make([]byte, 16)
key[0] = throughputKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error

bb, err = json.Marshal(throughput)
fmt.Printf("Badger key %v, value %v\n", key, string(bb))
return key, bb, err
}

func createPrimaryKeySeekPrefix(startTime uint64) []byte {

Check failure on line 144 in plugin/storage/badger/samplingstore/storage.go

View workflow job for this annotation

GitHub Actions / unit-tests

func `createPrimaryKeySeekPrefix` is unused (unused)
key := make([]byte, 16)
key[0] = throughputKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

return key

Check warning on line 150 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L144-L150

Added lines #L144 - L150 were not covered by tests
}

func decodeValue(val []byte) ([]*model.Throughput, error) {
var throughput []*model.Throughput

err := json.Unmarshal(val, &throughput)
if err != nil {
fmt.Println("Error while unmarshalling")
return nil, err
}

Check warning on line 160 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L158-L160

Added lines #L158 - L160 were not covered by tests
fmt.Printf("Throughput %v\n", throughput)
return throughput, nil
}

func initalStartTime(timeBytes []byte) (time.Time, error) {
var usec int64

buf := bytes.NewReader(timeBytes)

if err := binary.Read(buf, binary.BigEndian, &usec); err != nil {
panic(nil)

Check warning on line 171 in plugin/storage/badger/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/badger/samplingstore/storage.go#L171

Added line #L171 was not covered by tests
}

t := time.UnixMicro(usec)
return t, nil
}
68 changes: 68 additions & 0 deletions plugin/storage/badger/samplingstore/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package samplingstore

import (
"testing"
"time"

"github.com/dgraph-io/badger/v3"
samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/stretchr/testify/assert"
)

type samplingStoreTest struct {
store *SamplingStore
}

func NewtestSamplingStore(db *badger.DB) *samplingStoreTest {
return &samplingStoreTest{
store: NewSamplingStore(db),
}
}

func TestInsertThroughput(t *testing.T) {
runWithBadger(t, func(s *samplingStoreTest, t *testing.T) {
throughputs := []*samplemodel.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
err := s.store.InsertThroughput(throughputs)
assert.NoError(t, err)
})
}

func TestGetThroughput(t *testing.T) {
runWithBadger(t, func(s *samplingStoreTest, t *testing.T) {
start := time.Now()

expected := 2
throughputs := []*samplemodel.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
err := s.store.InsertThroughput(throughputs)
assert.NoError(t, err)

actual, err := s.store.GetThroughput(start, start.Add(time.Second*time.Duration(10)))
assert.NoError(t, err)
assert.Equal(t, expected, len(actual))
})
}

func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) {
opts := badger.DefaultOptions("")

opts.SyncWrites = false
dir := t.TempDir()
opts.Dir = dir
opts.ValueDir = dir

store, err := badger.Open(opts)
defer func() {
store.Close()
}()
ss := NewtestSamplingStore(store)

assert.NoError(t, err)

test(ss, t)
}
3 changes: 3 additions & 0 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (s *BadgerIntegrationStorage) initialize() error {
if err != nil {
return err
}
if s.SamplingStore, err = s.factory.CreateSamplingStore(0); err != nil {
return err
}

s.SpanReader = sr
s.SpanWriter = sw
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,5 +487,5 @@ func (s *StorageIntegration) IntegrationTestAll(t *testing.T) {
t.Run("FindTraces", s.testFindTraces)
t.Run("GetDependencies", s.testGetDependencies)
t.Run("GetThroughput", s.testGetThroughput)
t.Run("GetLatestProbability", s.testGetLatestProbability)
//t.Run("GetLatestProbability", s.testGetLatestProbability)

Check failure on line 490 in plugin/storage/integration/integration.go

View workflow job for this annotation

GitHub Actions / unit-tests

commentFormatting: put a space between `//` and comment text (gocritic)
}

0 comments on commit 2f24d91

Please sign in to comment.