Skip to content

Commit

Permalink
sidecar: allow custom http con pool size fix #1953 (#1969)
Browse files Browse the repository at this point in the history
Signed-off-by: Brett Jones <[email protected]>
  • Loading branch information
blockloop authored and squat committed Jan 8, 2020
1 parent 880c8f8 commit 8dbfe3d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

- [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags

### Fixed

- [#1919](https://github.com/thanos-io/thanos/issues/1919) Compactor: Fixed potential data loss when uploading older blocks, or upload taking long time while compactor is
Expand Down
19 changes: 17 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"math"
"net/http"
"net/url"
"sync"
"time"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/exthttp"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -30,6 +32,8 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"

"gopkg.in/alecthomas/kingpin.v2"
)

Expand All @@ -45,6 +49,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
promReadyTimeout := cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up").
Default("10m").Duration()

connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int()
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

Expand Down Expand Up @@ -95,6 +102,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
*ignoreBlockSize,
component.Sidecar,
*minTime,
*connectionPoolSize,
*connectionPoolSizePerHost,
)
}
}
Expand All @@ -120,6 +129,8 @@ func runSidecar(
ignoreBlockSize bool,
comp component.Component,
limitMinTime thanosmodel.TimeOrDurationValue,
connectionPoolSize int,
connectionPoolSizePerHost int,
) error {
var m = &promMetadata{
promURL: promURL,
Expand Down Expand Up @@ -243,8 +254,12 @@ func runSidecar(
}

{
promStore, err := store.NewPrometheusStore(
logger, nil, promURL, component.Sidecar, m.Labels, m.Timestamps)
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = connectionPoolSizePerHost
t.MaxIdleConns = connectionPoolSize
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)}

promStore, err := store.NewPrometheusStore(logger, c, promURL, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
5 changes: 5 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ Flags:
--prometheus.ready_timeout=10m
Maximum time to wait for the Prometheus
instance to start up
--receive.connection-pool-size=RECEIVE.CONNECTION-POOL-SIZE
Controls the http MaxIdleConns. Default is 0,
which is unlimited
--receive.connection-pool-size-per-host=100
Controls the http MaxIdleConnsPerHost
--tsdb.path="./data" Data directory of TSDB.
--reloader.config-file="" Config file watched by the reloader.
--reloader.config-envsubst-file=""
Expand Down
24 changes: 24 additions & 0 deletions pkg/exthttp/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package exthttp

import (
"net"
"net/http"
"time"
)

// NewTransport creates a new http.Transport with default settings.
func NewTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
}
3 changes: 2 additions & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exthttp"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -79,7 +80,7 @@ func NewPrometheusStore(
}
if client == nil {
client = &http.Client{
Transport: tracing.HTTPTripperware(logger, http.DefaultTransport),
Transport: tracing.HTTPTripperware(logger, exthttp.NewTransport()),
}
}
p := &PrometheusStore{
Expand Down
10 changes: 4 additions & 6 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -364,8 +365,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
)
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -410,8 +410,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 },
)
func() (int64, int64) { return 123, 456 })
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -489,8 +488,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOfUint16_e2e(t

proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
)
func() (int64, int64) { return 0, math.MaxInt64 })
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down

0 comments on commit 8dbfe3d

Please sign in to comment.