diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 5126e6f265..5aa55417b0 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -1756,17 +1756,11 @@ var _ = Describe("Gateway", func() { return []byte(fmt.Sprintf(`[%s,%s]`, internalBatchPayload(), internalBatchPayload())) } + var statStore *memstats.Store // a second after receivedAt now, err := time.Parse(time.RFC3339Nano, "2024-01-01T01:01:02.000000001Z") Expect(err).To(BeNil()) - statStore, err := memstats.New( - memstats.WithNow(func() time.Time { - return now - }), - ) - Expect(err).To(BeNil()) - BeforeEach(func() { c.mockSuppressUser = mocksTypes.NewMockUserSuppression(c.mockCtrl) c.mockSuppressUserFeature = mocksApp.NewMockSuppressUserFeature(c.mockCtrl) @@ -1789,6 +1783,13 @@ var _ = Describe("Gateway", func() { internalBatchEndpoint = fmt.Sprintf("http://localhost:%d/internal/v1/batch", serverPort) GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort)) + statStore, err = memstats.New( + memstats.WithNow(func() time.Time { + return now + }), + ) + Expect(err).To(BeNil()) + gateway = &Handle{} srcDebugger = mocksrcdebugger.NewMockSourceDebugger(c.mockCtrl) err = gateway.Setup(context.Background(), conf, logger.NOP, statStore, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil) @@ -1872,12 +1873,12 @@ var _ = Describe("Gateway", func() { { Name: "gateway.write_key_requests", Tags: map[string]string{ - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", "source": "", - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", }, Value: 1, @@ -1888,10 +1889,10 @@ var _ = Describe("Gateway", func() { Name: "gateway.write_key_successful_requests", Tags: map[string]string{ "source": "", - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", }, @@ -1903,10 +1904,10 @@ var _ = Describe("Gateway", func() { Name: "gateway.write_key_failed_requests", Tags: map[string]string{ "source": "", - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", }, @@ -2007,10 +2008,10 @@ var _ = Describe("Gateway", func() { Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", "source": "", @@ -2029,16 +2030,16 @@ var _ = Describe("Gateway", func() { Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", "source": "", }) Expect(successfulReqStat).To(Not(BeNil())) - Expect(successfulReqStat.LastValue()).To(Equal(float64(3))) + Expect(successfulReqStat.LastValue()).To(Equal(float64(1))) successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{ "writeKey": WriteKeyEnabled, "reqType": "internalBatch", @@ -2049,7 +2050,7 @@ var _ = Describe("Gateway", func() { "source": "", }) Expect(successfulEventStat).To(Not(BeNil())) - Expect(successfulEventStat.LastValue()).To(Equal(float64(3))) + Expect(successfulEventStat.LastValue()).To(Equal(float64(2))) eventsStat := statStore.Get("gateway.write_key_events", map[string]string{ "writeKey": WriteKeyEnabled, "reqType": "internalBatch", @@ -2060,7 +2061,7 @@ var _ = Describe("Gateway", func() { "source": "", }) Expect(eventsStat).To(Not(BeNil())) - Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3})) + Expect(eventsStat.Values()).To(Equal([]float64{1, 2})) }) It("request failed db error", func() { @@ -2072,10 +2073,10 @@ var _ = Describe("Gateway", func() { Expect(err).To(BeNil()) Expect(http.StatusInternalServerError, resp.StatusCode) failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ - "writeKey": WriteKeyEnabled, + "writeKey": "", "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, + "workspaceId": "", + "sourceID": "", "sourceType": "", "sdkVersion": "", "source": "", @@ -2105,7 +2106,7 @@ var _ = Describe("Gateway", func() { "source": "", }) Expect(eventsStat).To(Not(BeNil())) - Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4})) + Expect(eventsStat.Values()).To(Equal([]float64{1})) }) }) diff --git a/gateway/handle.go b/gateway/handle.go index 33261ab3c7..381f462bfa 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -13,9 +13,9 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/samber/lo" "github.com/google/uuid" - "github.com/samber/lo" "github.com/tidwall/gjson" "github.com/tidwall/sjson" @@ -675,13 +675,15 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { }) if err = gw.storeJobs(ctx, jobs); err != nil { for _, jws := range jobsWithStats { - jws.stat.RequestEventsFailed(1, "storeFailed") + jws.stat.EventsFailed(1, "storeFailed") jws.stat.Report(gw.stats) } + stat.RequestFailed("storeFailed") + stat.Report(gw.stats) goto requestError } for _, jws := range jobsWithStats { - jws.stat.RequestEventsSucceeded(1) + jws.stat.EventsSuccess(1) jws.stat.Report(gw.stats) // Sending events to config backend if jws.stat.WriteKey == "" { @@ -690,6 +692,8 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { } gw.sourcehandle.RecordEvent(jws.stat.WriteKey, jws.job.EventPayload) } + stat.RequestSucceeded() + stat.Report(gw.stats) } else { stat.RequestEventsSucceeded(0) stat.Report(gw.stats) diff --git a/gateway/internal/stats/stats.go b/gateway/internal/stats/stats.go index e6b5c0f031..b3d12b3170 100644 --- a/gateway/internal/stats/stats.go +++ b/gateway/internal/stats/stats.go @@ -77,6 +77,19 @@ func (ss *SourceStat) RequestEventsFailed(num int, reason string) { ss.reason = reason } +// EventsSuccess increments the events total & succeeded counters by num +func (ss *SourceStat) EventsSuccess(num int) { + ss.events.succeeded += num + ss.events.total += num +} + +// EventsFailed increments the events total & failed counters by num +func (ss *SourceStat) EventsFailed(num int, reason string) { + ss.events.failed += num + ss.events.total += num + ss.reason = reason +} + func (ss *SourceStat) RequestEventsBot(num int) { ss.events.bot += num } @@ -97,11 +110,13 @@ func (ss *SourceStat) Report(s stats.Stats) { if ss.reason != "" { failedTags["reason"] = ss.reason } - s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total) - s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded) - s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed) - s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped) - s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed) + if ss.requests.total > 0 { + s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total) + s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded) + s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed) + s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped) + s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed) + } if ss.events.total > 0 { s.NewTaggedStat("gateway.write_key_events", stats.CountType, tags).Count(ss.events.total) s.NewTaggedStat("gateway.write_key_successful_events", stats.CountType, tags).Count(ss.events.succeeded)