diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go
index 5f2c3b5ab5fd8..e10e5af3012a6 100644
--- a/pkg/bloombuild/builder/builder.go
+++ b/pkg/bloombuild/builder/builder.go
@@ -335,7 +335,7 @@ func (b *Builder) processTask(
 		// Fetch blocks that aren't up to date but are in the desired fingerprint range
 		// to try and accelerate bloom creation.
 		level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
-		seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
+		seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
 			return nil, fmt.Errorf("failed to get series and blocks: %w", err)
@@ -454,15 +454,9 @@ func (b *Builder) processTask(
 func (b *Builder) loadWorkForGap(
 	ctx context.Context,
 	table config.DayTable,
-	tenant string,
-	id tsdb.Identifier,
-	gap protos.GapWithBlocks,
+	gap protos.Gap,
 ) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
-	// load a series iterator for the gap
-	seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
-	if err != nil {
-		return nil, nil, errors.Wrap(err, "failed to load tsdb")
-	}
+	seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
 
 	// load a blocks iterator for the gap
 	fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go
index 8082a8b319a47..a2e22529523b2 100644
--- a/pkg/bloombuild/common/tsdb.go
+++ b/pkg/bloombuild/common/tsdb.go
@@ -9,7 +9,6 @@ import (
 	"strings"
 
 	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
@@ -30,6 +29,11 @@ const (
 	gzipExtension = ".gz"
 )
 
+type ClosableForSeries interface {
+	sharding.ForSeries
+	Close() error
+}
+
 type TSDBStore interface {
 	UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
 	ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
@@ -38,8 +42,7 @@ type TSDBStore interface {
 		table config.DayTable,
 		tenant string,
 		id tsdb.Identifier,
-		bounds v1.FingerprintBounds,
-	) (iter.Iterator[*v1.Series], error)
+	) (ClosableForSeries, error)
 }
 
 // BloomTSDBStore is a wrapper around the storage.Client interface which
@@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB(
 	table config.DayTable,
 	tenant string,
 	id tsdb.Identifier,
-	bounds v1.FingerprintBounds,
-) (iter.Iterator[*v1.Series], error) {
+) (ClosableForSeries, error) {
 	withCompression := id.Name() + gzipExtension
 
 	data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
@@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB(
 	}
 
 	idx := tsdb.NewTSDBIndex(reader)
-	defer func() {
-		if err := idx.Close(); err != nil {
-			level.Error(b.logger).Log("msg", "failed to close index", "err", err)
-		}
-	}()
 
-	return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
+	return idx, nil
 }
 
 func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
@@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB(
 	table config.DayTable,
 	tenant string,
 	id tsdb.Identifier,
-	bounds v1.FingerprintBounds,
-) (iter.Iterator[*v1.Series], error) {
+) (ClosableForSeries, error) {
 	store, err := s.storeForPeriod(table.DayTime)
 	if err != nil {
 		return nil, err
 	}
 
-	return store.LoadTSDB(ctx, table, tenant, id, bounds)
+	return store.LoadTSDB(ctx, table, tenant, id)
 }
diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go
index 285795a8327d2..f65fdf59c9acb 100644
--- a/pkg/bloombuild/planner/planner.go
+++ b/pkg/bloombuild/planner/planner.go
@@ -365,6 +365,29 @@ func (p *Planner) computeTasks(
 		return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
 	}
 
+	// Resolve TSDBs
+	tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
+		return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
+	}
+
+	if len(tsdbs) == 0 {
+		return nil, metas, nil
+	}
+
+	openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
+	}
+	defer func() {
+		for idx, reader := range openTSDBs {
+			if err := reader.Close(); err != nil {
+				level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
+			}
+		}
+	}()
+
 	for _, ownershipRange := range ownershipRanges {
 		logger := log.With(logger, "ownership", ownershipRange.String())
 
@@ -372,7 +395,7 @@ func (p *Planner) computeTasks(
 		metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
 
 		// Find gaps in the TSDBs for this tenant/table
-		gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
+		gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
 			continue
@@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults(
 	return tasksSucceed, nil
 }
 
+func openAllTSDBs(
+	ctx context.Context,
+	table config.DayTable,
+	tenant string,
+	store common.TSDBStore,
+	tsdbs []tsdb.SingleTenantTSDBIdentifier,
+) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
+	openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
+	for _, idx := range tsdbs {
+		tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to load tsdb: %w", err)
+		}
+
+		openTSDBs[idx] = tsdb
+	}
+
+	return openTSDBs, nil
+}
+
 // deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
 // It returns the up-to-date metas from the `metas` argument.
 func (p *Planner) deleteOutdatedMetasAndBlocks(
@@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
 //     This is a performance optimization to avoid expensive re-reindexing
 type blockPlan struct {
 	tsdb tsdb.SingleTenantTSDBIdentifier
-	gaps []protos.GapWithBlocks
+	gaps []protos.Gap
 }
 
 func (p *Planner) findOutdatedGaps(
 	ctx context.Context,
 	tenant string,
-	table config.DayTable,
+	tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
 	ownershipRange v1.FingerprintBounds,
 	metas []bloomshipper.Meta,
 	logger log.Logger,
 ) ([]blockPlan, error) {
-	// Resolve TSDBs
-	tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
-		return nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
-	}
-
-	if len(tsdbs) == 0 {
-		return nil, nil
-	}
-
 	// Determine which TSDBs have gaps in the ownership range and need to
 	// be processed.
 	tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
@@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps(
 		return nil, nil
 	}
 
-	work, err := blockPlansForGaps(tsdbsWithGaps, metas)
+	work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to create plan", "err", err)
 		return nil, fmt.Errorf("failed to create plan: %w", err)
@@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps(
 
 // Used to signal the gaps that need to be populated for a tsdb
 type tsdbGaps struct {
-	tsdb tsdb.SingleTenantTSDBIdentifier
-	gaps []v1.FingerprintBounds
+	tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
+	tsdb           common.ClosableForSeries
+	gaps           []v1.FingerprintBounds
 }
 
 // gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting
 // that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
 func gapsBetweenTSDBsAndMetas(
 	ownershipRange v1.FingerprintBounds,
-	tsdbs []tsdb.SingleTenantTSDBIdentifier,
+	tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
 	metas []bloomshipper.Meta,
 ) (res []tsdbGaps, err error) {
-	for _, db := range tsdbs {
+	for db, tsdb := range tsdbs {
 		id := db.Name()
 
 		relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
@@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas(
 
 		if len(gaps) > 0 {
 			res = append(res, tsdbGaps{
-				tsdb: db,
-				gaps: gaps,
+				tsdbIdentifier: db,
+				tsdb:           tsdb,
+				gaps:           gaps,
 			})
 		}
 	}
@@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas(
 // blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
 // This allows us to expedite bloom generation by using existing blocks to fill in the gaps
 // since many will contain the same chunks.
-func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
+func blockPlansForGaps(
+	ctx context.Context,
+	tenant string,
+	tsdbs []tsdbGaps,
+	metas []bloomshipper.Meta,
+) ([]blockPlan, error) {
 	plans := make([]blockPlan, 0, len(tsdbs))
 
 	for _, idx := range tsdbs {
 		plan := blockPlan{
-			tsdb: idx.tsdb,
-			gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
+			tsdb: idx.tsdbIdentifier,
+			gaps: make([]protos.Gap, 0, len(idx.gaps)),
 		}
 
 		for _, gap := range idx.gaps {
-			planGap := protos.GapWithBlocks{
+			planGap := protos.Gap{
 				Bounds: gap,
 			}
 
-			for _, meta := range metas {
+			seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
+			if err != nil {
+				return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
+			}
+			planGap.Series, err = iter.Collect(seriesItr)
+			if err != nil {
+				return nil, fmt.Errorf("failed to collect series: %w", err)
+			}
 
+			for _, meta := range metas {
 				if meta.Bounds.Intersection(gap) == nil {
 					// this meta doesn't overlap the gap, skip
 					continue
diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go
index ca5c1d0c15b09..88e45c725917e 100644
--- a/pkg/bloombuild/planner/planner_test.go
+++ b/pkg/bloombuild/planner/planner_test.go
@@ -16,10 +16,12 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 
+	"github.com/grafana/loki/v3/pkg/bloombuild/common"
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	iter "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -31,6 +33,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
 	"github.com/grafana/loki/v3/pkg/storage/types"
 	"github.com/grafana/loki/v3/pkg/util/mempool"
 )
@@ -68,14 +71,16 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
 		err            bool
 		exp            []tsdbGaps
 		ownershipRange v1.FingerprintBounds
-		tsdbs          []tsdb.SingleTenantTSDBIdentifier
+		tsdbs          map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
 		metas          []bloomshipper.Meta
 	}{
 		{
 			desc:           "non-overlapping tsdbs and metas",
 			err:            true,
 			ownershipRange: v1.NewBounds(0, 10),
-			tsdbs:          []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
+			tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
+				tsdbID(0): nil,
+			},
 			metas: []bloomshipper.Meta{
 				genMeta(11, 20, []int{0}, nil),
 			},
@@ -83,13 +88,15 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
 		{
 			desc:           "single tsdb",
 			ownershipRange: v1.NewBounds(0, 10),
-			tsdbs:          []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
+			tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
+				tsdbID(0): nil,
+			},
 			metas: []bloomshipper.Meta{
 				genMeta(4, 8, []int{0}, nil),
 			},
 			exp: []tsdbGaps{
 				{
-					tsdb: tsdbID(0),
+					tsdbIdentifier: tsdbID(0),
 					gaps: []v1.FingerprintBounds{
 						v1.NewBounds(0, 3),
 						v1.NewBounds(9, 10),
@@ -100,20 +107,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
 		{
 			desc:           "multiple tsdbs with separate blocks",
 			ownershipRange: v1.NewBounds(0, 10),
-			tsdbs:          []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)},
+			tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
+				tsdbID(0): nil,
+				tsdbID(1): nil,
+			},
 			metas: []bloomshipper.Meta{
 				genMeta(0, 5, []int{0}, nil),
 				genMeta(6, 10, []int{1}, nil),
 			},
 			exp: []tsdbGaps{
 				{
-					tsdb: tsdbID(0),
+					tsdbIdentifier: tsdbID(0),
 					gaps: []v1.FingerprintBounds{
 						v1.NewBounds(6, 10),
 					},
 				},
 				{
-					tsdb: tsdbID(1),
+					tsdbIdentifier: tsdbID(1),
 					gaps: []v1.FingerprintBounds{
 						v1.NewBounds(0, 5),
 					},
@@ -123,20 +133,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
 		{
 			desc:           "multiple tsdbs with the same blocks",
 			ownershipRange: v1.NewBounds(0, 10),
-			tsdbs:          []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)},
+			tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
+				tsdbID(0): nil,
+				tsdbID(1): nil,
+			},
 			metas: []bloomshipper.Meta{
 				genMeta(0, 5, []int{0, 1}, nil),
 				genMeta(6, 8, []int{1}, nil),
 			},
 			exp: []tsdbGaps{
 				{
-					tsdb: tsdbID(0),
+					tsdbIdentifier: tsdbID(0),
 					gaps: []v1.FingerprintBounds{
 						v1.NewBounds(6, 10),
 					},
 				},
 				{
-					tsdb: tsdbID(1),
+					tsdbIdentifier: tsdbID(1),
 					gaps: []v1.FingerprintBounds{
 						v1.NewBounds(9, 10),
 					},
@@ -220,9 +233,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 10),
+							Series: genSeries(v1.NewBounds(0, 10)),
 						},
 					},
 				},
@@ -238,9 +252,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 10),
+							Series: genSeries(v1.NewBounds(0, 10)),
 							Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
 						},
 					},
@@ -261,9 +276,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 8),
+							Series: genSeries(v1.NewBounds(0, 8)),
 						},
 					},
 				},
@@ -280,9 +296,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 8),
+							Series: genSeries(v1.NewBounds(0, 8)),
 							Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
 						},
 					},
@@ -306,14 +323,16 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
 						{
 							Bounds: v1.NewBounds(3, 5),
+							Series: genSeries(v1.NewBounds(3, 5)),
 							Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
 						},
 						{
 							Bounds: v1.NewBounds(9, 10),
+							Series: genSeries(v1.NewBounds(9, 10)),
 							Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
 						},
 					},
@@ -321,9 +340,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 				// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
 				{
 					tsdb: tsdbID(1),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 2),
+							Series: genSeries(v1.NewBounds(0, 2)),
 							Blocks: []bloomshipper.BlockRef{
 								genBlockRef(0, 1),
 								genBlockRef(1, 2),
@@ -331,6 +351,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 						},
 						{
 							Bounds: v1.NewBounds(6, 7),
+							Series: genSeries(v1.NewBounds(6, 7)),
 							Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
 						},
 					},
@@ -354,9 +375,10 @@ func Test_blockPlansForGaps(t *testing.T) {
 			exp: []blockPlan{
 				{
 					tsdb: tsdbID(0),
-					gaps: []protos.GapWithBlocks{
+					gaps: []protos.Gap{
 						{
 							Bounds: v1.NewBounds(0, 10),
+							Series: genSeries(v1.NewBounds(0, 10)),
 							Blocks: []bloomshipper.BlockRef{
 								genBlockRef(1, 4),
 								genBlockRef(5, 10),
@@ -369,20 +391,86 @@ func Test_blockPlansForGaps(t *testing.T) {
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
+			// We add series spanning the whole FP ownership range
+			tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries)
+			for _, id := range tc.tsdbs {
+				tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange))
+			}
+
 			// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
 			// separately and it's used to generate input in our regular code path (easier to write tests this way).
-			gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas)
+			gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas)
 			require.NoError(t, err)
 
-			plans, err := blockPlansForGaps(gaps, tc.metas)
+			plans, err := blockPlansForGaps(
+				context.Background(),
+				"fakeTenant",
+				gaps,
+				tc.metas,
+			)
 			if tc.err {
 				require.Error(t, err)
 				return
 			}
 			require.Equal(t, tc.exp, plans)
+		})
+	}
+}
 
+func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
+	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
+	for i := bounds.Min; i <= bounds.Max; i++ {
+		series = append(series, &v1.Series{
+			Fingerprint: i,
+			Chunks: v1.ChunkRefs{
+				{
+					From:     0,
+					Through:  1,
+					Checksum: 1,
+				},
+			},
 		})
 	}
+	return series
+}
+
+type fakeForSeries struct {
+	series []*v1.Series
+}
+
+func newFakeForSeries(series []*v1.Series) *fakeForSeries {
+	return &fakeForSeries{
+		series: series,
+	}
+}
+
+func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
+	overlapping := make([]*v1.Series, 0, len(f.series))
+	for _, s := range f.series {
+		if ff.Match(s.Fingerprint) {
+			overlapping = append(overlapping, s)
+		}
+	}
+
+	for _, s := range overlapping {
+		chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
+		for _, c := range s.Chunks {
+			chunks = append(chunks, index.ChunkMeta{
+				MinTime:  int64(c.From),
+				MaxTime:  int64(c.Through),
+				Checksum: c.Checksum,
+			})
+		}
+
+		if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
+			break
+		}
+	}
+	return nil
+}
+
+func (f fakeForSeries) Close() error {
+	return nil
 }
 
 func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
diff --git a/pkg/bloombuild/protos/compat.go b/pkg/bloombuild/protos/compat.go
index ad7c492cc5fc9..468278e77dbea 100644
--- a/pkg/bloombuild/protos/compat.go
+++ b/pkg/bloombuild/protos/compat.go
@@ -7,14 +7,16 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 
+	"github.com/grafana/loki/v3/pkg/logproto"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 )
 
-type GapWithBlocks struct {
+type Gap struct {
 	Bounds v1.FingerprintBounds
+	Series []*v1.Series
 	Blocks []bloomshipper.BlockRef
 }
 
@@ -25,7 +27,7 @@ type Task struct {
 	Tenant          string
 	OwnershipBounds v1.FingerprintBounds
 	TSDB            tsdb.SingleTenantTSDBIdentifier
-	Gaps            []GapWithBlocks
+	Gaps            []Gap
 }
 
 func NewTask(
@@ -33,10 +35,10 @@ func NewTask(
 	tenant string,
 	bounds v1.FingerprintBounds,
 	tsdb tsdb.SingleTenantTSDBIdentifier,
-	gaps []GapWithBlocks,
+	gaps []Gap,
 ) *Task {
 	return &Task{
-		ID: fmt.Sprintf("%s-%s-%s-%d-%d", table.Addr(), tenant, bounds.String(), tsdb.Checksum, len(gaps)),
+		ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)),
 
 		Table:           table,
 		Tenant:          tenant,
@@ -56,12 +58,25 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
 		return nil, fmt.Errorf("failed to parse tsdb path %s", task.Tsdb)
 	}
 
-	gaps := make([]GapWithBlocks, 0, len(task.Gaps))
+	gaps := make([]Gap, 0, len(task.Gaps))
 	for _, gap := range task.Gaps {
 		bounds := v1.FingerprintBounds{
 			Min: gap.Bounds.Min,
 			Max: gap.Bounds.Max,
 		}
+
+		series := make([]*v1.Series, 0, len(gap.Series))
+		for _, s := range gap.Series {
+			chunks := make(v1.ChunkRefs, 0, len(s.Chunks))
+			for _, c := range s.Chunks {
+				chunks = append(chunks, v1.ChunkRef(*c))
+			}
+			series = append(series, &v1.Series{
+				Fingerprint: model.Fingerprint(s.Fingerprint),
+				Chunks:      chunks,
+			})
+		}
+
 		blocks := make([]bloomshipper.BlockRef, 0, len(gap.BlockRef))
 		for _, block := range gap.BlockRef {
 			b, err := bloomshipper.BlockRefFromKey(block)
@@ -71,8 +86,9 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
 
 			blocks = append(blocks, b)
 		}
-		gaps = append(gaps, GapWithBlocks{
+		gaps = append(gaps, Gap{
 			Bounds: bounds,
+			Series: series,
 			Blocks: blocks,
 		})
 	}
@@ -102,11 +118,26 @@ func (t *Task) ToProtoTask() *ProtoTask {
 			blockRefs = append(blockRefs, block.String())
 		}
 
+		series := make([]*ProtoSeries, 0, len(gap.Series))
+		for _, s := range gap.Series {
+			chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
+			for _, c := range s.Chunks {
+				chunk := logproto.ShortRef(c)
+				chunks = append(chunks, &chunk)
+			}
+
+			series = append(series, &ProtoSeries{
+				Fingerprint: uint64(s.Fingerprint),
+				Chunks:      chunks,
+			})
+		}
+
 		protoGaps = append(protoGaps, &ProtoGapWithBlocks{
 			Bounds: ProtoFingerprintBounds{
 				Min: gap.Bounds.Min,
 				Max: gap.Bounds.Max,
 			},
+			Series:   series,
 			BlockRef: blockRefs,
 		})
 	}
diff --git a/pkg/bloombuild/protos/types.pb.go b/pkg/bloombuild/protos/types.pb.go
index e528aa61e9178..f355b64711168 100644
--- a/pkg/bloombuild/protos/types.pb.go
+++ b/pkg/bloombuild/protos/types.pb.go
@@ -7,6 +7,7 @@ import (
 	fmt "fmt"
 	_ "github.com/gogo/protobuf/gogoproto"
 	proto "github.com/gogo/protobuf/proto"
+	logproto "github.com/grafana/loki/v3/pkg/logproto"
 	github_com_prometheus_common_model "github.com/prometheus/common/model"
 	io "io"
 	math "math"
@@ -131,15 +132,67 @@ func (m *DayTable) GetPrefix() string {
 	return ""
 }
 
+type ProtoSeries struct {
+	Fingerprint uint64               `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
+	Chunks      []*logproto.ShortRef `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks,omitempty"`
+}
+
+func (m *ProtoSeries) Reset()      { *m = ProtoSeries{} }
+func (*ProtoSeries) ProtoMessage() {}
+func (*ProtoSeries) Descriptor() ([]byte, []int) {
+	return fileDescriptor_5325fb0610e1e9ae, []int{2}
+}
+func (m *ProtoSeries) XXX_Unmarshal(b []byte) error {
+	return m.Unmarshal(b)
+}
+func (m *ProtoSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	if deterministic {
+		return xxx_messageInfo_ProtoSeries.Marshal(b, m, deterministic)
+	} else {
+		b = b[:cap(b)]
+		n, err := m.MarshalToSizedBuffer(b)
+		if err != nil {
+			return nil, err
+		}
+		return b[:n], nil
+	}
+}
+func (m *ProtoSeries) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_ProtoSeries.Merge(m, src)
+}
+func (m *ProtoSeries) XXX_Size() int {
+	return m.Size()
+}
+func (m *ProtoSeries) XXX_DiscardUnknown() {
+	xxx_messageInfo_ProtoSeries.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ProtoSeries proto.InternalMessageInfo
+
+func (m *ProtoSeries) GetFingerprint() uint64 {
+	if m != nil {
+		return m.Fingerprint
+	}
+	return 0
+}
+
+func (m *ProtoSeries) GetChunks() []*logproto.ShortRef {
+	if m != nil {
+		return m.Chunks
+	}
+	return nil
+}
+
 type ProtoGapWithBlocks struct {
 	Bounds   ProtoFingerprintBounds `protobuf:"bytes,1,opt,name=bounds,proto3" json:"bounds"`
-	BlockRef []string               `protobuf:"bytes,2,rep,name=blockRef,proto3" json:"blockRef,omitempty"`
+	Series   []*ProtoSeries         `protobuf:"bytes,2,rep,name=series,proto3" json:"series,omitempty"`
+	BlockRef []string               `protobuf:"bytes,3,rep,name=blockRef,proto3" json:"blockRef,omitempty"`
 }
 
 func (m *ProtoGapWithBlocks) Reset()      { *m = ProtoGapWithBlocks{} }
 func (*ProtoGapWithBlocks) ProtoMessage() {}
 func (*ProtoGapWithBlocks) Descriptor() ([]byte, []int) {
-	return fileDescriptor_5325fb0610e1e9ae, []int{2}
+	return fileDescriptor_5325fb0610e1e9ae, []int{3}
 }
 func (m *ProtoGapWithBlocks) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -175,6 +228,13 @@ func (m *ProtoGapWithBlocks) GetBounds() ProtoFingerprintBounds {
 	return ProtoFingerprintBounds{}
 }
 
+func (m *ProtoGapWithBlocks) GetSeries() []*ProtoSeries {
+	if m != nil {
+		return m.Series
+	}
+	return nil
+}
+
 func (m *ProtoGapWithBlocks) GetBlockRef() []string {
 	if m != nil {
 		return m.BlockRef
@@ -197,7 +257,7 @@ type ProtoTask struct {
 func (m *ProtoTask) Reset()      { *m = ProtoTask{} }
 func (*ProtoTask) ProtoMessage() {}
 func (*ProtoTask) Descriptor() ([]byte, []int) {
-	return fileDescriptor_5325fb0610e1e9ae, []int{3}
+	return fileDescriptor_5325fb0610e1e9ae, []int{4}
 }
 func (m *ProtoTask) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -277,7 +337,7 @@ type ProtoMeta struct {
 func (m *ProtoMeta) Reset()      { *m = ProtoMeta{} }
 func (*ProtoMeta) ProtoMessage() {}
 func (*ProtoMeta) Descriptor() ([]byte, []int) {
-	return fileDescriptor_5325fb0610e1e9ae, []int{4}
+	return fileDescriptor_5325fb0610e1e9ae, []int{5}
 }
 func (m *ProtoMeta) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -336,7 +396,7 @@ type ProtoTaskResult struct {
 func (m *ProtoTaskResult) Reset()      { *m = ProtoTaskResult{} }
 func (*ProtoTaskResult) ProtoMessage() {}
 func (*ProtoTaskResult) Descriptor() ([]byte, []int) {
-	return fileDescriptor_5325fb0610e1e9ae, []int{5}
+	return fileDescriptor_5325fb0610e1e9ae, []int{6}
 }
 func (m *ProtoTaskResult) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -389,6 +449,7 @@ func (m *ProtoTaskResult) GetCreatedMetas() []*ProtoMeta {
 func init() {
 	proto.RegisterType((*ProtoFingerprintBounds)(nil), "protos.ProtoFingerprintBounds")
 	proto.RegisterType((*DayTable)(nil), "protos.DayTable")
+	proto.RegisterType((*ProtoSeries)(nil), "protos.ProtoSeries")
 	proto.RegisterType((*ProtoGapWithBlocks)(nil), "protos.ProtoGapWithBlocks")
 	proto.RegisterType((*ProtoTask)(nil), "protos.ProtoTask")
 	proto.RegisterType((*ProtoMeta)(nil), "protos.ProtoMeta")
@@ -398,42 +459,47 @@ func init() {
 func init() { proto.RegisterFile("pkg/bloombuild/protos/types.proto", fileDescriptor_5325fb0610e1e9ae) }
 
 var fileDescriptor_5325fb0610e1e9ae = []byte{
-	// 551 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0xb1, 0x6f, 0xd3, 0x4e,
-	0x18, 0xb5, 0xe3, 0x34, 0xbf, 0xe6, 0xd2, 0x5f, 0x81, 0x53, 0x55, 0x59, 0x11, 0xba, 0x04, 0x0f,
-	0x28, 0x93, 0x2d, 0x05, 0x75, 0x40, 0x62, 0xb2, 0xa2, 0x22, 0x40, 0x95, 0xd0, 0x35, 0x12, 0x12,
-	0xdb, 0x39, 0xbe, 0x3a, 0x56, 0x6c, 0x9f, 0xe5, 0x3b, 0xa3, 0x64, 0xe3, 0x4f, 0xe0, 0xcf, 0x60,
-	0xe6, 0xaf, 0xe8, 0x98, 0xb1, 0x53, 0x44, 0x9c, 0x05, 0x75, 0xea, 0xc4, 0xc0, 0x84, 0xee, 0xce,
-	0x29, 0x09, 0x62, 0x82, 0xe9, 0xbe, 0xf7, 0xdd, 0x77, 0xef, 0x7b, 0xef, 0xc9, 0x06, 0x4f, 0xf2,
-	0x59, 0xe4, 0x05, 0x09, 0x63, 0x69, 0x50, 0xc6, 0x49, 0xe8, 0xe5, 0x05, 0x13, 0x8c, 0x7b, 0x62,
-	0x91, 0x53, 0xee, 0x2a, 0x00, 0x5b, 0xba, 0xd7, 0x3d, 0x89, 0x58, 0xc4, 0x54, 0xed, 0xc9, 0x4a,
-	0xdf, 0x3a, 0x5f, 0x4c, 0x70, 0xfa, 0x56, 0x56, 0xe7, 0x71, 0x16, 0xd1, 0x22, 0x2f, 0xe2, 0x4c,
-	0xf8, 0xac, 0xcc, 0x42, 0x0e, 0xdf, 0x00, 0x2b, 0x8d, 0x33, 0xdb, 0xec, 0x9b, 0x83, 0xa6, 0xff,
-	0xfc, 0x76, 0xd5, 0x93, 0xf0, 0xc7, 0xaa, 0xe7, 0x46, 0xb1, 0x98, 0x96, 0x81, 0x3b, 0x61, 0xa9,
-	0xdc, 0x97, 0x52, 0x31, 0xa5, 0x25, 0xf7, 0x26, 0x2c, 0x4d, 0x59, 0xe6, 0xa5, 0x2c, 0xa4, 0x89,
-	0xbb, 0xc3, 0x86, 0xe5, 0x33, 0x45, 0x46, 0xe6, 0x76, 0x63, 0x87, 0x8c, 0xcc, 0xff, 0x8a, 0x8c,
-	0xcc, 0x9d, 0xd7, 0xe0, 0x70, 0x44, 0x16, 0x63, 0x12, 0x24, 0x14, 0x3e, 0x05, 0xc7, 0x21, 0x59,
-	0x8c, 0xe3, 0x94, 0x72, 0x41, 0xd2, 0xfc, 0xe2, 0x52, 0x09, 0xb6, 0xf0, 0x6f, 0x5d, 0x78, 0x0a,
-	0x5a, 0x79, 0x41, 0xaf, 0x62, 0xad, 0xa1, 0x8d, 0x6b, 0xe4, 0xcc, 0x01, 0x54, 0xfe, 0x5f, 0x92,
-	0xfc, 0x5d, 0x2c, 0xa6, 0x7e, 0xc2, 0x26, 0x33, 0x0e, 0xcf, 0x41, 0x2b, 0x50, 0x29, 0x28, 0xb6,
-	0xce, 0x10, 0xe9, 0xb8, 0xb8, 0xfb, 0xe7, 0xac, 0xfc, 0xe3, 0xeb, 0x55, 0xcf, 0xb8, 0x5d, 0xf5,
-	0xea, 0x57, 0xb8, 0x3e, 0x61, 0x17, 0x1c, 0x06, 0x92, 0x11, 0xd3, 0x2b, 0xbb, 0xd1, 0xb7, 0x06,
-	0x6d, 0x7c, 0x8f, 0x9d, 0xef, 0x26, 0x68, 0x2b, 0xba, 0x31, 0xe1, 0x33, 0x78, 0x0c, 0x1a, 0x71,
-	0xa8, 0xb6, 0xb5, 0x71, 0x23, 0x0e, 0xe1, 0x19, 0x38, 0x10, 0xd2, 0xa0, 0x92, 0xdb, 0x19, 0x3e,
-	0xdc, 0x0a, 0xd8, 0x1a, 0xf7, 0xff, 0xaf, 0x57, 0xea, 0x31, 0xac, 0x0f, 0x69, 0x53, 0xd0, 0x8c,
-	0x64, 0xc2, 0xb6, 0xb4, 0x4d, 0x8d, 0x76, 0x0c, 0x35, 0xff, 0xc9, 0x10, 0x04, 0x4d, 0xc1, 0xc3,
-	0xc0, 0x3e, 0x50, 0xec, 0xaa, 0x86, 0x2e, 0x68, 0x46, 0x24, 0xe7, 0x76, 0xab, 0x6f, 0x0d, 0x3a,
-	0xc3, 0xee, 0x1e, 0xf3, 0x5e, 0xac, 0x58, 0xcd, 0x39, 0x51, 0xed, 0xfb, 0x82, 0x0a, 0x02, 0x6d,
-	0xf0, 0x5f, 0x4a, 0x05, 0x91, 0x01, 0x69, 0xf3, 0x5b, 0x08, 0x1d, 0x70, 0xc4, 0x59, 0x59, 0x4c,
-	0x28, 0x1f, 0x5f, 0x8e, 0x7c, 0x5e, 0xe7, 0xb7, 0xd7, 0x83, 0x8f, 0x41, 0x7b, 0x9b, 0x27, 0xb7,
-	0x2d, 0x35, 0xf0, 0xab, 0xe1, 0x7c, 0x00, 0x0f, 0xee, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9,
-	0x10, 0x3e, 0x7b, 0x35, 0xaa, 0xb7, 0xd5, 0x08, 0x9e, 0x80, 0x03, 0x5a, 0x14, 0xac, 0xa8, 0xbf,
-	0x0e, 0x0d, 0xe0, 0x19, 0x38, 0x9a, 0x14, 0x94, 0x08, 0x1a, 0x4a, 0xad, 0x7a, 0x43, 0x67, 0xf8,
-	0x68, 0xcf, 0xa1, 0xbc, 0xc1, 0x7b, 0x63, 0xfe, 0x8b, 0xe5, 0x1a, 0x19, 0x37, 0x6b, 0x64, 0xdc,
-	0xad, 0x91, 0xf9, 0xb1, 0x42, 0xe6, 0xe7, 0x0a, 0x99, 0xd7, 0x15, 0x32, 0x97, 0x15, 0x32, 0xbf,
-	0x56, 0xc8, 0xfc, 0x56, 0x21, 0xe3, 0xae, 0x42, 0xe6, 0xa7, 0x0d, 0x32, 0x96, 0x1b, 0x64, 0xdc,
-	0x6c, 0x90, 0xf1, 0xbe, 0xfe, 0x51, 0x03, 0x7d, 0x3e, 0xfb, 0x19, 0x00, 0x00, 0xff, 0xff, 0xdc,
-	0x0e, 0x2e, 0xd1, 0xdc, 0x03, 0x00, 0x00,
+	// 630 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0x3f, 0x6f, 0xd3, 0x40,
+	0x1c, 0xb5, 0x93, 0x34, 0x34, 0x97, 0x52, 0xe0, 0xa8, 0x2a, 0x2b, 0x42, 0x97, 0xe0, 0x01, 0x55,
+	0x20, 0x39, 0x52, 0x50, 0x07, 0x24, 0x26, 0xab, 0x2a, 0x02, 0x54, 0x09, 0x5d, 0x22, 0x21, 0xc1,
+	0x74, 0x8e, 0x2f, 0x8e, 0x15, 0xdb, 0x67, 0xf9, 0xce, 0xd0, 0x6c, 0x7c, 0x04, 0xbe, 0x04, 0x12,
+	0x33, 0x9f, 0xa2, 0x63, 0xc7, 0x4e, 0x11, 0x75, 0x17, 0xd4, 0xa9, 0x13, 0x03, 0x13, 0xba, 0x3f,
+	0x69, 0x13, 0xc4, 0x04, 0xd3, 0xdd, 0xfb, 0xdd, 0xef, 0xde, 0xef, 0xbd, 0x77, 0x96, 0xc1, 0xc3,
+	0x7c, 0x16, 0xf5, 0x83, 0x84, 0xb1, 0x34, 0x28, 0xe3, 0x24, 0xec, 0xe7, 0x05, 0x13, 0x8c, 0xf7,
+	0xc5, 0x3c, 0xa7, 0xdc, 0x53, 0x00, 0x36, 0x75, 0xad, 0xb3, 0x13, 0xb1, 0x88, 0xa9, 0x7d, 0x5f,
+	0xee, 0xf4, 0x69, 0xa7, 0x2b, 0x09, 0x12, 0x16, 0xe9, 0x03, 0xc5, 0x14, 0x11, 0x41, 0x3f, 0x92,
+	0xb9, 0x6e, 0x70, 0xbf, 0xd9, 0x60, 0xf7, 0x8d, 0xdc, 0x1d, 0xc6, 0x59, 0x44, 0x8b, 0xbc, 0x88,
+	0x33, 0xe1, 0xb3, 0x32, 0x0b, 0x39, 0x7c, 0x0d, 0xea, 0x69, 0x9c, 0x39, 0x76, 0xcf, 0xde, 0x6b,
+	0xf8, 0xcf, 0x2e, 0x17, 0x5d, 0x09, 0x7f, 0x2d, 0xba, 0x5e, 0x14, 0x8b, 0x69, 0x19, 0x78, 0x63,
+	0x96, 0x4a, 0x41, 0x29, 0x15, 0x53, 0x5a, 0xf2, 0xfe, 0x98, 0xa5, 0x29, 0xcb, 0xfa, 0x29, 0x0b,
+	0x69, 0xe2, 0xad, 0xb0, 0x61, 0x79, 0x4d, 0x91, 0x91, 0x63, 0xa7, 0xb6, 0x42, 0x46, 0x8e, 0xff,
+	0x89, 0x8c, 0x1c, 0xbb, 0xaf, 0xc0, 0xe6, 0x01, 0x99, 0x8f, 0x48, 0x90, 0x50, 0xf8, 0x08, 0x6c,
+	0x87, 0x64, 0x3e, 0x8a, 0x53, 0xca, 0x05, 0x49, 0xf3, 0xa3, 0xa1, 0x12, 0x5c, 0xc7, 0x7f, 0x54,
+	0xe1, 0x2e, 0x68, 0xe6, 0x05, 0x9d, 0xc4, 0x5a, 0x43, 0x0b, 0x1b, 0xe4, 0xbe, 0x07, 0x6d, 0xe5,
+	0x7f, 0x48, 0x8b, 0x98, 0x72, 0xd8, 0x03, 0xed, 0xc9, 0xcd, 0x38, 0x6d, 0x1e, 0xaf, 0x96, 0xe0,
+	0x63, 0xd0, 0x1c, 0x4f, 0xcb, 0x6c, 0xc6, 0x9d, 0x5a, 0xaf, 0xbe, 0xd7, 0x1e, 0x40, 0x6f, 0x99,
+	0xaf, 0x37, 0x9c, 0xb2, 0x42, 0x60, 0x3a, 0xc1, 0xa6, 0xc3, 0xfd, 0x62, 0x03, 0xa8, 0xd8, 0x5f,
+	0x90, 0xfc, 0x6d, 0x2c, 0xa6, 0x7e, 0xc2, 0xc6, 0x33, 0x0e, 0x0f, 0x41, 0x33, 0x50, 0x19, 0x2b,
+	0xfe, 0xf6, 0x00, 0xe9, 0xc7, 0xe0, 0xde, 0xdf, 0x5f, 0xc2, 0xdf, 0x3e, 0x59, 0x74, 0xad, 0xcb,
+	0x45, 0xd7, 0xdc, 0xc2, 0x66, 0x85, 0x4f, 0x40, 0x93, 0x2b, 0xd9, 0x46, 0xca, 0xfd, 0x35, 0x1e,
+	0xed, 0x08, 0x9b, 0x16, 0xd8, 0x01, 0x9b, 0x81, 0x1c, 0x8f, 0xe9, 0xc4, 0xa9, 0xf7, 0xea, 0x7b,
+	0x2d, 0x7c, 0x8d, 0xdd, 0x9f, 0x36, 0x68, 0xa9, 0x3b, 0x23, 0xc2, 0x67, 0x70, 0x1b, 0xd4, 0xe2,
+	0x50, 0x49, 0x6b, 0xe1, 0x5a, 0x1c, 0xc2, 0x7d, 0xb0, 0x21, 0x64, 0xd6, 0x2a, 0xb9, 0xf6, 0xe0,
+	0xee, 0x72, 0xca, 0xf2, 0x0d, 0xfc, 0xdb, 0x46, 0x9f, 0x6e, 0xc3, 0x7a, 0x91, 0x89, 0x0b, 0x9a,
+	0x91, 0x4c, 0x38, 0x75, 0x9d, 0xb8, 0x46, 0x2b, 0xee, 0x1b, 0xff, 0xe5, 0x1e, 0x82, 0x86, 0xe0,
+	0x61, 0xe0, 0x6c, 0x28, 0x76, 0xb5, 0x87, 0x1e, 0x68, 0x44, 0x24, 0xe7, 0x4e, 0x53, 0xe5, 0xd1,
+	0x59, 0x63, 0x5e, 0x7b, 0x03, 0xac, 0xfa, 0xdc, 0xc8, 0xf8, 0x3e, 0xa2, 0x82, 0x40, 0x07, 0xdc,
+	0x4a, 0xa9, 0x20, 0x32, 0x20, 0x6d, 0x7e, 0x09, 0xa1, 0x0b, 0xb6, 0x38, 0x2b, 0x8b, 0x31, 0xe5,
+	0xa3, 0xe1, 0x81, 0xaf, 0xe3, 0x6e, 0xe1, 0xb5, 0x1a, 0x7c, 0x00, 0x5a, 0xcb, 0x3c, 0xb9, 0x09,
+	0xf8, 0xa6, 0xe0, 0x7e, 0x00, 0x77, 0xae, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9, 0x10, 0x3e,
+	0x7b, 0x79, 0x60, 0xa6, 0x19, 0x04, 0x77, 0xc0, 0x06, 0x2d, 0x0a, 0x56, 0x98, 0x0f, 0x55, 0x03,
+	0xb8, 0x0f, 0xb6, 0xc6, 0x05, 0x25, 0x82, 0x86, 0x52, 0xab, 0x9e, 0xd0, 0x1e, 0xdc, 0x5b, 0x73,
+	0x28, 0x4f, 0xf0, 0x5a, 0x9b, 0xff, 0xfc, 0xf4, 0x1c, 0x59, 0x67, 0xe7, 0xc8, 0xba, 0x3a, 0x47,
+	0xf6, 0xa7, 0x0a, 0xd9, 0x5f, 0x2b, 0x64, 0x9f, 0x54, 0xc8, 0x3e, 0xad, 0x90, 0xfd, 0xbd, 0x42,
+	0xf6, 0x8f, 0x0a, 0x59, 0x57, 0x15, 0xb2, 0x3f, 0x5f, 0x20, 0xeb, 0xf4, 0x02, 0x59, 0x67, 0x17,
+	0xc8, 0x7a, 0x67, 0x7e, 0x2a, 0x81, 0x5e, 0x9f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xf9, 0x38,
+	0x02, 0xe1, 0x88, 0x04, 0x00, 0x00,
 }
 
 func (this *ProtoFingerprintBounds) Equal(that interface{}) bool {
@@ -490,6 +556,38 @@ func (this *DayTable) Equal(that interface{}) bool {
 	}
 	return true
 }
+func (this *ProtoSeries) Equal(that interface{}) bool {
+	if that == nil {
+		return this == nil
+	}
+
+	that1, ok := that.(*ProtoSeries)
+	if !ok {
+		that2, ok := that.(ProtoSeries)
+		if ok {
+			that1 = &that2
+		} else {
+			return false
+		}
+	}
+	if that1 == nil {
+		return this == nil
+	} else if this == nil {
+		return false
+	}
+	if this.Fingerprint != that1.Fingerprint {
+		return false
+	}
+	if len(this.Chunks) != len(that1.Chunks) {
+		return false
+	}
+	for i := range this.Chunks {
+		if !this.Chunks[i].Equal(that1.Chunks[i]) {
+			return false
+		}
+	}
+	return true
+}
 func (this *ProtoGapWithBlocks) Equal(that interface{}) bool {
 	if that == nil {
 		return this == nil
@@ -512,6 +610,14 @@ func (this *ProtoGapWithBlocks) Equal(that interface{}) bool {
 	if !this.Bounds.Equal(&that1.Bounds) {
 		return false
 	}
+	if len(this.Series) != len(that1.Series) {
+		return false
+	}
+	for i := range this.Series {
+		if !this.Series[i].Equal(that1.Series[i]) {
+			return false
+		}
+	}
 	if len(this.BlockRef) != len(that1.BlockRef) {
 		return false
 	}
@@ -663,13 +769,29 @@ func (this *DayTable) GoString() string {
 	s = append(s, "}")
 	return strings.Join(s, "")
 }
-func (this *ProtoGapWithBlocks) GoString() string {
+func (this *ProtoSeries) GoString() string {
 	if this == nil {
 		return "nil"
 	}
 	s := make([]string, 0, 6)
+	s = append(s, "&protos.ProtoSeries{")
+	s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
+	if this.Chunks != nil {
+		s = append(s, "Chunks: "+fmt.Sprintf("%#v", this.Chunks)+",\n")
+	}
+	s = append(s, "}")
+	return strings.Join(s, "")
+}
+func (this *ProtoGapWithBlocks) GoString() string {
+	if this == nil {
+		return "nil"
+	}
+	s := make([]string, 0, 7)
 	s = append(s, "&protos.ProtoGapWithBlocks{")
 	s = append(s, "Bounds: "+strings.Replace(this.Bounds.GoString(), `&`, ``, 1)+",\n")
+	if this.Series != nil {
+		s = append(s, "Series: "+fmt.Sprintf("%#v", this.Series)+",\n")
+	}
 	s = append(s, "BlockRef: "+fmt.Sprintf("%#v", this.BlockRef)+",\n")
 	s = append(s, "}")
 	return strings.Join(s, "")
@@ -793,6 +915,48 @@ func (m *DayTable) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	return len(dAtA) - i, nil
 }
 
+func (m *ProtoSeries) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalToSizedBuffer(dAtA[:size])
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *ProtoSeries) MarshalTo(dAtA []byte) (int, error) {
+	size := m.Size()
+	return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *ProtoSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+	i := len(dAtA)
+	_ = i
+	var l int
+	_ = l
+	if len(m.Chunks) > 0 {
+		for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- {
+			{
+				size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i])
+				if err != nil {
+					return 0, err
+				}
+				i -= size
+				i = encodeVarintTypes(dAtA, i, uint64(size))
+			}
+			i--
+			dAtA[i] = 0x12
+		}
+	}
+	if m.Fingerprint != 0 {
+		i = encodeVarintTypes(dAtA, i, uint64(m.Fingerprint))
+		i--
+		dAtA[i] = 0x8
+	}
+	return len(dAtA) - i, nil
+}
+
 func (m *ProtoGapWithBlocks) Marshal() (dAtA []byte, err error) {
 	size := m.Size()
 	dAtA = make([]byte, size)
@@ -819,6 +983,20 @@ func (m *ProtoGapWithBlocks) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 			copy(dAtA[i:], m.BlockRef[iNdEx])
 			i = encodeVarintTypes(dAtA, i, uint64(len(m.BlockRef[iNdEx])))
 			i--
+			dAtA[i] = 0x1a
+		}
+	}
+	if len(m.Series) > 0 {
+		for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- {
+			{
+				size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i])
+				if err != nil {
+					return 0, err
+				}
+				i -= size
+				i = encodeVarintTypes(dAtA, i, uint64(size))
+			}
+			i--
 			dAtA[i] = 0x12
 		}
 	}
@@ -1054,6 +1232,24 @@ func (m *DayTable) Size() (n int) {
 	return n
 }
 
+func (m *ProtoSeries) Size() (n int) {
+	if m == nil {
+		return 0
+	}
+	var l int
+	_ = l
+	if m.Fingerprint != 0 {
+		n += 1 + sovTypes(uint64(m.Fingerprint))
+	}
+	if len(m.Chunks) > 0 {
+		for _, e := range m.Chunks {
+			l = e.Size()
+			n += 1 + l + sovTypes(uint64(l))
+		}
+	}
+	return n
+}
+
 func (m *ProtoGapWithBlocks) Size() (n int) {
 	if m == nil {
 		return 0
@@ -1062,6 +1258,12 @@ func (m *ProtoGapWithBlocks) Size() (n int) {
 	_ = l
 	l = m.Bounds.Size()
 	n += 1 + l + sovTypes(uint64(l))
+	if len(m.Series) > 0 {
+		for _, e := range m.Series {
+			l = e.Size()
+			n += 1 + l + sovTypes(uint64(l))
+		}
+	}
 	if len(m.BlockRef) > 0 {
 		for _, s := range m.BlockRef {
 			l = len(s)
@@ -1178,12 +1380,34 @@ func (this *DayTable) String() string {
 	}, "")
 	return s
 }
+func (this *ProtoSeries) String() string {
+	if this == nil {
+		return "nil"
+	}
+	repeatedStringForChunks := "[]*ShortRef{"
+	for _, f := range this.Chunks {
+		repeatedStringForChunks += strings.Replace(fmt.Sprintf("%v", f), "ShortRef", "logproto.ShortRef", 1) + ","
+	}
+	repeatedStringForChunks += "}"
+	s := strings.Join([]string{`&ProtoSeries{`,
+		`Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`,
+		`Chunks:` + repeatedStringForChunks + `,`,
+		`}`,
+	}, "")
+	return s
+}
 func (this *ProtoGapWithBlocks) String() string {
 	if this == nil {
 		return "nil"
 	}
+	repeatedStringForSeries := "[]*ProtoSeries{"
+	for _, f := range this.Series {
+		repeatedStringForSeries += strings.Replace(f.String(), "ProtoSeries", "ProtoSeries", 1) + ","
+	}
+	repeatedStringForSeries += "}"
 	s := strings.Join([]string{`&ProtoGapWithBlocks{`,
 		`Bounds:` + strings.Replace(strings.Replace(this.Bounds.String(), "ProtoFingerprintBounds", "ProtoFingerprintBounds", 1), `&`, ``, 1) + `,`,
+		`Series:` + repeatedStringForSeries + `,`,
 		`BlockRef:` + fmt.Sprintf("%v", this.BlockRef) + `,`,
 		`}`,
 	}, "")
@@ -1441,6 +1665,112 @@ func (m *DayTable) Unmarshal(dAtA []byte) error {
 	}
 	return nil
 }
+func (m *ProtoSeries) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowTypes
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: ProtoSeries: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: ProtoSeries: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType)
+			}
+			m.Fingerprint = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.Fingerprint |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := iNdEx + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Chunks = append(m.Chunks, &logproto.ShortRef{})
+			if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipTypes(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (iNdEx + skippy) < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
 func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1504,6 +1834,40 @@ func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error {
 			}
 			iNdEx = postIndex
 		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := iNdEx + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Series = append(m.Series, &ProtoSeries{})
+			if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 3:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field BlockRef", wireType)
 			}
diff --git a/pkg/bloombuild/protos/types.proto b/pkg/bloombuild/protos/types.proto
index 55ae89625abe6..9e63dd1adb604 100644
--- a/pkg/bloombuild/protos/types.proto
+++ b/pkg/bloombuild/protos/types.proto
@@ -3,6 +3,7 @@ syntax = "proto3";
 package protos;
 
 import "gogoproto/gogo.proto";
+import "pkg/logproto/bloomgateway.proto";
 
 option go_package = "protos";
 option (gogoproto.marshaler_all) = true;
@@ -27,12 +28,18 @@ message DayTable {
   string prefix = 2;
 }
 
+message ProtoSeries {
+  uint64 fingerprint = 1;
+  repeated logproto.ShortRef chunks = 2;
+}
+
 message ProtoGapWithBlocks {
   ProtoFingerprintBounds bounds = 1 [
     (gogoproto.nullable) = false,
     (gogoproto.jsontag) = "bounds"
   ];
-  repeated string blockRef = 2;
+  repeated ProtoSeries series = 2;
+  repeated string blockRef = 3;
 }
 
 // TODO: Define BlockRef and SingleTenantTSDBIdentifier as messages so we can use them right away