Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k212] fix: Keep blocks referenced by newer metas #13627

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (p *Planner) computeTasks(

// In case the planner restarted before deleting outdated metas in the previous iteration,
// we delete them during the planning phase to avoid reprocessing them.
metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, metas, phasePlanning)
metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, nil, metas, phasePlanning)
if err != nil {
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}
Expand Down Expand Up @@ -446,8 +446,7 @@ func (p *Planner) processTenantTaskResults(
return tasksSucceed, nil
}

combined := append(originalMetas, newMetas...)
if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, combined, phaseBuilding); err != nil {
if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, newMetas, originalMetas, phaseBuilding); err != nil {
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

Expand All @@ -460,12 +459,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
ctx context.Context,
table config.DayTable,
tenant string,
metas []bloomshipper.Meta,
newMetas []bloomshipper.Meta,
originalMetas []bloomshipper.Meta,
phase string,
) ([]bloomshipper.Meta, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase)

upToDate, outdated := outdatedMetas(metas)
combined := append(originalMetas, newMetas...)
upToDate, outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log(
"msg", "no outdated metas found",
Expand Down Expand Up @@ -497,17 +498,25 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(

for _, meta := range outdated {
for _, block := range meta.Blocks {
logger := log.With(logger, "block", block.String())

// Prevent deleting blocks that are reused in new metas
if isBlockInMetas(block, upToDate) {
level.Debug(logger).Log("msg", "block is still in use in new meta, skipping delete")
continue
}

if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing")
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
level.Error(logger).Log("msg", "failed to delete block", "err", err)
return nil, errors.Wrap(err, "failed to delete block")
}
}

deletedBlocks++
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
level.Debug(logger).Log("msg", "removed outdated block")
}

err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
Expand All @@ -532,6 +541,22 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
return upToDate, nil
}

func isBlockInMetas(block bloomshipper.BlockRef, metas []bloomshipper.Meta) bool {
// Blocks are sorted within a meta, so we can find it with binary search
for _, meta := range metas {
// Search for the first block whose bound is >= than the target block min bound.
i := sort.Search(len(meta.Blocks), func(i int) bool {
return meta.Blocks[i].Cmp(uint64(block.Bounds.Max)) <= v1.Overlap
})

if i < len(meta.Blocks) && meta.Blocks[i] == block {
return true
}
}

return false
}

func (p *Planner) tables(ts time.Time) *dayRangeIterator {
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
Expand Down
100 changes: 92 additions & 8 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package planner

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -20,6 +21,8 @@ import (
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
Expand Down Expand Up @@ -166,11 +169,36 @@ func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
}
}

func genBlock(ref bloomshipper.BlockRef) bloomshipper.Block {
func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
return bloomshipper.Block{}, err
}

if _, err = builder.BuildFrom(iter.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil {
return bloomshipper.Block{}, err
}

block := v1.NewBlock(reader, v1.NewMetrics(nil))

buf := bytes.NewBuffer(nil)
if err := v1.TarGz(buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

tarReader := bytes.NewReader(buf.Bytes())

return bloomshipper.Block{
BlockRef: ref,
Data: &DummyReadSeekCloser{},
}
Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader},
}, nil
}

func Test_blockPlansForGaps(t *testing.T) {
Expand Down Expand Up @@ -612,7 +640,12 @@ func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error
}

for _, block := range meta.Blocks {
err := bloomClient.PutBlock(context.Background(), genBlock(block))
writtenBlock, err := genBlock(block)
if err != nil {
return err
}

err = bloomClient.PutBlock(context.Background(), writtenBlock)
if err != nil {
return err
}
Expand Down Expand Up @@ -826,6 +859,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
for _, tc := range []struct {
name string
originalMetas []bloomshipper.Meta
newMetas []bloomshipper.Meta
expectedUpToDateMetas []bloomshipper.Meta
}{
{
Expand All @@ -835,6 +869,8 @@ func Test_deleteOutdatedMetas(t *testing.T) {
name: "only up to date metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
newMetas: []bloomshipper.Meta{
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
Expand All @@ -846,13 +882,52 @@ func Test_deleteOutdatedMetas(t *testing.T) {
name: "outdated metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}),
genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}),
},
newMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
},
{
name: "new metas reuse blocks from outdated meta",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated
genBlockRef(0, 5), // Reuse
genBlockRef(5, 10), // Delete
}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated
genBlockRef(10, 20), // Reuse
}),
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date
genBlockRef(20, 30),
}),
},
newMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{
genBlockRef(0, 5), // Reused block
}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(5, 7), // New block
genBlockRef(7, 10), // New block
genBlockRef(10, 20), // Reused block
}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{
genBlockRef(0, 5),
}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(5, 7),
genBlockRef(7, 10),
genBlockRef(10, 20),
}),
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
genBlockRef(20, 30),
}),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
Expand All @@ -867,9 +942,11 @@ func Test_deleteOutdatedMetas(t *testing.T) {
bloomClient, err := planner.bloomStore.Client(testDay.ModelTime())
require.NoError(t, err)

// Create original metas and blocks
// Create original/new metas and blocks
err = putMetas(bloomClient, tc.originalMetas)
require.NoError(t, err)
err = putMetas(bloomClient, tc.newMetas)
require.NoError(t, err)

// Get all metas
metas, err := planner.bloomStore.FetchMetas(
Expand All @@ -882,9 +959,9 @@ func Test_deleteOutdatedMetas(t *testing.T) {
)
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.originalMetas, metas)
require.ElementsMatch(t, append(tc.originalMetas, tc.newMetas...), metas)

upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning)
upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning)
require.NoError(t, err)
require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate)

Expand All @@ -900,6 +977,13 @@ func Test_deleteOutdatedMetas(t *testing.T) {
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.expectedUpToDateMetas, metas)

// Fetch all blocks from the metas
for _, meta := range metas {
blocks, err := planner.bloomStore.FetchBlocks(context.Background(), meta.Blocks)
require.NoError(t, err)
require.Len(t, blocks, len(meta.Blocks))
}
})
}
}
Expand Down
Loading