Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: sync release v1.42.3 to main branch #5492

Merged
merged 9 commits into from
Feb 10, 2025
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Changelog

## [1.42.3](https://github.com/rudderlabs/rudder-server/compare/v1.42.2...v1.42.3) (2025-02-10)


### Bug Fixes

* async framework destination live events ([#5480](https://github.com/rudderlabs/rudder-server/issues/5480)) ([40d9c7d](https://github.com/rudderlabs/rudder-server/commit/40d9c7d449ceae6c9393cefbf49993a3207f6cb6))


### Miscellaneous

* fix gateway metrics ([#5483](https://github.com/rudderlabs/rudder-server/issues/5483)) ([77b7d02](https://github.com/rudderlabs/rudder-server/commit/77b7d025fa9a4d22fbad5851b6efd14a9e973d5d))
* make cslb configurable ([#5451](https://github.com/rudderlabs/rudder-server/issues/5451)) ([54820a4](https://github.com/rudderlabs/rudder-server/commit/54820a4086f63a53e899f6950689087c323866c0))

## [1.42.2](https://github.com/rudderlabs/rudder-server/compare/v1.42.1...v1.42.2) (2025-02-06)


Expand Down
59 changes: 30 additions & 29 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,17 +1756,11 @@ var _ = Describe("Gateway", func() {
return []byte(fmt.Sprintf(`[%s,%s]`, internalBatchPayload(), internalBatchPayload()))
}

var statStore *memstats.Store
// a second after receivedAt
now, err := time.Parse(time.RFC3339Nano, "2024-01-01T01:01:02.000000001Z")
Expect(err).To(BeNil())

statStore, err := memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)
Expect(err).To(BeNil())

BeforeEach(func() {
c.mockSuppressUser = mocksTypes.NewMockUserSuppression(c.mockCtrl)
c.mockSuppressUserFeature = mocksApp.NewMockSuppressUserFeature(c.mockCtrl)
Expand All @@ -1789,6 +1783,13 @@ var _ = Describe("Gateway", func() {
internalBatchEndpoint = fmt.Sprintf("http://localhost:%d/internal/v1/batch", serverPort)
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

statStore, err = memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)
Expect(err).To(BeNil())

gateway = &Handle{}
srcDebugger = mocksrcdebugger.NewMockSourceDebugger(c.mockCtrl)
err = gateway.Setup(context.Background(), conf, logger.NOP, statStore, c.mockApp, c.mockBackendConfig, c.mockJobsDB, c.mockErrJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), transformer.NewNoOpService(), srcDebugger, nil)
Expand Down Expand Up @@ -1872,12 +1873,12 @@ var _ = Describe("Gateway", func() {
{
Name: "gateway.write_key_requests",
Tags: map[string]string{
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
},
Value: 1,
Expand All @@ -1888,10 +1889,10 @@ var _ = Describe("Gateway", func() {
Name: "gateway.write_key_successful_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
},
Expand All @@ -1903,10 +1904,10 @@ var _ = Describe("Gateway", func() {
Name: "gateway.write_key_failed_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
},
Expand Down Expand Up @@ -2007,10 +2008,10 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
Expand All @@ -2029,16 +2030,16 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(successfulReqStat).To(Not(BeNil()))
Expect(successfulReqStat.LastValue()).To(Equal(float64(3)))
Expect(successfulReqStat.LastValue()).To(Equal(float64(1)))
successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
Expand All @@ -2049,7 +2050,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(successfulEventStat).To(Not(BeNil()))
Expect(successfulEventStat.LastValue()).To(Equal(float64(3)))
Expect(successfulEventStat.LastValue()).To(Equal(float64(2)))
eventsStat := statStore.Get("gateway.write_key_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
Expand All @@ -2060,7 +2061,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3}))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2}))
})

It("request failed db error", func() {
Expand All @@ -2072,10 +2073,10 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())
Expect(http.StatusInternalServerError, resp.StatusCode)
failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"writeKey": "",
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
Expand Down Expand Up @@ -2105,7 +2106,7 @@ var _ = Describe("Gateway", func() {
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4}))
Expect(eventsStat.Values()).To(Equal([]float64{1}))
})
})

Expand Down
10 changes: 7 additions & 3 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"

"github.com/google/uuid"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

Expand Down Expand Up @@ -675,13 +675,15 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
})
if err = gw.storeJobs(ctx, jobs); err != nil {
for _, jws := range jobsWithStats {
jws.stat.RequestEventsFailed(1, "storeFailed")
jws.stat.EventsFailed(1, "storeFailed")
jws.stat.Report(gw.stats)
}
stat.RequestFailed("storeFailed")
stat.Report(gw.stats)
goto requestError
}
for _, jws := range jobsWithStats {
jws.stat.RequestEventsSucceeded(1)
jws.stat.EventsSuccess(1)
jws.stat.Report(gw.stats)
// Sending events to config backend
if jws.stat.WriteKey == "" {
Expand All @@ -690,6 +692,8 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
}
gw.sourcehandle.RecordEvent(jws.stat.WriteKey, jws.job.EventPayload)
}
stat.RequestSucceeded()
stat.Report(gw.stats)
} else {
stat.RequestEventsSucceeded(0)
stat.Report(gw.stats)
Expand Down
25 changes: 20 additions & 5 deletions gateway/internal/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ func (ss *SourceStat) RequestEventsFailed(num int, reason string) {
ss.reason = reason
}

// EventsSuccess increments the events total & succeeded counters by num
func (ss *SourceStat) EventsSuccess(num int) {
ss.events.succeeded += num
ss.events.total += num
}

// EventsFailed increments the events total & failed counters by num
func (ss *SourceStat) EventsFailed(num int, reason string) {
ss.events.failed += num
ss.events.total += num
ss.reason = reason
}

func (ss *SourceStat) RequestEventsBot(num int) {
ss.events.bot += num
}
Expand All @@ -97,11 +110,13 @@ func (ss *SourceStat) Report(s stats.Stats) {
if ss.reason != "" {
failedTags["reason"] = ss.reason
}
s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total)
s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded)
s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed)
s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped)
s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed)
if ss.requests.total > 0 {
s.NewTaggedStat("gateway.write_key_requests", stats.CountType, tags).Count(ss.requests.total)
s.NewTaggedStat("gateway.write_key_successful_requests", stats.CountType, tags).Count(ss.requests.succeeded)
s.NewTaggedStat("gateway.write_key_failed_requests", stats.CountType, failedTags).Count(ss.requests.failed)
s.NewTaggedStat("gateway.write_key_dropped_requests", stats.CountType, tags).Count(ss.requests.dropped)
s.NewTaggedStat("gateway.write_key_suppressed_requests", stats.CountType, tags).Count(ss.requests.suppressed)
}
if ss.events.total > 0 {
s.NewTaggedStat("gateway.write_key_events", stats.CountType, tags).Count(ss.events.total)
s.NewTaggedStat("gateway.write_key_successful_events", stats.CountType, tags).Count(ss.events.succeeded)
Expand Down
117 changes: 117 additions & 0 deletions internal/transformer-client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package transformerclient

import (
"context"
"net"
"net/http"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/conn"
"github.com/bufbuild/httplb/picker"
"github.com/bufbuild/httplb/resolver"

"github.com/rudderlabs/rudder-server/utils/sysUtils"
)

type ClientConfig struct {
TransportConfig struct {
DisableKeepAlives bool // true
MaxConnsPerHost int // 100
MaxIdleConnsPerHost int // 10
IdleConnTimeout time.Duration // 30*time.Second
}

ClientTimeout time.Duration // 600*time.Second
ClientTTL time.Duration // 10*time.Second

ClientType string // stdlib(default), recycled, httplb

PickerType string // power_of_two(default), round_robin, least_loaded_random, least_loaded_round_robin, random
}

type Client interface {
Do(req *http.Request) (*http.Response, error)
}

func NewClient(config *ClientConfig) Client {
transport := &http.Transport{
DisableKeepAlives: true,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 600 * time.Second,
}
if config == nil {
return client
}

Check warning on line 50 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L49-L50

Added lines #L49 - L50 were not covered by tests

transport.DisableKeepAlives = config.TransportConfig.DisableKeepAlives
if config.TransportConfig.MaxConnsPerHost != 0 {
transport.MaxConnsPerHost = config.TransportConfig.MaxConnsPerHost
}
if config.TransportConfig.MaxIdleConnsPerHost != 0 {
transport.MaxIdleConnsPerHost = config.TransportConfig.MaxIdleConnsPerHost
}
if config.TransportConfig.IdleConnTimeout != 0 {
transport.IdleConnTimeout = config.TransportConfig.IdleConnTimeout
}

if config.ClientTimeout != 0 {
client.Timeout = config.ClientTimeout
}

clientTTL := 10 * time.Second
if config.ClientTTL != 0 {
clientTTL = config.ClientTTL
}

switch config.ClientType {
case "stdlib":
return client
case "recycled":
return sysUtils.NewRecycledHTTPClient(func() *http.Client {
return client
}, clientTTL)
case "httplb":
return httplb.NewClient(
httplb.WithRootContext(context.TODO()),
httplb.WithPicker(getPicker(config.PickerType)),
httplb.WithIdleConnectionTimeout(transport.IdleConnTimeout),
httplb.WithRequestTimeout(client.Timeout),
httplb.WithRoundTripperMaxLifetime(transport.IdleConnTimeout),
httplb.WithIdleTransportTimeout(2*transport.IdleConnTimeout),
httplb.WithResolver(resolver.NewDNSResolver(net.DefaultResolver, resolver.PreferIPv4, clientTTL)),
)
default:
return client

Check warning on line 90 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L75-L90

Added lines #L75 - L90 were not covered by tests
}
}

func getPicker(pickerType string) func(prev picker.Picker, allConns conn.Conns) picker.Picker {
switch pickerType {
case "power_of_two":
return picker.NewPowerOfTwo
case "round_robin":
return picker.NewRoundRobin
case "least_loaded_random":
return picker.NewLeastLoadedRandom
case "least_loaded_round_robin":
return picker.NewLeastLoadedRoundRobin
case "random":
return picker.NewRandom
default:
return picker.NewPowerOfTwo

Check warning on line 107 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L94-L107

Added lines #L94 - L107 were not covered by tests
}
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}

Check warning on line 116 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}
Loading
Loading