From 8e4b3e38fc325b191743314e3baeea4887ae2709 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 23 May 2023 15:35:24 +0200 Subject: [PATCH 1/4] Compactor: do not issue object storage HEAD API calls on each block meta.json Signed-off-by: Marco Pracucci --- pkg/compactor/bucket_compactor.go | 60 +-- pkg/compactor/bucket_compactor_e2e_test.go | 4 - pkg/compactor/compactor.go | 4 - pkg/compactor/split_merge_compactor_test.go | 8 +- pkg/storage/tsdb/block/fetcher.go | 97 +++-- pkg/storage/tsdb/block/fetcher_test.go | 345 ++++++++++++++++++ pkg/storage/tsdb/block/global_markers.go | 26 ++ pkg/storage/tsdb/block/global_markers_test.go | 37 ++ pkg/storage/tsdb/bucketindex/updater.go | 10 +- 9 files changed, 495 insertions(+), 96 deletions(-) create mode 100644 pkg/storage/tsdb/block/fetcher_test.go diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go index 140a195937b..ba3aac19e6c 100644 --- a/pkg/compactor/bucket_compactor.go +++ b/pkg/compactor/bucket_compactor.go @@ -45,7 +45,7 @@ type DeduplicateFilter interface { type Syncer struct { logger log.Logger bkt objstore.Bucket - fetcher block.MetadataFetcher + fetcher *block.MetaFetcher mtx sync.Mutex blocks map[ulid.ULID]*block.Meta metrics *syncerMetrics @@ -83,7 +83,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometh // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) { +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -102,7 +102,9 @@ func (s *Syncer) SyncMetas(ctx context.Context) error { s.mtx.Lock() defer s.mtx.Unlock() - metas, _, err := s.fetcher.Fetch(ctx) + // While fetching blocks, we filter out blocks that were marked for deletion. + // No deletion delay is used -- all blocks with deletion marker are ignored, and not considered for compaction. + metas, _, err := s.fetcher.FetchWithoutMarkedForDeletion(ctx) if err != nil { return err } @@ -777,7 +779,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du // Blocks that were compacted are garbage collected after each Compaction. // However if compactor crashes we need to resolve those on startup. if err := c.sy.GarbageCollect(ctx); err != nil { - return errors.Wrap(err, "garbage") + return errors.Wrap(err, "blocks garbage collect") } jobs, err := c.grouper.Groups(c.sy.Metas()) @@ -971,53 +973,3 @@ func hasNonZeroULIDs(ids []ulid.ULID) bool { return false } - -// ExcludeMarkedForDeletionFilter is a filter that filters out the blocks that are marked for deletion. -// Compared to IgnoreDeletionMarkFilter filter from Thanos, this implementation doesn't use any deletion delay, -// and only uses marker files under bucketindex.MarkersPathname. 
-type ExcludeMarkedForDeletionFilter struct { - bkt objstore.InstrumentedBucketReader - - deletionMarkMap map[ulid.ULID]struct{} -} - -func NewExcludeMarkedForDeletionFilter(bkt objstore.InstrumentedBucketReader) *ExcludeMarkedForDeletionFilter { - return &ExcludeMarkedForDeletionFilter{ - bkt: bkt, - } -} - -// DeletionMarkBlocks returns block ids that were marked for deletion. -// It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter. -func (f *ExcludeMarkedForDeletionFilter) DeletionMarkBlocks() map[ulid.ULID]struct{} { - return f.deletionMarkMap -} - -// Filter filters out blocks that are marked for deletion. -// It also builds the map returned by DeletionMarkBlocks() method. -func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error { - deletionMarkMap := make(map[ulid.ULID]struct{}) - - // Find all markers in the storage. - err := f.bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error { - if err := ctx.Err(); err != nil { - return err - } - - if blockID, ok := block.IsDeletionMarkFilename(path.Base(name)); ok { - _, exists := metas[blockID] - if exists { - deletionMarkMap[blockID] = struct{}{} - synced.WithLabelValues(block.MarkedForDeletionMeta).Inc() - delete(metas, blockID) - } - } - return nil - }) - if err != nil { - return errors.Wrap(err, "list block deletion marks") - } - - f.deletionMarkMap = deletionMarkMap - return nil -} diff --git a/pkg/compactor/bucket_compactor_e2e_test.go b/pkg/compactor/bucket_compactor_e2e_test.go index ac070d24cdf..9b22e1cf72c 100644 --- a/pkg/compactor/bucket_compactor_e2e_test.go +++ b/pkg/compactor/bucket_compactor_e2e_test.go @@ -225,11 +225,9 @@ func TestGroupCompactE2E(t *testing.T) { reg := prometheus.NewRegistry() - ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt)) duplicateBlocksFilter := NewShardAwareDeduplicateFilter() noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true) metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ - ignoreDeletionMarkFilter, duplicateBlocksFilter, noCompactMarkerFilter, }) @@ -520,11 +518,9 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T } blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt)) duplicateBlocksFilter := NewShardAwareDeduplicateFilter() metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ - ignoreDeletionMarkFilter, duplicateBlocksFilter, }) require.NoError(t, err) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index f14cb3f87ae..44775b25f7b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -697,9 +697,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e userLogger := util_log.WithUserID(userID, c.logger) - // While fetching blocks, we filter out blocks that were marked for deletion by using ExcludeMarkedForDeletionFilter. - // No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction. 
-	excludeMarkedForDeletionFilter := NewExcludeMarkedForDeletionFilter(userBucket)
 	// Filters out duplicate blocks that can be formed from two or more overlapping
 	// blocks that fully submatches the source blocks of the older blocks.
 	deduplicateBlocksFilter := NewShardAwareDeduplicateFilter()
@@ -714,7 +711,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
 			mimir_tsdb.DeprecatedTenantIDExternalLabel,
 			mimir_tsdb.DeprecatedIngesterIDExternalLabel,
 		}),
-		excludeMarkedForDeletionFilter,
 		deduplicateBlocksFilter,
 		// removes blocks that should not be compacted due to being marked so.
 		NewNoCompactionMarkFilter(userBucket, true),
diff --git a/pkg/compactor/split_merge_compactor_test.go b/pkg/compactor/split_merge_compactor_test.go
index 542886ebb3d..3235de4b76f 100644
--- a/pkg/compactor/split_merge_compactor_test.go
+++ b/pkg/compactor/split_merge_compactor_test.go
@@ -721,10 +721,10 @@ func TestMultitenantCompactor_ShouldSupportSplitAndMergeCompactor(t *testing.T)
 				userBucket,
 				fetcherDir,
 				reg,
-				[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
+				nil,
 			)
 			require.NoError(t, err)
-			metas, partials, err := fetcher.Fetch(ctx)
+			metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
 			require.NoError(t, err)
 			require.Empty(t, partials)
 
@@ -812,10 +812,10 @@ func TestMultitenantCompactor_ShouldGuaranteeSeriesShardingConsistencyOverTheTim
 				userBucket,
 				fetcherDir,
 				reg,
-				[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
+				nil,
 			)
 			require.NoError(t, err)
-			metas, partials, err := fetcher.Fetch(ctx)
+			metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
 			require.NoError(t, err)
 			require.Empty(t, partials)
 
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index 4bf6d457cf4..735fc7e0c0b 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -123,7 +123,7 @@ type GaugeVec interface {
 	WithLabelValues(lvs ...string) prometheus.Gauge
 }
 
-// Filter allows filtering or modifying metas from the provided map or returns error.
+// MetadataFilter allows filtering or modifying metas from the provided map or returns error.
 type MetadataFilter interface {
 	Filter(ctx context.Context, metas map[ulid.ULID]*Meta, synced GaugeVec) error
 }
@@ -176,23 +176,36 @@ var (
 )
 
 // loadMeta returns metadata from object storage or error.
-// It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases.
+// It returns ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors in those cases.
 func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error) {
 	var (
 		metaFile       = path.Join(id.String(), MetaFilename)
 		cachedBlockDir = filepath.Join(f.cacheDir, id.String())
 	)
 
-	// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
-	// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
-	ok, err := f.bkt.Exists(ctx, metaFile)
-	if err != nil {
-		return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile)
-	}
-	if !ok {
-		return nil, ErrorSyncMetaNotFound
-	}
-
+	// Block meta.json file is immutable, so we look up the cache first, without issuing
+	// any API call to the object storage. This significantly reduces the pressure on the object
+	// storage.
+	//
+	// Details of all possible cases:
+	//
+	// - The block upload is in progress: the meta.json file is guaranteed to be uploaded last.
+	//   When we try to read it from object storage (later on), it will fail with ErrorSyncMetaNotFound
+	//   which is correctly handled by the caller (partial block).
+	//
+	// - The block upload is completed: this is the normal case. Meta.json file still exists in the
+	//   object storage and it's expected to match the locally cached one (because immutable by design).
+	//
+	// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
+	//   the meta.json file) is still in the object storage. This case is no different from the previous one.
+	//
+	// - The block deletion is in progress: the loadMeta() function may return the cached meta.json while it should
+	//   return ErrorSyncMetaNotFound. This is a race condition that could happen even if we check the meta.json
+	//   file in the storage, because the deletion could start right after we check it but before the MetaFetcher
+	//   completes its sync.
+	//
+	// - The block has been deleted: the loadMeta() function will not be called at all, because the block
+	//   was not discovered while iterating the bucket since all its files were already deleted.
 	if m, seen := f.cached[id]; seen {
 		return m, nil
 	}
@@ -253,14 +266,17 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 type response struct {
 	metas   map[ulid.ULID]*Meta
 	partial map[ulid.ULID]error
+
 	// If metaErr > 0 it means incomplete view, so some metas, failed to be loaded.
 	metaErrs multierror.MultiError
 
-	noMetas        float64
-	corruptedMetas float64
+	// Track the number of blocks not returned for various reasons.
+	noMetasCount           float64
+	corruptedMetasCount    float64
+	markedForDeletionCount float64
 }
 
-func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
+func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool) (interface{}, error) {
 	var (
 		resp = response{
 			metas:   make(map[ulid.ULID]*Meta),
@@ -271,6 +287,19 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 		mtx sync.Mutex
 	)
 	level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency)
+
+	// Get the list of blocks marked for deletion, so that we can exclude them (if required).
+	var markedForDeletion map[ulid.ULID]struct{}
+	if excludeMarkedForDeletion {
+		var err error
+
+		markedForDeletion, err = ListBlockDeletionMarks(ctx, f.bkt)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Run workers.
 	for i := 0; i < f.concurrency; i++ {
 		eg.Go(func() error {
 			for id := range ch {
@@ -284,11 +313,11 @@
 
 				if errors.Is(errors.Cause(err), ErrorSyncMetaNotFound) {
 					mtx.Lock()
-					resp.noMetas++
+					resp.noMetasCount++
 					mtx.Unlock()
 				} else if errors.Is(errors.Cause(err), ErrorSyncMetaCorrupted) {
 					mtx.Lock()
-					resp.corruptedMetas++
+					resp.corruptedMetasCount++
 					mtx.Unlock()
 				} else {
 					mtx.Lock()
@@ -314,6 +343,12 @@
 			return nil
 		}
 
+		// If requested, skip any block marked for deletion.
+		if _, marked := markedForDeletion[id]; excludeMarkedForDeletion && marked {
+			resp.markedForDeletionCount++
+			return nil
+		}
+
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -378,7 +413,22 @@
 // It's caller responsibility to not change the returned metadata files. Maps can be modified.
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
-func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
+func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
+	metas, partials, err = f.fetch(ctx, false)
+	return
+}
+
+// FetchWithoutMarkedForDeletion returns all block metas, as well as partial blocks (blocks with a missing or corrupted meta file), from the bucket.
+// This function excludes all blocks marked for deletion (no deletion delay is applied).
+// It's the caller's responsibility not to change the returned metadata files. Maps can be modified.
+//
+// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
+func (f *MetaFetcher) FetchWithoutMarkedForDeletion(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
+	metas, partials, err = f.fetch(ctx, true)
+	return
+}
+
+func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
 	start := time.Now()
 	defer func() {
 		f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
@@ -392,7 +442,7 @@
 	// Run this in thread safe run group.
 	v, err := f.g.Do("", func() (i interface{}, err error) {
 		// NOTE: First go routine context will go through.
-		return f.fetchMetadata(ctx)
+		return f.fetchMetadata(ctx, excludeMarkedForDeletion)
 	})
 	if err != nil {
 		return nil, nil, err
@@ -406,8 +456,11 @@
 	}
 
 	f.metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
-	f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
-	f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)
+	f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetasCount)
+	f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetasCount)
+	if excludeMarkedForDeletion {
+		f.metrics.Synced.WithLabelValues(MarkedForDeletionMeta).Set(resp.markedForDeletionCount)
+	}
 
 	for _, filter := range f.filters {
 		// NOTE: filter can update synced metric accordingly to the reason of the exclude.
@@ -434,7 +487,7 @@ func (f *MetaFetcher) countCached() int {
 	return len(f.cached)
 }
 
-// Special label that will have an ULID of the meta.json being referenced to.
+// BlockIDLabel is a special label that holds the ULID of the meta.json being referenced.
 const BlockIDLabel = "__block_id"
 
 // IgnoreDeletionMarkFilter is a filter that filters out the blocks that are marked for deletion after a given delay.
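Not part of the patch: a hedged usage sketch of the two fetch entry points added above. The syncForCompaction helper, its package and its logging are hypothetical; the *block.MetaFetcher type, the NewMetaFetcher constructor and the FetchWithoutMarkedForDeletion method are the ones in this diff (see the tests below for the constructor arguments).

```go
package sketch // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

// syncForCompaction is a hypothetical caller. Compaction must never consider
// blocks marked for deletion, so it uses FetchWithoutMarkedForDeletion(),
// which lists deletion marks once (a single Iter over the markers prefix)
// instead of issuing one HEAD/Exists call per block. Fetch() keeps the old
// behavior and still returns blocks marked for deletion.
func syncForCompaction(ctx context.Context, fetcher *block.MetaFetcher) error {
	metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("synced %d blocks (%d partial)\n", len(metas), len(partials))
	return nil
}
```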
diff --git a/pkg/storage/tsdb/block/fetcher_test.go b/pkg/storage/tsdb/block/fetcher_test.go new file mode 100644 index 00000000000..e8239a671e9 --- /dev/null +++ b/pkg/storage/tsdb/block/fetcher_test.go @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package block + +import ( + "context" + "path" + "path/filepath" + "strings" + "testing" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + + "github.com/grafana/mimir/pkg/storage/bucket/filesystem" +) + +func TestMetaFetcher_Fetch_ShouldReturnDiscoveredBlocksIncludingMarkedForDeletion(t *testing.T) { + var ( + ctx = context.Background() + reg = prometheus.NewPedanticRegistry() + logger = log.NewNopLogger() + ) + + // Create a bucket client with global markers. + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()}) + require.NoError(t, err) + bkt = BucketWithGlobalMarkers(bkt) + + f, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg), t.TempDir(), reg, nil) + require.NoError(t, err) + + t.Run("should return no metas and no partials on no block in the storage", func(t *testing.T) { + actualMetas, actualPartials, actualErr := f.Fetch(ctx) + require.NoError(t, actualErr) + require.Empty(t, actualMetas) + require.Empty(t, actualPartials) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced")) + }) + + // Upload a block. + block1ID, block1Dir := createTestBlock(t) + require.NoError(t, Upload(ctx, logger, bkt, block1Dir, nil)) + + // Upload a partial block. 
+ block2ID, block2Dir := createTestBlock(t) + require.NoError(t, Upload(ctx, logger, bkt, block2Dir, nil)) + require.NoError(t, bkt.Delete(ctx, path.Join(block2ID.String(), MetaFilename))) + + t.Run("should return metas and partials on some blocks in the storage", func(t *testing.T) { + actualMetas, actualPartials, actualErr := f.Fetch(ctx) + require.NoError(t, actualErr) + require.Len(t, actualMetas, 1) + require.Contains(t, actualMetas, block1ID) + require.Len(t, actualPartials, 1) + require.Contains(t, actualPartials, block2ID) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 1 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-meta-json"} 1 + blocks_meta_synced{state="time-excluded"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 2 + `), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced")) + }) + + // Upload a block and mark it for deletion. + block3ID, block3Dir := createTestBlock(t) + require.NoError(t, Upload(ctx, logger, bkt, block3Dir, nil)) + require.NoError(t, MarkForDeletion(ctx, logger, bkt, block3ID, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + + t.Run("should include blocks marked for deletion", func(t *testing.T) { + actualMetas, actualPartials, actualErr := f.Fetch(ctx) + require.NoError(t, actualErr) + require.Len(t, actualMetas, 2) + require.Contains(t, actualMetas, block1ID) + require.Contains(t, actualMetas, block3ID) + require.Len(t, actualPartials, 1) + require.Contains(t, actualPartials, block2ID) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 2 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-meta-json"} 1 + blocks_meta_synced{state="time-excluded"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 3 + `), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced")) + }) +} + +func TestMetaFetcher_FetchWithoutMarkedForDeletion_ShouldReturnDiscoveredBlocksExcludingMarkedForDeletion(t *testing.T) { + var ( + ctx = context.Background() + reg = prometheus.NewPedanticRegistry() + logger = log.NewNopLogger() + ) + + // Create a bucket client with global markers. 
+ bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()}) + require.NoError(t, err) + bkt = BucketWithGlobalMarkers(bkt) + + f, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg), t.TempDir(), reg, nil) + require.NoError(t, err) + + t.Run("should return no metas and no partials on no block in the storage", func(t *testing.T) { + actualMetas, actualPartials, actualErr := f.FetchWithoutMarkedForDeletion(ctx) + require.NoError(t, actualErr) + require.Empty(t, actualMetas) + require.Empty(t, actualPartials) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 0 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-meta-json"} 0 + blocks_meta_synced{state="time-excluded"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 1 + `), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced")) + }) + + // Upload a block. + block1ID, block1Dir := createTestBlock(t) + require.NoError(t, Upload(ctx, logger, bkt, block1Dir, nil)) + + // Upload a partial block. + block2ID, block2Dir := createTestBlock(t) + require.NoError(t, Upload(ctx, logger, bkt, block2Dir, nil)) + require.NoError(t, bkt.Delete(ctx, path.Join(block2ID.String(), MetaFilename))) + + t.Run("should return metas and partials on some blocks in the storage", func(t *testing.T) { + actualMetas, actualPartials, actualErr := f.FetchWithoutMarkedForDeletion(ctx) + require.NoError(t, actualErr) + require.Len(t, actualMetas, 1) + require.Contains(t, actualMetas, block1ID) + require.Len(t, actualPartials, 1) + require.Contains(t, actualPartials, block2ID) + + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures + # TYPE blocks_meta_sync_failures_total counter + blocks_meta_sync_failures_total 0 + + # HELP blocks_meta_synced Number of block metadata synced + # TYPE blocks_meta_synced gauge + blocks_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_synced{state="duplicate"} 0 + blocks_meta_synced{state="failed"} 0 + blocks_meta_synced{state="label-excluded"} 0 + blocks_meta_synced{state="loaded"} 1 + blocks_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_synced{state="no-meta-json"} 1 + blocks_meta_synced{state="time-excluded"} 0 + + # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts + # TYPE blocks_meta_syncs_total counter + blocks_meta_syncs_total 2 + `), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced")) + }) + + // Upload a block and mark it for deletion. 
+	block3ID, block3Dir := createTestBlock(t)
+	require.NoError(t, Upload(ctx, logger, bkt, block3Dir, nil))
+	require.NoError(t, MarkForDeletion(ctx, logger, bkt, block3ID, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
+
+	t.Run("should exclude blocks marked for deletion", func(t *testing.T) {
+		actualMetas, actualPartials, actualErr := f.FetchWithoutMarkedForDeletion(ctx)
+		require.NoError(t, actualErr)
+		require.Len(t, actualMetas, 1)
+		require.Contains(t, actualMetas, block1ID)
+		require.Len(t, actualPartials, 1)
+		require.Contains(t, actualPartials, block2ID)
+
+		assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+			# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
+			# TYPE blocks_meta_sync_failures_total counter
+			blocks_meta_sync_failures_total 0
+
+			# HELP blocks_meta_synced Number of block metadata synced
+			# TYPE blocks_meta_synced gauge
+			blocks_meta_synced{state="corrupted-meta-json"} 0
+			blocks_meta_synced{state="duplicate"} 0
+			blocks_meta_synced{state="failed"} 0
+			blocks_meta_synced{state="label-excluded"} 0
+			blocks_meta_synced{state="loaded"} 1
+			blocks_meta_synced{state="marked-for-deletion"} 1
+			blocks_meta_synced{state="marked-for-no-compact"} 0
+			blocks_meta_synced{state="no-meta-json"} 1
+			blocks_meta_synced{state="time-excluded"} 0
+
+			# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
+			# TYPE blocks_meta_syncs_total counter
+			blocks_meta_syncs_total 3
+		`), "blocks_meta_syncs_total", "blocks_meta_sync_failures_total", "blocks_meta_synced"))
+	})
+}
+
+func TestMetaFetcher_ShouldNotIssueAnyAPICallToObjectStorageIfAllBlockMetasAreCached(t *testing.T) {
+	var (
+		ctx        = context.Background()
+		logger     = log.NewNopLogger()
+		fetcherDir = t.TempDir()
+	)
+
+	// Create a bucket client.
+	bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()})
+	require.NoError(t, err)
+
+	// Upload a few blocks.
+	block1ID, block1Dir := createTestBlock(t)
+	require.NoError(t, Upload(ctx, logger, bkt, block1Dir, nil))
+	block2ID, block2Dir := createTestBlock(t)
+	require.NoError(t, Upload(ctx, logger, bkt, block2Dir, nil))
+
+	// Create a fetcher and fetch block metas to populate the cache on disk.
+	reg1 := prometheus.NewPedanticRegistry()
+	fetcher1, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg1), fetcherDir, nil, nil)
+	require.NoError(t, err)
+	actualMetas, _, actualErr := fetcher1.Fetch(ctx)
+	require.NoError(t, actualErr)
+	require.Len(t, actualMetas, 2)
+	require.Contains(t, actualMetas, block1ID)
+	require.Contains(t, actualMetas, block2ID)
+
+	assert.NoError(t, testutil.GatherAndCompare(reg1, strings.NewReader(`
+		# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
+		# TYPE thanos_objstore_bucket_operations_total counter
+		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 2
+		thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 1
+		thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 0
+	`), "thanos_objstore_bucket_operations_total"))
+
+	// Create a new fetcher and fetch blocks again. This time we expect all meta.json files to be loaded from the cache.
+	reg2 := prometheus.NewPedanticRegistry()
+	fetcher2, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg2), fetcherDir, nil, nil)
+	require.NoError(t, err)
+	actualMetas, _, actualErr = fetcher2.Fetch(ctx)
+	require.NoError(t, actualErr)
+	require.Len(t, actualMetas, 2)
+	require.Contains(t, actualMetas, block1ID)
+	require.Contains(t, actualMetas, block2ID)
+
+	assert.NoError(t, testutil.GatherAndCompare(reg2, strings.NewReader(`
+		# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
+		# TYPE thanos_objstore_bucket_operations_total counter
+		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 1
+		thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 0
+	`), "thanos_objstore_bucket_operations_total"))
+}
+
+func createTestBlock(t *testing.T) (blockID ulid.ULID, blockDir string) {
+	var err error
+
+	parentDir := t.TempDir()
+	series := []labels.Labels{
+		labels.FromStrings(labels.MetricName, "series_1"),
+		labels.FromStrings(labels.MetricName, "series_2"),
+		labels.FromStrings(labels.MetricName, "series_3"),
+	}
+
+	blockID, err = CreateBlock(context.Background(), parentDir, series, 100, 0, 1000, nil)
+	require.NoError(t, err)
+
+	blockDir = filepath.Join(parentDir, blockID.String())
+	return
+}
diff --git a/pkg/storage/tsdb/block/global_markers.go b/pkg/storage/tsdb/block/global_markers.go
index 81ea6df5964..3720734859e 100644
--- a/pkg/storage/tsdb/block/global_markers.go
+++ b/pkg/storage/tsdb/block/global_markers.go
@@ -6,11 +6,15 @@
 package block
 
 import (
+	"context"
 	"fmt"
+	"path"
 	"path/filepath"
 	"strings"
 
 	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/thanos-io/objstore"
 )
 
 const (
@@ -60,3 +64,25 @@ func NoCompactMarkFilepath(blockID ulid.ULID) string {
 func IsNoCompactMarkFilename(name string) (ulid.ULID, bool) {
 	return isMarkFilename(name, NoCompactMarkFilename)
 }
+
+// ListBlockDeletionMarks looks for block deletion marks in the global markers location
+// and returns a map whose keys are the IDs of all blocks having a deletion mark in the
+// bucket.
+func ListBlockDeletionMarks(ctx context.Context, bkt objstore.BucketReader) (map[ulid.ULID]struct{}, error) {
+	discovered := map[ulid.ULID]struct{}{}
+
+	// Find all markers in the storage.
+	err := bkt.Iter(ctx, MarkersPathname+"/", func(name string) error {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		if blockID, ok := IsDeletionMarkFilename(path.Base(name)); ok {
+			discovered[blockID] = struct{}{}
+		}
+
+		return nil
+	})
+
+	return discovered, errors.Wrap(err, "list block deletion marks")
+}
diff --git a/pkg/storage/tsdb/block/global_markers_test.go b/pkg/storage/tsdb/block/global_markers_test.go
index 50d02c8cf77..f47129a9fec 100644
--- a/pkg/storage/tsdb/block/global_markers_test.go
+++ b/pkg/storage/tsdb/block/global_markers_test.go
@@ -6,10 +6,15 @@
 package block
 
 import (
+	"context"
+	"strings"
 	"testing"
 
 	"github.com/oklog/ulid"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	mimir_testutil "github.com/grafana/mimir/pkg/storage/tsdb/testutil"
 )
 
 func TestDeletionMarkFilepath(t *testing.T) {
@@ -57,3 +62,35 @@ func TestIsNoCompactMarkFilename(t *testing.T) {
 	assert.True(t, ok)
 	assert.Equal(t, expected, actual)
 }
+
+func TestListBlockDeletionMarks(t *testing.T) {
+	var (
+		ctx    = context.Background()
+		block1 = ulid.MustNew(1, nil)
+		block2 = ulid.MustNew(2, nil)
+		block3 = ulid.MustNew(3, nil)
+	)
+
+	t.Run("should return an empty map on empty bucket", func(t *testing.T) {
+		bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
+
+		actualMarks, actualErr := ListBlockDeletionMarks(ctx, bkt)
+		require.NoError(t, actualErr)
+		assert.Empty(t, actualMarks)
+	})
+
+	t.Run("should return the IDs of the blocks having a deletion mark", func(t *testing.T) {
+		bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
+
+		require.NoError(t, bkt.Upload(ctx, DeletionMarkFilepath(block1), strings.NewReader("{}")))
+		require.NoError(t, bkt.Upload(ctx, NoCompactMarkFilepath(block2), strings.NewReader("{}")))
+		require.NoError(t, bkt.Upload(ctx, DeletionMarkFilepath(block3), strings.NewReader("{}")))
+
+		actualMarks, actualErr := ListBlockDeletionMarks(ctx, bkt)
+		require.NoError(t, actualErr)
+		assert.Equal(t, map[ulid.ULID]struct{}{
+			block1: {},
+			block3: {},
+		}, actualMarks)
+	})
+}
diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go
index 1df2506e79c..9468f6604ee 100644
--- a/pkg/storage/tsdb/bucketindex/updater.go
+++ b/pkg/storage/tsdb/bucketindex/updater.go
@@ -169,17 +169,11 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo
 
 func (w *Updater) updateBlockDeletionMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, error) {
 	out := make([]*BlockDeletionMark, 0, len(old))
-	discovered := map[ulid.ULID]struct{}{}
 
 	// Find all markers in the storage.
-	err := w.bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error {
-		if blockID, ok := block.IsDeletionMarkFilename(path.Base(name)); ok {
-			discovered[blockID] = struct{}{}
-		}
-		return nil
-	})
+	discovered, err := block.ListBlockDeletionMarks(ctx, w.bkt)
 	if err != nil {
-		return nil, errors.Wrap(err, "list block deletion marks")
+		return nil, err
 	}
 
 	// Since deletion marks are immutable, all markers already existing in the index can just be copied.
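Also not part of the patch: a sketch of how ListBlockDeletionMarks supports cheap membership checks. The isMarkedForDeletion wrapper and its package are hypothetical; the helper itself, and the single-Iter trade-off, come from the diff above.

```go
package sketch // hypothetical package, for illustration only

import (
	"context"

	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore"

	"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

// isMarkedForDeletion is a hypothetical wrapper: a single Iter (LIST) call over
// the global markers prefix discovers every deletion mark at once, after which
// membership checks for any number of blocks are in-memory map lookups, instead
// of one Exists (HEAD) call per block.
func isMarkedForDeletion(ctx context.Context, bkt objstore.BucketReader, blockID ulid.ULID) (bool, error) {
	marks, err := block.ListBlockDeletionMarks(ctx, bkt)
	if err != nil {
		return false, err
	}
	_, marked := marks[blockID]
	return marked, nil
}
```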
From 28ea758968cea0956791dcfc83f59541dd338893 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Thu, 25 May 2023 10:29:13 +0200
Subject: [PATCH 2/4] Added CHANGELOG entry

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 81ceba69d85..19564421d85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -52,6 +52,7 @@
 * [ENHANCEMENT] Store-gateway: record index header loading time separately in `cortex_bucket_store_series_request_stage_duration_seconds{stage="load_index_header"}`. Now index header loading will be visible in the "Mimir / Queries" dashboard in the "Series request p99/average latency" panels. #5011 #5062
 * [ENHANCEMENT] Querier and ingester: add experimental support for streaming chunks from ingesters to queriers while evaluating queries. This can be enabled with `-querier.prefer-streaming-chunks=true`. #4886
 * [ENHANCEMENT] Update Docker base images from `alpine:3.17.3` to `alpine:3.18.0`. #5065
+* [ENHANCEMENT] Compactor: reduced the number of object storage `HEAD` API calls issued when syncing blocks' `meta.json` files. #5063
 * [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782
 * [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770
 * [BUGFIX] Ruler: Support the `type=alert|record` query parameter for the API endpoint `/api/v1/rules`. #4302

From 357768d8c81b70f32b7078e4d40d7b9efdad16eb Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Thu, 25 May 2023 11:43:03 +0200
Subject: [PATCH 3/4] Improve CHANGELOG entry

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 19564421d85..3b23108c347 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -52,7 +52,7 @@
 * [ENHANCEMENT] Store-gateway: record index header loading time separately in `cortex_bucket_store_series_request_stage_duration_seconds{stage="load_index_header"}`. Now index header loading will be visible in the "Mimir / Queries" dashboard in the "Series request p99/average latency" panels. #5011 #5062
 * [ENHANCEMENT] Querier and ingester: add experimental support for streaming chunks from ingesters to queriers while evaluating queries. This can be enabled with `-querier.prefer-streaming-chunks=true`. #4886
 * [ENHANCEMENT] Update Docker base images from `alpine:3.17.3` to `alpine:3.18.0`. #5065
-* [ENHANCEMENT] Compactor: reduced the number of object storage `HEAD` API calls issued when syncing blocks' `meta.json` files. #5063
+* [ENHANCEMENT] Compactor: reduced the number of "object exists" API calls issued by the compactor to the object storage when syncing blocks' `meta.json` files. #5063
 * [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782
 * [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770
 * [BUGFIX] Ruler: Support the `type=alert|record` query parameter for the API endpoint `/api/v1/rules`. #4302

From acc7ef695bf38769389d38c243d1fbe3f4e0a507 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Thu, 25 May 2023 11:43:46 +0200
Subject: [PATCH 4/4] Update pkg/storage/tsdb/block/fetcher.go
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Peter Štibraný
---
 pkg/storage/tsdb/block/fetcher.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index 735fc7e0c0b..a2130cca3d9 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -193,9 +193,8 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 	//   When we try to read it from object storage (later on), it will fail with ErrorSyncMetaNotFound
 	//   which is correctly handled by the caller (partial block).
 	//
-	// - The block upload is completed: this is the normal case. Meta.json file still exists in the
-	//   object storage and it's expected to match the locally cached one (because immutable by design).
-	//
+	// - The block upload is completed: this is the normal case. meta.json file still exists in the
+	//   object storage and it's expected to match the locally cached one (because it's immutable by design).
 	// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
 	//   the meta.json file) is still in the object storage. This case is no different from the previous one.
 	//
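To close, a sketch of the loadMeta() order that the comment block settles on after patch 4/4. This is simplified pseudocode, assuming it lives alongside MetaFetcher in package block; readCachedMeta and downloadMeta are hypothetical stand-ins for the real cache-dir and bucket reads, not functions from this patch.

```go
// Simplified sketch of the patched loadMeta() control flow; not the literal
// implementation. readCachedMeta and downloadMeta are hypothetical stand-ins.
func (f *MetaFetcher) loadMetaSketch(ctx context.Context, id ulid.ULID) (*Meta, error) {
	// 1. meta.json is immutable, so an in-memory cache hit is trusted without
	//    any object storage API call (this is what removes the HEAD requests).
	if m, seen := f.cached[id]; seen {
		return m, nil
	}
	// 2. Next, try the copy cached on local disk by a previous sync.
	if m, err := readCachedMeta(f.cacheDir, id); err == nil {
		return m, nil
	}
	// 3. Only on a cache miss, issue a single GET for <ULID>/meta.json; a
	//    missing object surfaces as ErrorSyncMetaNotFound (partial block).
	return downloadMeta(ctx, f.bkt, id)
}
```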