Skip to content

Commit

Permalink
chore: make cslb configurable (#5451)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Feb 10, 2025
1 parent 77b7d02 commit 54820a4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 57 deletions.
117 changes: 117 additions & 0 deletions internal/transformer-client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package transformerclient

import (
"context"
"net"
"net/http"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/conn"
"github.com/bufbuild/httplb/picker"
"github.com/bufbuild/httplb/resolver"

"github.com/rudderlabs/rudder-server/utils/sysUtils"
)

// ClientConfig carries the tunables consumed by NewClient.
//
// For every numeric and duration field a zero value means "keep NewClient's
// built-in default"; the default is noted next to each field.
type ClientConfig struct {
// TransportConfig holds the settings copied onto the underlying http.Transport.
TransportConfig struct {
// DisableKeepAlives is copied verbatim whenever a non-nil config is passed,
// so a zero-valued config enables keep-alives. The built-in value (true) is
// only used when NewClient receives a nil config.
DisableKeepAlives bool // true (nil config only)
MaxConnsPerHost int // default 100
MaxIdleConnsPerHost int // default 10
IdleConnTimeout time.Duration // default 30*time.Second
}

// ClientTimeout is the overall per-request timeout of the returned client.
ClientTimeout time.Duration // default 600*time.Second
// ClientTTL is the recycle interval handed to the "recycled" client and the
// TTL argument handed to the DNS resolver of the "httplb" client.
ClientTTL time.Duration // default 10*time.Second

// ClientType selects the implementation; an empty or unrecognised value
// yields the plain standard-library client.
ClientType string // stdlib(default), recycled, httplb

// PickerType is only read for the "httplb" client type; an empty or
// unrecognised value selects power_of_two.
PickerType string // power_of_two(default), round_robin, least_loaded_random, least_loaded_round_robin, random
}

// Client is the minimal HTTP client contract returned by NewClient.
// *http.Client satisfies it, as do the other implementations NewClient can
// return.
type Client interface {
// Do sends req and returns the response or an error.
Do(req *http.Request) (*http.Response, error)
}

// NewClient builds the HTTP client used to talk to the transformer.
//
// A nil config yields a plain *http.Client with the built-in defaults
// (keep-alives disabled, 100 conns per host, 10 idle conns per host, 30s idle
// connection timeout, 600s request timeout). With a non-nil config, zero-valued
// numeric and duration fields keep those defaults, DisableKeepAlives is taken
// verbatim, and ClientType picks the implementation: "recycled", "httplb", or
// the plain standard-library client for "stdlib" and any other value.
func NewClient(config *ClientConfig) Client {
	tr := &http.Transport{
		DisableKeepAlives:   true,
		MaxConnsPerHost:     100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     30 * time.Second,
	}
	httpClient := &http.Client{Transport: tr, Timeout: 600 * time.Second}
	if config == nil {
		return httpClient
	}

	// Overlay the caller's transport settings; zero means "keep the default",
	// except for the boolean which cannot express "unset".
	tc := &config.TransportConfig
	tr.DisableKeepAlives = tc.DisableKeepAlives
	if n := tc.MaxConnsPerHost; n != 0 {
		tr.MaxConnsPerHost = n
	}
	if n := tc.MaxIdleConnsPerHost; n != 0 {
		tr.MaxIdleConnsPerHost = n
	}
	if d := tc.IdleConnTimeout; d != 0 {
		tr.IdleConnTimeout = d
	}
	if d := config.ClientTimeout; d != 0 {
		httpClient.Timeout = d
	}

	ttl := config.ClientTTL
	if ttl == 0 {
		ttl = 10 * time.Second
	}

	if config.ClientType == "recycled" {
		// The generator hands back the same client on every recycle.
		sameClient := func() *http.Client { return httpClient }
		return sysUtils.NewRecycledHTTPClient(sameClient, ttl)
	}

	if config.ClientType == "httplb" {
		// Only the idle timeout and request timeout are carried over to the
		// load-balancing client; the other transport fields are not passed on.
		idle := tr.IdleConnTimeout
		dns := resolver.NewDNSResolver(net.DefaultResolver, resolver.PreferIPv4, ttl)
		return httplb.NewClient(
			httplb.WithRootContext(context.TODO()),
			httplb.WithPicker(getPicker(config.PickerType)),
			httplb.WithIdleConnectionTimeout(idle),
			httplb.WithRequestTimeout(httpClient.Timeout),
			httplb.WithRoundTripperMaxLifetime(idle),
			httplb.WithIdleTransportTimeout(2*idle),
			httplb.WithResolver(dns),
		)
	}

	// "stdlib" and every unrecognised client type.
	return httpClient
}

// getPicker maps a picker type name to the matching httplb picker factory.
// An empty or unrecognised name selects the power-of-two picker.
func getPicker(pickerType string) func(prev picker.Picker, allConns conn.Conns) picker.Picker {
	type factory = func(prev picker.Picker, allConns conn.Conns) picker.Picker

	known := map[string]factory{
		"power_of_two":             picker.NewPowerOfTwo,
		"round_robin":              picker.NewRoundRobin,
		"least_loaded_random":      picker.NewLeastLoadedRandom,
		"least_loaded_round_robin": picker.NewLeastLoadedRoundRobin,
		"random":                   picker.NewRandom,
	}
	if f, found := known[pickerType]; found {
		return f
	}
	return picker.NewPowerOfTwo
}

// HTTPLBTransport wraps a standard *http.Transport so it can be handed to
// httplb as a transport: its NewRoundTripper method returns the embedded
// transport regardless of scheme or target.
type HTTPLBTransport struct {
// Embedded so the transport's own methods (e.g. CloseIdleConnections) are promoted.
*http.Transport
}

// NewRoundTripper returns the embedded transport as the round tripper, paired
// with its CloseIdleConnections method as the close hook. The scheme, target
// and config arguments are ignored, so every call yields the same transport.
func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
	var result httplb.RoundTripperResult
	result.RoundTripper = t.Transport
	result.Close = t.CloseIdleConnections
	return result
}
86 changes: 32 additions & 54 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"runtime/trace"
Expand All @@ -16,8 +15,6 @@ import (
"sync"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/resolver"
"github.com/cenkalti/backoff"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
Expand All @@ -27,9 +24,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down Expand Up @@ -202,11 +199,12 @@ type handle struct {
guardConcurrency chan struct{}

config struct {
maxConcurrency int
maxHTTPConnections int
maxHTTPIdleConnections int
maxIdleConnDuration time.Duration
disableKeepAlives bool
maxConcurrency int
maxHTTPConnections int
maxHTTPIdleConnections int
maxIdleConnDuration time.Duration
disableKeepAlives bool
collectInstanceLevelStats bool

timeoutDuration time.Duration

Expand Down Expand Up @@ -244,47 +242,22 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op
trans.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.maxRetry")
trans.config.failOnUserTransformTimeout = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnUserTransformTimeout")
trans.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnError")

trans.config.collectInstanceLevelStats = conf.GetBool("Processor.collectInstanceLevelStats", false)
trans.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.Transformer.maxRetryBackoffInterval")

trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency)

clientType := conf.GetString("Transformer.Client.type", "stdlib")

transport := &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
IdleConnTimeout: trans.config.maxIdleConnDuration,
}
client := &http.Client{
Transport: transport,
Timeout: trans.config.timeoutDuration,
}

switch clientType {
case "stdlib":
trans.httpClient = client
case "recycled":
trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client {
return client
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
case "httplb":
trans.httpClient = httplb.NewClient(
httplb.WithTransport("http", &HTTPLBTransport{
Transport: transport,
}),
httplb.WithResolver(
resolver.NewDNSResolver(
net.DefaultResolver,
resolver.PreferIPv6,
config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value
),
),
)
default:
panic(fmt.Sprintf("unknown transformer client type: %s", clientType))
transformerClientConfig := &transformerclient.ClientConfig{
ClientTimeout: trans.config.timeoutDuration,
ClientTTL: config.GetDuration("Transformer.Client.ttl", 10, time.Second),
ClientType: conf.GetString("Transformer.Client.type", "stdlib"),
PickerType: conf.GetString("Transformer.Client.httplb.pickerType", "power_of_two"),
}
transformerClientConfig.TransportConfig.DisableKeepAlives = trans.config.disableKeepAlives
transformerClientConfig.TransportConfig.MaxConnsPerHost = trans.config.maxHTTPConnections
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = trans.config.maxHTTPIdleConnections
transformerClientConfig.TransportConfig.IdleConnTimeout = trans.config.maxIdleConnDuration
trans.httpClient = transformerclient.NewClient(transformerClientConfig)

for _, opt := range opts {
opt(&trans)
Expand Down Expand Up @@ -314,14 +287,6 @@ func (trans *handle) Validate(ctx context.Context, clientEvents []TransformerEve
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage)
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
}

func (trans *handle) transform(
ctx context.Context,
clientEvents []TransformerEvent,
Expand Down Expand Up @@ -552,10 +517,23 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri

resp, reqErr = trans.httpClient.Do(req)
})
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
duration := time.Since(requestStartTime)
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(duration)
if reqErr != nil {
return reqErr
}
headerResponseTime := resp.Header.Get("X-Response-Time")
instanceWorker := resp.Header.Get("X-Instance-ID")
if trans.config.collectInstanceLevelStats && instanceWorker != "" {
newTags := lo.Assign(tags)
newTags["instanceWorker"] = instanceWorker
dur := duration.Milliseconds()
headerTime, err := strconv.ParseFloat(strings.TrimSuffix(headerResponseTime, "ms"), 64)
if err == nil {
diff := float64(dur) - headerTime
trans.stat.NewTaggedStat("processor_transform_duration_diff_time", stats.TimerType, newTags).SendTiming(time.Duration(diff) * time.Millisecond)
}
}

defer func() { httputil.CloseResponse(resp) }()

Expand Down
18 changes: 15 additions & 3 deletions router/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/router/types"
oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2"
Expand All @@ -50,7 +51,7 @@ const (
type handle struct {
tr *http.Transport
// http client for router transformation request
client *http.Client
client sysUtils.HTTPClientI
// Mockable http.client for transformer proxy request
proxyClient sysUtils.HTTPClientI
// http client timeout for transformer proxy request
Expand Down Expand Up @@ -501,7 +502,17 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
// Basically this timeout we will configure when we make final call to destination to send event
trans.destinationTimeout = destinationTimeout
// This client is used for Router Transformation
trans.client = &http.Client{Transport: trans.tr, Timeout: trans.transformTimeout}
transformerClientConfig := &transformerclient.ClientConfig{
ClientTimeout: trans.transformTimeout,
ClientTTL: config.GetDuration("Transformer.Client.ttl", 10, time.Second),
ClientType: config.GetString("Transformer.Client.type", "stdlib"),
PickerType: config.GetString("Transformer.Client.httplb.pickerType", "power_of_two"),
}
transformerClientConfig.TransportConfig.DisableKeepAlives = config.GetBool("Transformer.Client.disableKeepAlives", true)
transformerClientConfig.TransportConfig.MaxConnsPerHost = config.GetInt("Transformer.Client.maxHTTPConnections", 100)
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = config.GetInt("Transformer.Client.maxHTTPIdleConnections", 10)
transformerClientConfig.TransportConfig.IdleConnTimeout = 30 * time.Second
trans.client = transformerclient.NewClient(transformerClientConfig)
optionalArgs := &oauthv2httpclient.HttpClientOptionalArgs{
Locker: locker,
Augmenter: extensions.RouterBodyAugmenter,
Expand All @@ -517,7 +528,8 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
Logger: logger.NewLogger().Child("TransformerProxyHttpClient"),
}
// This client is used for Transformer Proxy(delivered from transformer to destination)
trans.proxyClient = &http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}
transformerClientConfig.ClientTimeout = trans.destinationTimeout + trans.transformTimeout
trans.proxyClient = transformerclient.NewClient(transformerClientConfig)
// This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2
trans.proxyClientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs)
trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType)
Expand Down

0 comments on commit 54820a4

Please sign in to comment.