[CONTINT-3920][fakeintake] Make the store more generic (#24063)
* Replace the in-memory store with an interface

* Make sure we close the store in tests and in the server at the right place

* Rename as GetInternalMetrics
AliDatadog authored and misteriaud committed Apr 2, 2024
1 parent ce63682 commit b637db3
Showing 6 changed files with 295 additions and 117 deletions.
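The diff below replaces the concrete *serverstore.Store with a serverstore.Store interface. As a purely illustrative sketch (not part of this commit), this is how a caller can now depend on the interface and on the NewStore constructor added in store.go; the recordAndCount helper is hypothetical.

package main

import (
	"fmt"
	"time"

	"github.com/DataDog/datadog-agent/test/fakeintake/server/serverstore"
)

// recordAndCount is a hypothetical helper: it relies only on the Store
// interface, so any backend satisfying it can be passed in.
func recordAndCount(store serverstore.Store, route string, data []byte) (map[string]int, error) {
	if err := store.AppendPayload(route, data, "text/plain", time.Now().UTC()); err != nil {
		return nil, err
	}
	return store.GetRouteStats(), nil
}

func main() {
	store := serverstore.NewStore() // currently returns the in-memory implementation
	defer store.Close()

	stats, err := recordAndCount(store, "/foo/bar", []byte("totoro|5|tag:before,owner:pducolin"))
	if err != nil {
		panic(err)
	}
	fmt.Println(stats) // map[/foo/bar:1]
}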
4 changes: 4 additions & 0 deletions test/fakeintake/client/client_integration_test.go
@@ -24,6 +24,7 @@ import (
func TestIntegrationClient(t *testing.T) {
t.Run("should get empty payloads from a server", func(t *testing.T) {
fi, _ := server.InitialiseForTests(t)
defer fi.Stop()

client := NewClient(fi.URL())
stats, err := client.RouteStats()
@@ -33,6 +34,7 @@ func TestIntegrationClient(t *testing.T) {

t.Run("should get all available payloads from a server on a given endpoint", func(t *testing.T) {
fi, _ := server.InitialiseForTests(t)
defer fi.Stop()

// post a test payloads to fakeintake
testEndpoint := "/foo/bar"
@@ -53,6 +55,7 @@ func TestIntegrationClient(t *testing.T) {

t.Run("should flush payloads from a server on flush request", func(t *testing.T) {
fi, _ := server.InitialiseForTests(t)
defer fi.Stop()

// post a test payloads to fakeintake
resp, err := http.Post(fmt.Sprintf("%s%s", fi.URL(), "/foo/bar"), "text/plain", strings.NewReader("totoro|5|tag:before,owner:pducolin"))
@@ -90,6 +93,7 @@ func TestIntegrationClient(t *testing.T) {

t.Run("should receive overridden response when configured on server", func(t *testing.T) {
fi, _ := server.InitialiseForTests(t)
defer fi.Stop()

client := NewClient(fi.URL())
err := client.ConfigureOverride(api.ResponseOverride{
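For reference, a minimal sketch (not part of this commit) of the test pattern used in the file above: each test now defers fi.Stop() so the server's store is closed when the test ends. The test name is invented, and the result of RouteStats is left unasserted because its concrete shape is not visible in this diff.

package client

import (
	"fmt"
	"net/http"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/DataDog/datadog-agent/test/fakeintake/server"
)

func TestPostThenQueryStats(t *testing.T) {
	fi, _ := server.InitialiseForTests(t)
	defer fi.Stop() // new in this commit: stopping the server also closes its store

	// post a test payload to the fakeintake, exactly as the tests above do
	resp, err := http.Post(fmt.Sprintf("%s%s", fi.URL(), "/foo/bar"), "text/plain", strings.NewReader("totoro|5|tag:before,owner:pducolin"))
	require.NoError(t, err)
	defer resp.Body.Close()

	// query per-route stats through the client; the concrete return shape
	// is not shown in this diff, so it is left unasserted here
	client := NewClient(fi.URL())
	stats, err := client.RouteStats()
	require.NoError(t, err)
	_ = stats
}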
24 changes: 17 additions & 7 deletions test/fakeintake/server/server.go
@@ -65,7 +65,7 @@ type Server struct {
urlMutex sync.RWMutex
url string

store *serverstore.Store
store serverstore.Store

responseOverridesMutex sync.RWMutex
responseOverridesByMethod map[string]map[string]httpResponse
@@ -87,11 +87,15 @@ func NewServer(options ...func(*Server)) *Server {

registry := prometheus.NewRegistry()

storeMetrics := fi.store.GetInternalMetrics()
registry.MustRegister(
collectors.NewBuildInfoCollector(),
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
fi.store.NbPayloads,
append(
[]prometheus.Collector{
collectors.NewBuildInfoCollector(),
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
},
storeMetrics...)...,
)

mux := http.NewServeMux()
@@ -217,6 +221,7 @@ func (fi *Server) Stop() error {
return fmt.Errorf("server not running")
}
defer close(fi.shutdown)
defer fi.store.Close()
err := fi.server.Shutdown(context.Background())
if err != nil {
return err
@@ -317,7 +322,12 @@ func (fi *Server) handleDatadogPostRequest(w http.ResponseWriter, req *http.Requ
encoding = req.Header.Get("Content-Type")
}

fi.store.AppendPayload(req.URL.Path, payload, encoding, fi.clock.Now().UTC())
err = fi.store.AppendPayload(req.URL.Path, payload, encoding, fi.clock.Now().UTC())
if err != nil {
response := buildErrorResponse(err)
writeHTTPResponse(w, response)
return nil
}

if response, ok := fi.getResponseFromURLPath(http.MethodPost, req.URL.Path); ok {
writeHTTPResponse(w, response)
@@ -361,7 +371,7 @@ func (fi *Server) handleGetPayloads(w http.ResponseWriter, req *http.Request) {
}
jsonResp, err = json.Marshal(resp)
} else if serverstore.IsRouteHandled(route) {
payloads, payloadErr := fi.store.GetJSONPayloads(route)
payloads, payloadErr := serverstore.GetJSONPayloads(fi.store, route)
if payloadErr != nil {
writeHTTPResponse(w, buildErrorResponse(payloadErr))
return
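To illustrate the registration change in NewServer above: the server no longer registers the store's NbPayloads gauge directly; it asks the Store interface for its collectors via GetInternalMetrics and appends them to the process-level ones. The sketch below is illustrative only; buildRegistry is a hypothetical helper, while the prometheus and collectors calls match those used in server.go.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"

	"github.com/DataDog/datadog-agent/test/fakeintake/server/serverstore"
)

// buildRegistry mirrors the registration pattern from NewServer: process-level
// collectors plus whatever collectors the store chooses to expose.
func buildRegistry(store serverstore.Store) *prometheus.Registry {
	registry := prometheus.NewRegistry()
	storeMetrics := store.GetInternalMetrics()
	registry.MustRegister(
		append(
			[]prometheus.Collector{
				collectors.NewBuildInfoCollector(),
				collectors.NewGoCollector(),
				collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
			},
			storeMetrics...)...,
	)
	return registry
}

func main() {
	store := serverstore.NewStore()
	defer store.Close()
	_ = buildRegistry(store) // in server.go this registry is built inside NewServer
}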
106 changes: 106 additions & 0 deletions test/fakeintake/server/serverstore/in_memory.go
@@ -0,0 +1,106 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package serverstore

import (
"log"
"sync"
"time"

"github.com/DataDog/datadog-agent/test/fakeintake/api"

"github.com/prometheus/client_golang/prometheus"
)

// inMemoryStore implements a thread-safe storage for raw and json dumped payloads
type inMemoryStore struct {
mutex sync.RWMutex

rawPayloads map[string][]api.Payload

// NbPayloads is a prometheus metric to track the number of payloads collected by route
NbPayloads *prometheus.GaugeVec
}

// newInMemoryStore initialises a new payloads store
func newInMemoryStore() *inMemoryStore {
return &inMemoryStore{
mutex: sync.RWMutex{},
rawPayloads: map[string][]api.Payload{},
NbPayloads: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "payloads",
Help: "Number of payloads collected by route",
}, []string{"route"}),
}
}

// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
func (s *inMemoryStore) AppendPayload(route string, data []byte, encoding string, collectTime time.Time) error {
s.mutex.Lock()
defer s.mutex.Unlock()
rawPayload := api.Payload{
Timestamp: collectTime,
Data: data,
Encoding: encoding,
}
s.rawPayloads[route] = append(s.rawPayloads[route], rawPayload)
s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
return nil
}

// CleanUpPayloadsOlderThan removes payloads older than time
func (s *inMemoryStore) CleanUpPayloadsOlderThan(time time.Time) {
s.mutex.Lock()
defer s.mutex.Unlock()
log.Printf("Cleaning up payloads")
// clean up raw payloads
for route, payloads := range s.rawPayloads {
lastInvalidPayloadIndex := -1
for i, payload := range payloads {
if payload.Timestamp.Before(time) {
lastInvalidPayloadIndex = i
}
}
s.rawPayloads[route] = s.rawPayloads[route][lastInvalidPayloadIndex+1:]
s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
}
}

// GetRawPayloads returns payloads collected for route `route`
func (s *inMemoryStore) GetRawPayloads(route string) (payloads []api.Payload) {
s.mutex.RLock()
defer s.mutex.RUnlock()
payloads = make([]api.Payload, len(s.rawPayloads[route]))
copy(payloads, s.rawPayloads[route])
return payloads
}

// GetRouteStats returns stats on collected raw payloads by route
func (s *inMemoryStore) GetRouteStats() map[string]int {
statsByRoute := map[string]int{}
s.mutex.RLock()
defer s.mutex.RUnlock()
for route, payloads := range s.rawPayloads {
statsByRoute[route] = len(payloads)
}
return statsByRoute
}

// Flush cleans up any stored payload
func (s *inMemoryStore) Flush() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.rawPayloads = map[string][]api.Payload{}
s.NbPayloads.Reset()
}

// GetInternalMetrics returns the prometheus metrics for the store
func (s *inMemoryStore) GetInternalMetrics() []prometheus.Collector {
return []prometheus.Collector{s.NbPayloads}
}

// Close is a noop
func (s *inMemoryStore) Close() {}
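A short usage sketch (not part of this commit) of the behaviour implemented by in_memory.go: payloads accumulate per route, CleanUpPayloadsOlderThan prunes old ones, and Flush empties the store. The route and payload bytes are made up for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/DataDog/datadog-agent/test/fakeintake/server/serverstore"
)

func main() {
	store := serverstore.NewStore() // backed by inMemoryStore
	defer store.Close()

	now := time.Now().UTC()
	_ = store.AppendPayload("/foo/bar", []byte("old payload"), "text/plain", now.Add(-2*time.Hour))
	_ = store.AppendPayload("/foo/bar", []byte("new payload"), "text/plain", now)

	// drop everything collected more than an hour ago
	store.CleanUpPayloadsOlderThan(now.Add(-time.Hour))

	for _, p := range store.GetRawPayloads("/foo/bar") {
		fmt.Printf("%s %s %s\n", p.Timestamp, p.Encoding, p.Data)
	}

	// Flush removes every stored payload and resets the payloads gauge
	store.Flush()
	fmt.Println(store.GetRouteStats()) // map[]
}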
20 changes: 20 additions & 0 deletions test/fakeintake/server/serverstore/in_memory_test.go
@@ -0,0 +1,20 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package serverstore

import (
"testing"

"github.com/stretchr/testify/suite"
)

func TestInMemoryStore(t *testing.T) {
suite.Run(t, &StoreTestSuite{
StoreConstructor: func() Store {
return newInMemoryStore()
},
})
}
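StoreTestSuite is referenced above but its definition is not part of this diff. The sketch below is only an assumption about its shape: a testify suite parameterised by a StoreConstructor hook, so that every Store implementation can run the same battery of tests.

package serverstore

import (
	"time"

	"github.com/stretchr/testify/suite"
)

// StoreTestSuite (shape assumed, not taken from this commit) exercises any
// Store implementation produced by StoreConstructor.
type StoreTestSuite struct {
	suite.Suite
	StoreConstructor func() Store
}

func (s *StoreTestSuite) TestAppendAndStats() {
	store := s.StoreConstructor()
	defer store.Close()

	err := store.AppendPayload("/foo/bar", []byte("payload"), "text/plain", time.Now().UTC())
	s.Require().NoError(err)
	s.Equal(map[string]int{"/foo/bar": 1}, store.GetRouteStats())
}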
149 changes: 39 additions & 110 deletions test/fakeintake/server/serverstore/store.go
@@ -8,131 +8,60 @@
package serverstore

import (
"errors"
"fmt"
"log"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/DataDog/datadog-agent/test/fakeintake/api"
)

// Store implements a thread-safe storage for raw and json dumped payloads
type Store struct {
mutex sync.RWMutex

rawPayloads map[string][]api.Payload

NbPayloads *prometheus.GaugeVec
}

// NewStore initialise a new payloads store
func NewStore() *Store {
return &Store{
mutex: sync.RWMutex{},
rawPayloads: map[string][]api.Payload{},
NbPayloads: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "payloads",
Help: "Number of payloads collected by route",
}, []string{"route"}),
}
}

// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
func (s *Store) AppendPayload(route string, data []byte, encoding string, collectTime time.Time) {
s.mutex.Lock()
defer s.mutex.Unlock()
rawPayload := api.Payload{
Timestamp: collectTime,
Data: data,
Encoding: encoding,
}
s.rawPayloads[route] = append(s.rawPayloads[route], rawPayload)
s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
}

// CleanUpPayloadsOlderThan removes payloads older than time
func (s *Store) CleanUpPayloadsOlderThan(time time.Time) {
s.mutex.Lock()
defer s.mutex.Unlock()
log.Printf("Cleaning up payloads")
// clean up raw payloads
for route, payloads := range s.rawPayloads {
lastInvalidPayloadIndex := -1
for i, payload := range payloads {
if payload.Timestamp.Before(time) {
lastInvalidPayloadIndex = i
}
}
s.rawPayloads[route] = s.rawPayloads[route][lastInvalidPayloadIndex+1:]
s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
}
// Store is the interface for a store that can store payloads and try parsing them
type Store interface {
// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
AppendPayload(route string, data []byte, encoding string, collectTime time.Time) error
// CleanUpPayloadsOlderThan removes payloads older than the given time
CleanUpPayloadsOlderThan(time.Time)
// GetRawPayloads returns all raw payloads for a given route
GetRawPayloads(route string) []api.Payload
// GetRouteStats returns the number of payloads for each route
GetRouteStats() map[string]int
// Flush flushes the store
Flush()
// GetInternalMetrics returns the prometheus metrics for the store
GetInternalMetrics() []prometheus.Collector
// Close closes the store
Close()
}

// GetRawPayloads returns payloads collected for route `route`
func (s *Store) GetRawPayloads(route string) (payloads []api.Payload) {
s.mutex.RLock()
defer s.mutex.RUnlock()
payloads = make([]api.Payload, len(s.rawPayloads[route]))
copy(payloads, s.rawPayloads[route])
return payloads
// NewStore returns a new store
func NewStore() Store {
return newInMemoryStore()
}

// GetJSONPayloads returns payloads collected and parsed to json for route `route`
func (s *Store) GetJSONPayloads(route string) (payloads []api.ParsedPayload, err error) {
if _, found := parserMap[route]; !found {
// Short path to returns directly if no parser is registered for the given route.
// No need to acquire the lock in that case.
return nil, fmt.Errorf("Json payload not supported for this route")
// GetJSONPayloads returns the parsed payloads for a given route
func GetJSONPayloads(store Store, route string) ([]api.ParsedPayload, error) {
parser, ok := parserMap[route]
if !ok {
return nil, fmt.Errorf("no parser for route %s", route)
}
s.mutex.RLock()
defer s.mutex.RUnlock()

payloads = make([]api.ParsedPayload, 0, len(s.rawPayloads[route]))
for _, raw := range s.rawPayloads[route] {
if jsonPayload, err := s.encodeToJSONRawPayload(raw, route); err == nil && jsonPayload != nil {
payloads = append(payloads, *jsonPayload)
}
}
return payloads, nil
}

// GetRouteStats returns stats on collectedraw payloads by route
func (s *Store) GetRouteStats() map[string]int {
statsByRoute := map[string]int{}
s.mutex.RLock()
defer s.mutex.RUnlock()
for route, payloads := range s.rawPayloads {
statsByRoute[route] = len(payloads)
}
return statsByRoute
}

// Flush cleans up any stored payload
func (s *Store) Flush() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.rawPayloads = map[string][]api.Payload{}
s.NbPayloads.Reset()
}

// encodeToJSONRawPayload used to decode a raw Payload into a Json Payload
// to know how to parse the raw payload that could be JSON or Protobuf, the function
// need to know the route.
func (s *Store) encodeToJSONRawPayload(rawPayload api.Payload, route string) (*api.ParsedPayload, error) {
if parsePayload, ok := parserMap[route]; ok {
var err error
data, err := parsePayload(rawPayload)
payloads := store.GetRawPayloads(route)
parsedPayloads := make([]api.ParsedPayload, 0, len(payloads))
var errs []error
for _, payload := range payloads {
parsedPayload, err := parser(payload)
if err != nil {
return nil, err
}
parsedPayload := &api.ParsedPayload{
Timestamp: rawPayload.Timestamp,
Data: data,
Encoding: rawPayload.Encoding,
log.Printf("failed to parse payload %+v: %v\n", payload, err)
errs = append(errs, err)
continue
}
return parsedPayload, nil
parsedPayloads = append(parsedPayloads, api.ParsedPayload{
Timestamp: payload.Timestamp,
Data: parsedPayload,
Encoding: payload.Encoding,
})
}
return nil, nil
return parsedPayloads, errors.Join(errs...)
}
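Finally, a hedged sketch (not part of this commit) of calling the new package-level GetJSONPayloads helper, which now takes any Store plus a route instead of being a method on the concrete struct. Whether a parser exists for the chosen route depends on parserMap, which is outside this diff, so the error path is handled explicitly.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/DataDog/datadog-agent/test/fakeintake/server/serverstore"
)

func main() {
	store := serverstore.NewStore()
	defer store.Close()

	route := "/foo/bar" // illustrative; a parser may or may not be registered for it
	_ = store.AppendPayload(route, []byte(`{"hello":"world"}`), "application/json", time.Now().UTC())

	parsed, err := serverstore.GetJSONPayloads(store, route)
	if err != nil {
		// err is either "no parser for route ..." or per-payload parse
		// failures aggregated with errors.Join
		log.Printf("could not parse payloads for %s: %v", route, err)
	}
	for _, p := range parsed {
		fmt.Printf("%s %s %+v\n", p.Timestamp, p.Encoding, p.Data)
	}
}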