Skip to content

Commit

Permalink
chore: fix gateway metrics (#5483)
Browse files Browse the repository at this point in the history
* chore: fix gateway metrics

* fix tests
  • Loading branch information
mihir20 authored Feb 10, 2025
1 parent 40d9c7d commit 77b7d02
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 37 deletions.
59 changes: 30 additions & 29 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,17 +1756,11 @@ var _ = Describe("Gateway", func() {
return []byte(fmt.Sprintf(`[%s,%s]`, internalBatchPayload(), internalBatchPayload()))
}

var statStore *memstats.Store
// a second after receivedAt
now, err := time.Parse(time.RFC3339Nano, "2024-01-01T01:01:02.000000001Z")
Expect(err).To(BeNil())

statStore, err := memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)
Expect(err).To(BeNil())

BeforeEach(func() {
c.mockSuppressUser = mocksTypes.NewMockUserSuppression(c.mockCtrl)
c.mockSuppressUserFeature = mocksApp.NewMockSuppressUserFeature(c.mockCtrl)
Expand All @@ -1789,6 +1783,13 @@ var _ = Describe("Gateway", func() {
internalBatchEndpoint = fmt.Sprintf("http://localhost:%d/internal/v1/batch", serverPort)
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

statStore, err = memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)
Expect(err).To(BeNil())

gateway = &Handle{}
srcDebugger = mocksrcdebugger.NewMockSourceDebugger(c.mockCtrl)
err = gateway.Setup(context.Background(), conf, logger.NOP, statStore, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil)
Expand Down Expand Up @@ -1872,12 +1873,12 @@ var _ = Describe("Gateway", func() {
{
Name: "gateway.write_key_requests",
Tags: map[string]string{
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
},
Value: 1,
Expand All @@ -1888,10 +1889,10 @@ var _ = Describe("Gateway", func() {
Name: "gateway.write_key_successful_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
},
Expand All @@ -1903,10 +1904,10 @@ var _ = Describe("Gateway", func() {
Name: "gateway.write_key_failed_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
},
Expand Down Expand Up @@ -2007,10 +2008,10 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
Expand All @@ -2029,16 +2030,16 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(successfulReqStat).To(Not(BeNil()))
Expect(successfulReqStat.LastValue()).To(Equal(float64(3)))
Expect(successfulReqStat.LastValue()).To(Equal(float64(1)))
successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
Expand All @@ -2049,7 +2050,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(successfulEventStat).To(Not(BeNil()))
Expect(successfulEventStat.LastValue()).To(Equal(float64(3)))
Expect(successfulEventStat.LastValue()).To(Equal(float64(2)))
eventsStat := statStore.Get("gateway.write_key_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
Expand All @@ -2060,7 +2061,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3}))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2}))
})

It("request failed db error", func() {
Expand All @@ -2072,10 +2073,10 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusInternalServerError, resp.StatusCode)
failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
Expand Down Expand Up @@ -2105,7 +2106,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4}))
Expect(eventsStat.Values()).To(Equal([]float64{1}))
})
})

Expand Down
10 changes: 7 additions & 3 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"

"github.com/google/uuid"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

Expand Down Expand Up @@ -675,13 +675,15 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
})
if err = gw.storeJobs(ctx, jobs); err != nil {
for _, jws := range jobsWithStats {
jws.stat.RequestEventsFailed(1, "storeFailed")
jws.stat.EventsFailed(1, "storeFailed")
jws.stat.Report(gw.stats)
}
stat.RequestFailed("storeFailed")
stat.Report(gw.stats)
goto requestError
}
for _, jws := range jobsWithStats {
jws.stat.RequestEventsSucceeded(1)
jws.stat.EventsSuccess(1)
jws.stat.Report(gw.stats)
// Sending events to config backend
if jws.stat.WriteKey == "" {
Expand All @@ -690,6 +692,8 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
}
gw.sourcehandle.RecordEvent(jws.stat.WriteKey, jws.job.EventPayload)
}
stat.RequestSucceeded()
stat.Report(gw.stats)
} else {
stat.RequestEventsSucceeded(0)
stat.Report(gw.stats)
Expand Down
25 changes: 20 additions & 5 deletions gateway/internal/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ func (ss *SourceStat) RequestEventsFailed(num int, reason string) {
ss.reason = reason
}

// EventsSuccess increments the events total & succeeded counters by num
func (ss *SourceStat) EventsSuccess(num int) {
ss.events.succeeded += num
ss.events.total += num
}

// EventsFailed increments the events total & failed counters by num
func (ss *SourceStat) EventsFailed(num int, reason string) {
ss.events.failed += num
ss.events.total += num
ss.reason = reason
}

func (ss *SourceStat) RequestEventsBot(num int) {
ss.events.bot += num
}
Expand All @@ -97,11 +110,13 @@ func (ss *SourceStat) Report(s stats.Stats) {
if ss.reason != "" {
failedTags["reason"] = ss.reason
}
s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total)
s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded)
s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed)
s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped)
s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed)
if ss.requests.total > 0 {
s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total)
s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded)
s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed)
s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped)
s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed)
}
if ss.events.total > 0 {
s.NewTaggedStat("gateway.write_key_events", stats.CountType, tags).Count(ss.events.total)
s.NewTaggedStat("gateway.write_key_successful_events", stats.CountType, tags).Count(ss.events.succeeded)
Expand Down

0 comments on commit 77b7d02

Please sign in to comment.