From 54820a4086f63a53e899f6950689087c323866c0 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Mon, 10 Feb 2025 14:45:55 +0530 Subject: [PATCH] chore: make cslb configurable (#5451) --- internal/transformer-client/client.go | 117 ++++++++++++++++++++++++++ processor/transformer/transformer.go | 86 +++++++------------ router/transformer/transformer.go | 18 +++- 3 files changed, 164 insertions(+), 57 deletions(-) create mode 100644 internal/transformer-client/client.go diff --git a/internal/transformer-client/client.go b/internal/transformer-client/client.go new file mode 100644 index 0000000000..4a379ae076 --- /dev/null +++ b/internal/transformer-client/client.go @@ -0,0 +1,117 @@ +package transformerclient + +import ( + "context" + "net" + "net/http" + "time" + + "github.com/bufbuild/httplb" + "github.com/bufbuild/httplb/conn" + "github.com/bufbuild/httplb/picker" + "github.com/bufbuild/httplb/resolver" + + "github.com/rudderlabs/rudder-server/utils/sysUtils" +) + +type ClientConfig struct { + TransportConfig struct { + DisableKeepAlives bool // true + MaxConnsPerHost int // 100 + MaxIdleConnsPerHost int // 10 + IdleConnTimeout time.Duration // 30*time.Second + } + + ClientTimeout time.Duration // 600*time.Second + ClientTTL time.Duration // 10*time.Second + + ClientType string // stdlib(default), recycled, httplb + + PickerType string // power_of_two(default), round_robin, least_loaded_random, least_loaded_round_robin, random +} + +type Client interface { + Do(req *http.Request) (*http.Response, error) +} + +func NewClient(config *ClientConfig) Client { + transport := &http.Transport{ + DisableKeepAlives: true, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 30 * time.Second, + } + client := &http.Client{ + Transport: transport, + Timeout: 600 * time.Second, + } + if config == nil { + return client + } + + transport.DisableKeepAlives = config.TransportConfig.DisableKeepAlives + if config.TransportConfig.MaxConnsPerHost != 0 { + transport.MaxConnsPerHost = config.TransportConfig.MaxConnsPerHost + } + if config.TransportConfig.MaxIdleConnsPerHost != 0 { + transport.MaxIdleConnsPerHost = config.TransportConfig.MaxIdleConnsPerHost + } + if config.TransportConfig.IdleConnTimeout != 0 { + transport.IdleConnTimeout = config.TransportConfig.IdleConnTimeout + } + + if config.ClientTimeout != 0 { + client.Timeout = config.ClientTimeout + } + + clientTTL := 10 * time.Second + if config.ClientTTL != 0 { + clientTTL = config.ClientTTL + } + + switch config.ClientType { + case "stdlib": + return client + case "recycled": + return sysUtils.NewRecycledHTTPClient(func() *http.Client { + return client + }, clientTTL) + case "httplb": + return httplb.NewClient( + httplb.WithRootContext(context.TODO()), + httplb.WithPicker(getPicker(config.PickerType)), + httplb.WithIdleConnectionTimeout(transport.IdleConnTimeout), + httplb.WithRequestTimeout(client.Timeout), + httplb.WithRoundTripperMaxLifetime(transport.IdleConnTimeout), + httplb.WithIdleTransportTimeout(2*transport.IdleConnTimeout), + httplb.WithResolver(resolver.NewDNSResolver(net.DefaultResolver, resolver.PreferIPv4, clientTTL)), + ) + default: + return client + } +} + +func getPicker(pickerType string) func(prev picker.Picker, allConns conn.Conns) picker.Picker { + switch pickerType { + case "power_of_two": + return picker.NewPowerOfTwo + case "round_robin": + return picker.NewRoundRobin + case "least_loaded_random": + return picker.NewLeastLoadedRandom + case "least_loaded_round_robin": + return picker.NewLeastLoadedRoundRobin + case "random": + return picker.NewRandom + default: + return picker.NewPowerOfTwo + } +} + +type HTTPLBTransport struct { + *http.Transport +} + +func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult { + return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections} +} diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index ca2c6816ae..810d58aa8e 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "io" - "net" "net/http" "os" "runtime/trace" @@ -16,8 +15,6 @@ import ( "sync" "time" - "github.com/bufbuild/httplb" - "github.com/bufbuild/httplb/resolver" "github.com/cenkalti/backoff" jsoniter "github.com/json-iterator/go" "github.com/samber/lo" @@ -27,9 +24,9 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client" "github.com/rudderlabs/rudder-server/processor/integrations" "github.com/rudderlabs/rudder-server/utils/httputil" - "github.com/rudderlabs/rudder-server/utils/sysUtils" "github.com/rudderlabs/rudder-server/utils/types" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) @@ -202,11 +199,12 @@ type handle struct { guardConcurrency chan struct{} config struct { - maxConcurrency int - maxHTTPConnections int - maxHTTPIdleConnections int - maxIdleConnDuration time.Duration - disableKeepAlives bool + maxConcurrency int + maxHTTPConnections int + maxHTTPIdleConnections int + maxIdleConnDuration time.Duration + disableKeepAlives bool + collectInstanceLevelStats bool timeoutDuration time.Duration @@ -244,47 +242,22 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op trans.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.maxRetry") trans.config.failOnUserTransformTimeout = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnUserTransformTimeout") trans.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnError") - + trans.config.collectInstanceLevelStats = conf.GetBool("Processor.collectInstanceLevelStats", false) trans.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.Transformer.maxRetryBackoffInterval") trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency) - clientType := conf.GetString("Transformer.Client.type", "stdlib") - - transport := &http.Transport{ - DisableKeepAlives: trans.config.disableKeepAlives, - MaxConnsPerHost: trans.config.maxHTTPConnections, - MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections, - IdleConnTimeout: trans.config.maxIdleConnDuration, - } - client := &http.Client{ - Transport: transport, - Timeout: trans.config.timeoutDuration, - } - - switch clientType { - case "stdlib": - trans.httpClient = client - case "recycled": - trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client { - return client - }, config.GetDuration("Transformer.Client.ttl", 120, time.Second)) - case "httplb": - trans.httpClient = httplb.NewClient( - httplb.WithTransport("http", &HTTPLBTransport{ - Transport: transport, - }), - httplb.WithResolver( - resolver.NewDNSResolver( - net.DefaultResolver, - resolver.PreferIPv6, - config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value - ), - ), - ) - default: - panic(fmt.Sprintf("unknown transformer client type: %s", clientType)) + transformerClientConfig := &transformerclient.ClientConfig{ + ClientTimeout: trans.config.timeoutDuration, + ClientTTL: config.GetDuration("Transformer.Client.ttl", 10, time.Second), + ClientType: conf.GetString("Transformer.Client.type", "stdlib"), + PickerType: conf.GetString("Transformer.Client.httplb.pickerType", "power_of_two"), } + transformerClientConfig.TransportConfig.DisableKeepAlives = trans.config.disableKeepAlives + transformerClientConfig.TransportConfig.MaxConnsPerHost = trans.config.maxHTTPConnections + transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = trans.config.maxHTTPIdleConnections + transformerClientConfig.TransportConfig.IdleConnTimeout = trans.config.maxIdleConnDuration + trans.httpClient = transformerclient.NewClient(transformerClientConfig) for _, opt := range opts { opt(&trans) @@ -314,14 +287,6 @@ func (trans *handle) Validate(ctx context.Context, clientEvents []TransformerEve return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage) } -type HTTPLBTransport struct { - *http.Transport -} - -func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult { - return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections} -} - func (trans *handle) transform( ctx context.Context, clientEvents []TransformerEvent, @@ -552,10 +517,23 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri resp, reqErr = trans.httpClient.Do(req) }) - trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime)) + duration := time.Since(requestStartTime) + trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(duration) if reqErr != nil { return reqErr } + headerResponseTime := resp.Header.Get("X-Response-Time") + instanceWorker := resp.Header.Get("X-Instance-ID") + if trans.config.collectInstanceLevelStats && instanceWorker != "" { + newTags := lo.Assign(tags) + newTags["instanceWorker"] = instanceWorker + dur := duration.Milliseconds() + headerTime, err := strconv.ParseFloat(strings.TrimSuffix(headerResponseTime, "ms"), 64) + if err == nil { + diff := float64(dur) - headerTime + trans.stat.NewTaggedStat("processor_transform_duration_diff_time", stats.TimerType, newTags).SendTiming(time.Duration(diff) * time.Millisecond) + } + } defer func() { httputil.CloseResponse(resp) }() diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index 3727b797a8..ad5fda6925 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -26,6 +26,7 @@ import ( kitsync "github.com/rudderlabs/rudder-go-kit/sync" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client" "github.com/rudderlabs/rudder-server/processor/integrations" "github.com/rudderlabs/rudder-server/router/types" oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2" @@ -50,7 +51,7 @@ const ( type handle struct { tr *http.Transport // http client for router transformation request - client *http.Client + client sysUtils.HTTPClientI // Mockable http.client for transformer proxy request proxyClient sysUtils.HTTPClientI // http client timeout for transformer proxy request @@ -501,7 +502,17 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c // Basically this timeout we will configure when we make final call to destination to send event trans.destinationTimeout = destinationTimeout // This client is used for Router Transformation - trans.client = &http.Client{Transport: trans.tr, Timeout: trans.transformTimeout} + transformerClientConfig := &transformerclient.ClientConfig{ + ClientTimeout: trans.transformTimeout, + ClientTTL: config.GetDuration("Transformer.Client.ttl", 10, time.Second), + ClientType: config.GetString("Transformer.Client.type", "stdlib"), + PickerType: config.GetString("Transformer.Client.httplb.pickerType", "power_of_two"), + } + transformerClientConfig.TransportConfig.DisableKeepAlives = config.GetBool("Transformer.Client.disableKeepAlives", true) + transformerClientConfig.TransportConfig.MaxConnsPerHost = config.GetInt("Transformer.Client.maxHTTPConnections", 100) + transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = config.GetInt("Transformer.Client.maxHTTPIdleConnections", 10) + transformerClientConfig.TransportConfig.IdleConnTimeout = 30 * time.Second + trans.client = transformerclient.NewClient(transformerClientConfig) optionalArgs := &oauthv2httpclient.HttpClientOptionalArgs{ Locker: locker, Augmenter: extensions.RouterBodyAugmenter, @@ -517,7 +528,8 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c Logger: logger.NewLogger().Child("TransformerProxyHttpClient"), } // This client is used for Transformer Proxy(delivered from transformer to destination) - trans.proxyClient = &http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout} + transformerClientConfig.ClientTimeout = trans.destinationTimeout + trans.transformTimeout + trans.proxyClient = transformerclient.NewClient(transformerClientConfig) // This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2 trans.proxyClientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs) trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType)