Skip to content

Commit

Permalink
Fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Mar 25, 2019
1 parent e2a020d commit d2bba2e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
70 changes: 18 additions & 52 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/strutil"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -45,50 +44,28 @@ type ProxyStore struct {
component component.StoreAPI
selectorLabels labels.Labels

responseTimeout time.Duration
streamResponseDuration *prometheus.HistogramVec
streamDuration *prometheus.HistogramVec
responseTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL)
func NewProxyStore(
logger log.Logger,
reg prometheus.Registerer,
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
responseTimeout time.Duration,
) *ProxyStore {
streamResponseDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_stream_read_duration_seconds",
Help: "Time it takes to perform a single read from GRPC stream.",
Buckets: []float64{
0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5,
}}, []string{"store"})
streamDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_stream_duration_seconds",
Help: "Time it takes to consume GRPC stream.",
Buckets: []float64{
0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 20,
}}, []string{"store"})
reg.MustRegister(
streamResponseDuration,
streamDuration,
)

if logger == nil {
logger = log.NewNopLogger()
}

s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
streamResponseDuration: streamResponseDuration,
streamDuration: streamDuration,
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
}
return s
}
Expand Down Expand Up @@ -198,7 +175,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}

// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.streamResponseDuration.WithLabelValues(st.Addr()), s.streamDuration.WithLabelValues(st.Addr())))
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
Expand Down Expand Up @@ -254,10 +231,8 @@ type streamSeriesSet struct {
name string
partialResponse bool

responseTimeout time.Duration
closeSeries context.CancelFunc
streamResponseDuration prometheus.Observer
streamDuration prometheus.Observer
responseTimeout time.Duration
closeSeries context.CancelFunc
}

func startStreamSeriesSet(
Expand All @@ -270,35 +245,26 @@ func startStreamSeriesSet(
name string,
partialResponse bool,
responseTimeout time.Duration,
streamResponseDuration prometheus.Observer,
streamDuration prometheus.Observer,
) *streamSeriesSet {
s := &streamSeriesSet{
ctx: ctx,
logger: logger,
closeSeries: closeSeries,
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
partialResponse: partialResponse,
responseTimeout: responseTimeout,
streamResponseDuration: streamResponseDuration,
streamDuration: streamDuration,
ctx: ctx,
logger: logger,
closeSeries: closeSeries,
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
partialResponse: partialResponse,
responseTimeout: responseTimeout,
}

wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)

timer := prometheus.NewTimer(s.streamDuration)
defer timer.ObserveDuration()

for {
timer := prometheus.NewTimer(s.streamResponseDuration)
r, err := s.stream.Recv()
timer.ObserveDuration()

if err == io.EOF {
return
Expand Down

0 comments on commit d2bba2e

Please sign in to comment.