diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 865ea3878d1..fc9582055d1 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1107,6 +1107,11 @@ func (b *blockSeriesClient) Close() { runutil.CloseWithLogOnErr(b.logger, b.indexr, "series block") } +func (b *blockSeriesClient) CloseSend() error { + b.Close() + return nil +} + func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { stats.merge(b.indexr.stats) if !b.skipChunks { @@ -1574,8 +1579,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store tenant, ) - defer blockClient.Close() - g.Go(func() error { span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 29d1e6560a3..2f4f9326053 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -446,8 +446,10 @@ func newAsyncRespSet( emptyStreamResponses prometheus.Counter, ) (respSet, error) { - var span opentracing.Span - var closeSeries context.CancelFunc + var ( + span opentracing.Span + cancel context.CancelFunc + ) storeID, storeAddr, isLocalStore := storeInfo(st) seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{ @@ -459,7 +461,7 @@ func newAsyncRespSet( "store.addr": storeAddr, }) - seriesCtx, closeSeries = context.WithCancel(seriesCtx) + seriesCtx, cancel = context.WithCancel(seriesCtx) shardMatcher := shardInfo.Matcher(buffers) @@ -474,7 +476,7 @@ func newAsyncRespSet( span.SetTag("err", err.Error()) span.Finish() - closeSeries() + cancel() return nil, err } @@ -490,6 +492,13 @@ func newAsyncRespSet( } } + closeSeries := func() { + cancel() + err := cl.CloseSend() + if err != nil { + level.Warn(logger).Log("msg", "detected close error", "err", err.Error()) + } + } switch retrievalStrategy { case LazyRetrieval: return newLazyRespSet( diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index e9e38894658..fcd7ddc5f63 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2251,6 +2251,18 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { MinTime: math.MinInt64, MaxTime: math.MaxInt64, }, + &storetestutil.TestClient{ + StoreClient: storepb.ServerAsClient(&storeServerStub{ + delay: 50 * time.Millisecond, + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }), + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, } logger := log.NewNopLogger() @@ -2258,7 +2270,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { logger: logger, stores: func() []Client { return clients }, metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, + responseTimeout: 50 * time.Millisecond, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, } @@ -2275,6 +2287,32 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { })) testutil.NotOk(t, ctx.Err()) }) + t.Run("client timeout", func(t *testing.T) { + ctx := context.Background() + testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + return nil + }, + })) + }) +} + +type storeServerStub struct { + storepb.StoreServer + + delay time.Duration + responses []*storepb.SeriesResponse +} + +func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error { + for _, r := range m.responses { + <-time.After(m.delay) + if err := server.Send(r); err != nil { + return err + } + } + return nil } func TestProxyStore_storeMatchMetadata(t *testing.T) { diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index a5b792bca19..0c3e7641baa 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -55,6 +55,9 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) { return nil, err } if !ok { + if c.ctx.Err() != nil { + return nil, c.ctx.Err() + } return nil, io.EOF } return resp, err diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go index 2580692ca2a..e370f7c984b 100644 --- a/pkg/store/storepb/testutil/store_series_client.go +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -50,3 +50,4 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (c *StoreSeriesClient) Context() context.Context { return c.Ctx } +func (c *StoreSeriesClient) CloseSend() error { return nil }