diff --git a/test/fakeintake/client/client_integration_test.go b/test/fakeintake/client/client_integration_test.go
index e35be678618b5..5c97109aa3589 100644
--- a/test/fakeintake/client/client_integration_test.go
+++ b/test/fakeintake/client/client_integration_test.go
@@ -24,6 +24,7 @@ import (
 func TestIntegrationClient(t *testing.T) {
 	t.Run("should get empty payloads from a server", func(t *testing.T) {
 		fi, _ := server.InitialiseForTests(t)
+		defer fi.Stop()
 		client := NewClient(fi.URL())
 
 		stats, err := client.RouteStats()
@@ -33,6 +34,7 @@ func TestIntegrationClient(t *testing.T) {
 
 	t.Run("should get all available payloads from a server on a given endpoint", func(t *testing.T) {
 		fi, _ := server.InitialiseForTests(t)
+		defer fi.Stop()
 
 		// post a test payloads to fakeintake
 		testEndpoint := "/foo/bar"
@@ -53,6 +55,7 @@ func TestIntegrationClient(t *testing.T) {
 
 	t.Run("should flush payloads from a server on flush request", func(t *testing.T) {
 		fi, _ := server.InitialiseForTests(t)
+		defer fi.Stop()
 
 		// post a test payloads to fakeintake
 		resp, err := http.Post(fmt.Sprintf("%s%s", fi.URL(), "/foo/bar"), "text/plain", strings.NewReader("totoro|5|tag:before,owner:pducolin"))
@@ -90,6 +93,7 @@ func TestIntegrationClient(t *testing.T) {
 
 	t.Run("should receive overridden response when configured on server", func(t *testing.T) {
 		fi, _ := server.InitialiseForTests(t)
+		defer fi.Stop()
 		client := NewClient(fi.URL())
 
 		err := client.ConfigureOverride(api.ResponseOverride{
diff --git a/test/fakeintake/server/server.go b/test/fakeintake/server/server.go
index 7b3ecd2cdabbd..0d99161cf1ab4 100644
--- a/test/fakeintake/server/server.go
+++ b/test/fakeintake/server/server.go
@@ -65,7 +65,7 @@ type Server struct {
 	urlMutex sync.RWMutex
 	url      string
 
-	store *serverstore.Store
+	store serverstore.Store
 
 	responseOverridesMutex    sync.RWMutex
 	responseOverridesByMethod map[string]map[string]httpResponse
@@ -87,11 +87,15 @@ func NewServer(options ...func(*Server)) *Server {
 
 	registry := prometheus.NewRegistry()
 
+	storeMetrics := fi.store.GetInternalMetrics()
 	registry.MustRegister(
-		collectors.NewBuildInfoCollector(),
-		collectors.NewGoCollector(),
-		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
-		fi.store.NbPayloads,
+		append(
+			[]prometheus.Collector{
+				collectors.NewBuildInfoCollector(),
+				collectors.NewGoCollector(),
+				collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
+			},
+			storeMetrics...)...,
 	)
 
 	mux := http.NewServeMux()
@@ -217,6 +221,7 @@ func (fi *Server) Stop() error {
 		return fmt.Errorf("server not running")
 	}
 	defer close(fi.shutdown)
+	defer fi.store.Close()
 	err := fi.server.Shutdown(context.Background())
 	if err != nil {
 		return err
@@ -317,7 +322,12 @@ func (fi *Server) handleDatadogPostRequest(w http.ResponseWriter, req *http.Request) error {
 		encoding = req.Header.Get("Content-Type")
 	}
 
-	fi.store.AppendPayload(req.URL.Path, payload, encoding, fi.clock.Now().UTC())
+	err = fi.store.AppendPayload(req.URL.Path, payload, encoding, fi.clock.Now().UTC())
+	if err != nil {
+		response := buildErrorResponse(err)
+		writeHTTPResponse(w, response)
+		return nil
+	}
 
 	if response, ok := fi.getResponseFromURLPath(http.MethodPost, req.URL.Path); ok {
 		writeHTTPResponse(w, response)
@@ -361,7 +371,7 @@ func (fi *Server) handleGetPayloads(w http.ResponseWriter, req *http.Request) {
 		}
 		jsonResp, err = json.Marshal(resp)
 	} else if serverstore.IsRouteHandled(route) {
-		payloads, payloadErr := fi.store.GetJSONPayloads(route)
+		payloads, payloadErr := serverstore.GetJSONPayloads(fi.store, route)
 		if payloadErr != nil {
 			writeHTTPResponse(w, buildErrorResponse(payloadErr))
 			return
diff --git a/test/fakeintake/server/serverstore/in_memory.go b/test/fakeintake/server/serverstore/in_memory.go
new file mode 100644
index 0000000000000..18a21be840f97
--- /dev/null
+++ b/test/fakeintake/server/serverstore/in_memory.go
@@ -0,0 +1,106 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+
+package serverstore
+
+import (
+	"log"
+	"sync"
+	"time"
+
+	"github.com/DataDog/datadog-agent/test/fakeintake/api"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// inMemoryStore implements a thread-safe storage for raw and json dumped payloads
+type inMemoryStore struct {
+	mutex sync.RWMutex
+
+	rawPayloads map[string][]api.Payload
+
+	// NbPayloads is a prometheus metric to track the number of payloads collected by route
+	NbPayloads *prometheus.GaugeVec
+}
+
+// newInMemoryStore initialises a new payload store
+func newInMemoryStore() *inMemoryStore {
+	return &inMemoryStore{
+		mutex:       sync.RWMutex{},
+		rawPayloads: map[string][]api.Payload{},
+		NbPayloads: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "payloads",
+			Help: "Number of payloads collected by route",
+		}, []string{"route"}),
+	}
+}
+
+// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
+func (s *inMemoryStore) AppendPayload(route string, data []byte, encoding string, collectTime time.Time) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	rawPayload := api.Payload{
+		Timestamp: collectTime,
+		Data:      data,
+		Encoding:  encoding,
+	}
+	s.rawPayloads[route] = append(s.rawPayloads[route], rawPayload)
+	s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
+	return nil
+}
+
+// CleanUpPayloadsOlderThan removes payloads older than time
+func (s *inMemoryStore) CleanUpPayloadsOlderThan(time time.Time) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	log.Printf("Cleaning up payloads")
+	// clean up raw payloads
+	for route, payloads := range s.rawPayloads {
+		lastInvalidPayloadIndex := -1
+		for i, payload := range payloads {
+			if payload.Timestamp.Before(time) {
+				lastInvalidPayloadIndex = i
+			}
+		}
+		s.rawPayloads[route] = s.rawPayloads[route][lastInvalidPayloadIndex+1:]
+		s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
+	}
+}
+
+// GetRawPayloads returns payloads collected for route `route`
+func (s *inMemoryStore) GetRawPayloads(route string) (payloads []api.Payload) {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+	payloads = make([]api.Payload, len(s.rawPayloads[route]))
+	copy(payloads, s.rawPayloads[route])
+	return payloads
+}
+
+// GetRouteStats returns stats on collected raw payloads by route
+func (s *inMemoryStore) GetRouteStats() map[string]int {
+	statsByRoute := map[string]int{}
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+	for route, payloads := range s.rawPayloads {
+		statsByRoute[route] = len(payloads)
+	}
+	return statsByRoute
+}
+
+// Flush cleans up any stored payload
+func (s *inMemoryStore) Flush() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.rawPayloads = map[string][]api.Payload{}
+	s.NbPayloads.Reset()
+}
+
+// GetInternalMetrics returns the prometheus metrics for the store
+func (s *inMemoryStore) GetInternalMetrics() []prometheus.Collector {
+	return []prometheus.Collector{s.NbPayloads}
+}
+
+// Close is a noop
+func (s *inMemoryStore) Close() {}
diff --git a/test/fakeintake/server/serverstore/in_memory_test.go b/test/fakeintake/server/serverstore/in_memory_test.go
new file mode 100644
index 0000000000000..04299575209b0
--- /dev/null
+++ b/test/fakeintake/server/serverstore/in_memory_test.go
@@ -0,0 +1,20 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+
+package serverstore
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+func TestInMemoryStore(t *testing.T) {
+	suite.Run(t, &StoreTestSuite{
+		StoreConstructor: func() Store {
+			return newInMemoryStore()
+		},
+	})
+}
diff --git a/test/fakeintake/server/serverstore/store.go b/test/fakeintake/server/serverstore/store.go
index 8fe60ceea86e7..01f5b9fdb08dd 100644
--- a/test/fakeintake/server/serverstore/store.go
+++ b/test/fakeintake/server/serverstore/store.go
@@ -8,9 +8,9 @@ package serverstore
 
 import (
+	"errors"
 	"fmt"
 	"log"
-	"sync"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 
@@ -18,121 +18,50 @@ import (
 	"github.com/DataDog/datadog-agent/test/fakeintake/api"
 )
 
-// Store implements a thread-safe storage for raw and json dumped payloads
-type Store struct {
-	mutex sync.RWMutex
-
-	rawPayloads map[string][]api.Payload
-
-	NbPayloads *prometheus.GaugeVec
-}
-
-// NewStore initialise a new payloads store
-func NewStore() *Store {
-	return &Store{
-		mutex:       sync.RWMutex{},
-		rawPayloads: map[string][]api.Payload{},
-		NbPayloads: prometheus.NewGaugeVec(prometheus.GaugeOpts{
-			Name: "payloads",
-			Help: "Number of payloads collected by route",
-		}, []string{"route"}),
-	}
-}
-
-// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
-func (s *Store) AppendPayload(route string, data []byte, encoding string, collectTime time.Time) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	rawPayload := api.Payload{
-		Timestamp: collectTime,
-		Data:      data,
-		Encoding:  encoding,
-	}
-	s.rawPayloads[route] = append(s.rawPayloads[route], rawPayload)
-	s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
-}
-
-// CleanUpPayloadsOlderThan removes payloads older than time
-func (s *Store) CleanUpPayloadsOlderThan(time time.Time) {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	log.Printf("Cleaning up payloads")
-	// clean up raw payloads
-	for route, payloads := range s.rawPayloads {
-		lastInvalidPayloadIndex := -1
-		for i, payload := range payloads {
-			if payload.Timestamp.Before(time) {
-				lastInvalidPayloadIndex = i
-			}
-		}
-		s.rawPayloads[route] = s.rawPayloads[route][lastInvalidPayloadIndex+1:]
-		s.NbPayloads.WithLabelValues(route).Set(float64(len(s.rawPayloads[route])))
-	}
+// Store is the interface for a store that can store payloads and try parsing them
+type Store interface {
+	// AppendPayload adds a payload to the store and tries parsing and adding a dumped json to the parsed store
+	AppendPayload(route string, data []byte, encoding string, collectTime time.Time) error
+	// CleanUpPayloadsOlderThan removes payloads older than the given time
+	CleanUpPayloadsOlderThan(time.Time)
+	// GetRawPayloads returns all raw payloads for a given route
+	GetRawPayloads(route string) []api.Payload
+	// GetRouteStats returns the number of payloads for each route
+	GetRouteStats() map[string]int
+	// Flush flushes the store
+	Flush()
+	// GetInternalMetrics returns the prometheus metrics for the store
+	GetInternalMetrics() []prometheus.Collector
+	// Close closes the store
+	Close()
 }
 
-// GetRawPayloads returns payloads collected for route `route`
-func (s *Store) GetRawPayloads(route string) (payloads []api.Payload) {
-	s.mutex.RLock()
-	defer s.mutex.RUnlock()
-	payloads = make([]api.Payload, len(s.rawPayloads[route]))
-	copy(payloads, s.rawPayloads[route])
-	return payloads
+// NewStore returns a new store
+func NewStore() Store {
+	return newInMemoryStore()
 }
 
-// GetJSONPayloads returns payloads collected and parsed to json for route `route`
-func (s *Store) GetJSONPayloads(route string) (payloads []api.ParsedPayload, err error) {
-	if _, found := parserMap[route]; !found {
-		// Short path to returns directly if no parser is registered for the given route.
-		// No need to acquire the lock in that case.
-		return nil, fmt.Errorf("Json payload not supported for this route")
+// GetJSONPayloads returns the parsed payloads for a given route
+func GetJSONPayloads(store Store, route string) ([]api.ParsedPayload, error) {
+	parser, ok := parserMap[route]
+	if !ok {
+		return nil, fmt.Errorf("no parser for route %s", route)
 	}
-	s.mutex.RLock()
-	defer s.mutex.RUnlock()
-
-	payloads = make([]api.ParsedPayload, 0, len(s.rawPayloads[route]))
-	for _, raw := range s.rawPayloads[route] {
-		if jsonPayload, err := s.encodeToJSONRawPayload(raw, route); err == nil && jsonPayload != nil {
-			payloads = append(payloads, *jsonPayload)
-		}
-	}
-	return payloads, nil
-}
-
-// GetRouteStats returns stats on collectedraw payloads by route
-func (s *Store) GetRouteStats() map[string]int {
-	statsByRoute := map[string]int{}
-	s.mutex.RLock()
-	defer s.mutex.RUnlock()
-	for route, payloads := range s.rawPayloads {
-		statsByRoute[route] = len(payloads)
-	}
-	return statsByRoute
-}
-
-// Flush cleans up any stored payload
-func (s *Store) Flush() {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	s.rawPayloads = map[string][]api.Payload{}
-	s.NbPayloads.Reset()
-}
-
-// encodeToJSONRawPayload used to decode a raw Payload into a Json Payload
-// to know how to parse the raw payload that could be JSON or Protobuf, the function
-// need to know the route.
-func (s *Store) encodeToJSONRawPayload(rawPayload api.Payload, route string) (*api.ParsedPayload, error) {
-	if parsePayload, ok := parserMap[route]; ok {
-		var err error
-		data, err := parsePayload(rawPayload)
+	payloads := store.GetRawPayloads(route)
+	parsedPayloads := make([]api.ParsedPayload, 0, len(payloads))
+	var errs []error
+	for _, payload := range payloads {
+		parsedPayload, err := parser(payload)
 		if err != nil {
-			return nil, err
-		}
-		parsedPayload := &api.ParsedPayload{
-			Timestamp: rawPayload.Timestamp,
-			Data:      data,
-			Encoding:  rawPayload.Encoding,
+			log.Printf("failed to parse payload %+v: %v\n", payload, err)
+			errs = append(errs, err)
+			continue
 		}
-		return parsedPayload, nil
+		parsedPayloads = append(parsedPayloads, api.ParsedPayload{
+			Timestamp: payload.Timestamp,
+			Data:      parsedPayload,
+			Encoding:  payload.Encoding,
+		})
 	}
-	return nil, nil
+	return parsedPayloads, errors.Join(errs...)
 }
diff --git a/test/fakeintake/server/serverstore/store_test.go b/test/fakeintake/server/serverstore/store_test.go
new file mode 100644
index 0000000000000..4dfcca7edff2a
--- /dev/null
+++ b/test/fakeintake/server/serverstore/store_test.go
@@ -0,0 +1,109 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+
+package serverstore
+
+import (
+	"encoding/json"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/DataDog/datadog-agent/test/fakeintake/api"
+)
+
+type StoreTestSuite struct {
+	suite.Suite
+	StoreConstructor func() Store
+}
+
+func jsonParser(p api.Payload) (interface{}, error) {
+	var result map[string]interface{}
+	err := json.Unmarshal(p.Data, &result)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+func (suite *StoreTestSuite) TestAppendPayload() {
+	store := suite.StoreConstructor()
+	defer store.Close()
+
+	data := []byte(`{"key":"value"}`)
+	parserMap["testRoute"] = jsonParser
+	err := store.AppendPayload("testRoute", data, "json", time.Now())
+	assert.NoError(suite.T(), err)
+
+	rawPayloads := store.GetRawPayloads("testRoute")
+	assert.Len(suite.T(), rawPayloads, 1)
+	assert.Equal(suite.T(), data, rawPayloads[0].Data)
+
+	jsonPayloads, err := GetJSONPayloads(store, "testRoute")
+	require.NoError(suite.T(), err)
+	assert.Len(suite.T(), jsonPayloads, 1)
+	assert.Equal(suite.T(), map[string]interface{}{"key": "value"}, jsonPayloads[0].Data)
+}
+
+func (suite *StoreTestSuite) TestCleanUpPayloadsOlderThan() {
+	store := suite.StoreConstructor()
+	defer store.Close()
+
+	now := time.Now()
+
+	// Add an old payload expected to be cleaned up first
+	err := store.AppendPayload("testRoute", []byte("{}"), "json", now.Add(-48*time.Hour))
+	require.NoError(suite.T(), err)
+
+	err = store.AppendPayload("testRoute", []byte("{}"), "json", now)
+	require.NoError(suite.T(), err)
+
+	rawPayloads := store.GetRawPayloads("testRoute")
+	assert.Len(suite.T(), rawPayloads, 2)
+
+	store.CleanUpPayloadsOlderThan(now.Add(-24 * time.Hour))
+
+	rawPayloads = store.GetRawPayloads("testRoute")
+	assert.Len(suite.T(), rawPayloads, 1)
+
+	jsonPayloads, err := GetJSONPayloads(store, "testRoute")
+	require.NoError(suite.T(), err)
+	assert.Len(suite.T(), jsonPayloads, 1)
+}
+
+func (suite *StoreTestSuite) TestGetRouteStats() {
+	store := suite.StoreConstructor()
+	defer store.Close()
+
+	err := store.AppendPayload("routeA", []byte("{}"), "json", time.Now())
+	require.NoError(suite.T(), err)
+
+	err = store.AppendPayload("routeB", []byte("{}"), "json", time.Now())
+	require.NoError(suite.T(), err)
+
+	stats := store.GetRouteStats()
+
+	assert.Equal(suite.T(), 1, stats["routeA"])
+	assert.Equal(suite.T(), 1, stats["routeB"])
+}
+
+func (suite *StoreTestSuite) TestFlush() {
+	store := suite.StoreConstructor()
+	defer store.Close()
+
+	err := store.AppendPayload("testRoute", []byte("{}"), "json", time.Now())
+	require.NoError(suite.T(), err)
+
+	store.Flush()
+
+	rawPayloads := store.GetRawPayloads("testRoute")
+	assert.Len(suite.T(), rawPayloads, 0)
+
+	jsonPayloads, err := GetJSONPayloads(store, "testRoute")
+	assert.NoError(suite.T(), err)
+	assert.Len(suite.T(), jsonPayloads, 0)
+}
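Note: the shared StoreTestSuite above is what allows any future Store backend to reuse the same coverage, as in_memory_test.go does for the in-memory implementation. A minimal sketch of how a hypothetical additional backend would hook into it follows; newSQLStore and TestSQLStore are assumptions for illustration and are not part of this change.

// store_sql_test.go (hypothetical, for illustration only)
package serverstore

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

// TestSQLStore would run the shared StoreTestSuite against an assumed
// newSQLStore() constructor that satisfies the Store interface.
func TestSQLStore(t *testing.T) {
	suite.Run(t, &StoreTestSuite{
		StoreConstructor: func() Store {
			return newSQLStore() // assumed constructor, not defined in this diff
		},
	})
}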