Compactor: do not issue object storage HEAD API calls to check if cached meta.json files still exist (#5063)

* Compactor: do not issue object storage HEAD API calls on each block meta.json

Signed-off-by: Marco Pracucci <[email protected]>

* Added CHANGELOG entry

Signed-off-by: Marco Pracucci <[email protected]>

* Improve CHANGELOG entry

Signed-off-by: Marco Pracucci <[email protected]>

* Update pkg/storage/tsdb/block/fetcher.go

Co-authored-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Peter Štibraný <[email protected]>
pracucci and pstibrany authored May 25, 2023
1 parent b0625d7 commit 5a795c3
Showing 10 changed files with 495 additions and 96 deletions.
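The change summarized above replaces a per-block existence check (an object storage HEAD request made through the bucket's Exists call) with a cache-first lookup that relies on a block's meta.json being immutable. The following is a minimal sketch of that pattern, not the actual Mimir MetaFetcher: the blockMeta struct, bucketReader interface, and package name are assumptions made purely for illustration.

package metacache

import (
	"context"
	"encoding/json"
	"io"
	"path"
)

// blockMeta is a simplified stand-in for a block's meta.json content.
type blockMeta struct {
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`
}

// bucketReader is a minimal object storage abstraction; Get is assumed to
// return an error when the object does not exist.
type bucketReader interface {
	Get(ctx context.Context, name string) (io.ReadCloser, error)
}

type metaLoader struct {
	bkt   bucketReader
	cache map[string]*blockMeta // previously loaded metas, keyed by block ID
}

// loadMeta returns the meta.json of a block, preferring the in-memory cache.
func (l *metaLoader) loadMeta(ctx context.Context, blockID string) (*blockMeta, error) {
	// Previously an Exists() call (a HEAD request) was issued for every block on every
	// sync. Because meta.json is immutable, a cached copy can be returned without
	// touching the object storage at all.
	if m, ok := l.cache[blockID]; ok {
		return m, nil
	}

	// Cache miss: download and decode meta.json. A missing object now surfaces as an
	// error from Get() instead of being detected by a separate existence check.
	r, err := l.bkt.Get(ctx, path.Join(blockID, "meta.json"))
	if err != nil {
		return nil, err
	}
	defer r.Close()

	m := &blockMeta{}
	if err := json.NewDecoder(r).Decode(m); err != nil {
		return nil, err
	}
	l.cache[blockID] = m
	return m, nil
}

The real loadMeta() shown in the fetcher.go diff below additionally distinguishes missing and corrupted metas via the ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors, and keeps an on-disk copy of each meta.json under the fetcher's cache directory.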
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@
* [ENHANCEMENT] Store-gateway: record index header loading time separately in `cortex_bucket_store_series_request_stage_duration_seconds{stage="load_index_header"}`. Now index header loading will be visible in the "Mimir / Queries" dashboard in the "Series request p99/average latency" panels. #5011 #5062
* [ENHANCEMENT] Querier and ingester: add experimental support for streaming chunks from ingesters to queriers while evaluating queries. This can be enabled with `-querier.prefer-streaming-chunks=true`. #4886
* [ENHANCEMENT] Update Docker base images from `alpine:3.17.3` to `alpine:3.18.0`. #5065
* [ENHANCEMENT] Compactor: reduced the number of "object exists" API calls issued by the compactor to the object storage when syncing blocks' `meta.json` files. #5063
* [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782
* [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770
* [BUGFIX] Ruler: Support the `type=alert|record` query parameter for the API endpoint `<prometheus-http-prefix>/api/v1/rules`. #4302
60 changes: 6 additions & 54 deletions pkg/compactor/bucket_compactor.go
@@ -45,7 +45,7 @@ type DeduplicateFilter interface {
type Syncer struct {
logger log.Logger
bkt objstore.Bucket
fetcher block.MetadataFetcher
fetcher *block.MetaFetcher
mtx sync.Mutex
blocks map[ulid.ULID]*block.Meta
metrics *syncerMetrics
@@ -83,7 +83,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometh

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) {
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -102,7 +102,9 @@ func (s *Syncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

metas, _, err := s.fetcher.Fetch(ctx)
// While fetching blocks, we filter out blocks that were marked for deletion.
// No deletion delay is used -- all blocks with a deletion marker are ignored and not considered for compaction.
metas, _, err := s.fetcher.FetchWithoutMarkedForDeletion(ctx)
if err != nil {
return err
}
@@ -777,7 +779,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
// Blocks that were compacted are garbage collected after each Compaction.
// However if compactor crashes we need to resolve those on startup.
if err := c.sy.GarbageCollect(ctx); err != nil {
return errors.Wrap(err, "garbage")
return errors.Wrap(err, "blocks garbage collect")
}

jobs, err := c.grouper.Groups(c.sy.Metas())
@@ -971,53 +973,3 @@ func hasNonZeroULIDs(ids []ulid.ULID) bool {

return false
}

// ExcludeMarkedForDeletionFilter is a filter that filters out the blocks that are marked for deletion.
// Compared to IgnoreDeletionMarkFilter filter from Thanos, this implementation doesn't use any deletion delay,
// and only uses marker files under bucketindex.MarkersPathname.
type ExcludeMarkedForDeletionFilter struct {
bkt objstore.InstrumentedBucketReader

deletionMarkMap map[ulid.ULID]struct{}
}

func NewExcludeMarkedForDeletionFilter(bkt objstore.InstrumentedBucketReader) *ExcludeMarkedForDeletionFilter {
return &ExcludeMarkedForDeletionFilter{
bkt: bkt,
}
}

// DeletionMarkBlocks returns block ids that were marked for deletion.
// It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.
func (f *ExcludeMarkedForDeletionFilter) DeletionMarkBlocks() map[ulid.ULID]struct{} {
return f.deletionMarkMap
}

// Filter filters out blocks that are marked for deletion.
// It also builds the map returned by DeletionMarkBlocks() method.
func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error {
deletionMarkMap := make(map[ulid.ULID]struct{})

// Find all markers in the storage.
err := f.bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error {
if err := ctx.Err(); err != nil {
return err
}

if blockID, ok := block.IsDeletionMarkFilename(path.Base(name)); ok {
_, exists := metas[blockID]
if exists {
deletionMarkMap[blockID] = struct{}{}
synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
delete(metas, blockID)
}
}
return nil
})
if err != nil {
return errors.Wrap(err, "list block deletion marks")
}

f.deletionMarkMap = deletionMarkMap
return nil
}
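With the filter above removed, callers that previously combined Fetch() with ExcludeMarkedForDeletionFilter are expected to use the fetcher's new FetchWithoutMarkedForDeletion() method instead, as the updated SyncMetas() in this file does. Below is a hedged sketch of such a call site: the wrapper function and the import paths are assumptions, while the method name and its return types are taken from the fetcher.go changes later in this commit.

package compactsketch

import (
	"context"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"

	"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

// syncMetas fetches block metas while excluding every block that has a deletion
// marker (no deletion delay), so no separate exclusion filter is needed anymore.
func syncMetas(ctx context.Context, fetcher *block.MetaFetcher) (map[ulid.ULID]*block.Meta, error) {
	metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fetch block metas")
	}

	// Partial blocks (blocks with a missing or corrupted meta.json) are reported
	// separately; callers can log or ignore them depending on their needs.
	_ = partials

	return metas, nil
}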
4 changes: 0 additions & 4 deletions pkg/compactor/bucket_compactor_e2e_test.go
@@ -225,11 +225,9 @@ func TestGroupCompactE2E(t *testing.T) {

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))
duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
})
@@ -520,11 +518,9 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
}

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))

duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
})
require.NoError(t, err)
4 changes: 0 additions & 4 deletions pkg/compactor/compactor.go
@@ -697,9 +697,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e

userLogger := util_log.WithUserID(userID, c.logger)

// While fetching blocks, we filter out blocks that were marked for deletion by using ExcludeMarkedForDeletionFilter.
// No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction.
excludeMarkedForDeletionFilter := NewExcludeMarkedForDeletionFilter(userBucket)
// Filters out duplicate blocks that can be formed from two or more overlapping
// blocks that fully submatch the source blocks of the older blocks.
deduplicateBlocksFilter := NewShardAwareDeduplicateFilter()
@@ -714,7 +711,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
mimir_tsdb.DeprecatedTenantIDExternalLabel,
mimir_tsdb.DeprecatedIngesterIDExternalLabel,
}),
excludeMarkedForDeletionFilter,
deduplicateBlocksFilter,
// removes blocks that should not be compacted due to being marked so.
NewNoCompactionMarkFilter(userBucket, true),
8 changes: 4 additions & 4 deletions pkg/compactor/split_merge_compactor_test.go
@@ -721,10 +721,10 @@ func TestMultitenantCompactor_ShouldSupportSplitAndMergeCompactor(t *testing.T)
userBucket,
fetcherDir,
reg,
[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
nil,
)
require.NoError(t, err)
metas, partials, err := fetcher.Fetch(ctx)
metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
require.NoError(t, err)
require.Empty(t, partials)

@@ -812,10 +812,10 @@ func TestMultitenantCompactor_ShouldGuaranteeSeriesShardingConsistencyOverTheTim
userBucket,
fetcherDir,
reg,
[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
nil,
)
require.NoError(t, err)
metas, partials, err := fetcher.Fetch(ctx)
metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
require.NoError(t, err)
require.Empty(t, partials)

96 changes: 74 additions & 22 deletions pkg/storage/tsdb/block/fetcher.go
@@ -123,7 +123,7 @@ type GaugeVec interface {
WithLabelValues(lvs ...string) prometheus.Gauge
}

// Filter allows filtering or modifying metas from the provided map or returns error.
// MetadataFilter allows filtering or modifying metas from the provided map, or returns an error.
type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*Meta, synced GaugeVec) error
}
@@ -176,23 +176,35 @@
)

// loadMeta returns metadata from object storage or error.
// It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases.
// It returns ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors in those cases.
func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error) {
var (
metaFile = path.Join(id.String(), MetaFilename)
cachedBlockDir = filepath.Join(f.cacheDir, id.String())
)

// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
ok, err := f.bkt.Exists(ctx, metaFile)
if err != nil {
return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile)
}
if !ok {
return nil, ErrorSyncMetaNotFound
}

// The block's meta.json file is immutable, so we look up the cache first, without issuing
// any API call to the object storage. This significantly reduces the pressure on the object
// storage.
//
// Details of all possible cases:
//
// - The block upload is in progress: the meta.json file is guaranteed to be uploaded last.
// When we later try to read it from the object storage, the read will fail with ErrorSyncMetaNotFound,
// which is correctly handled by the caller (partial block).
//
// - The block upload is completed: this is the normal case. meta.json file still exists in the
// object storage and it's expected to match the locally cached one (because it's immutable by design).
// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
// the meta.json file) is still in the object storage. This case is no different from the previous one.
//
// - The block deletion is in progress: the loadMeta() function may return the cached meta.json when it should
// return ErrorSyncMetaNotFound. This is a race condition that could happen even if we check the meta.json
// file in the storage, because the deletion could start right after we check it but before the MetaFetcher
// completes its sync.
//
// - The block has been deleted: the loadMeta() function will not be called at all, because the block
// was not discovered while iterating the bucket since all its files were already deleted.
if m, seen := f.cached[id]; seen {
return m, nil
}
@@ -253,14 +265,17 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
type response struct {
metas map[ulid.ULID]*Meta
partial map[ulid.ULID]error

// If metaErrs is not empty it means an incomplete view, so some metas failed to be loaded.
metaErrs multierror.MultiError

noMetas float64
corruptedMetas float64
// Track the number of blocks not returned for various reasons.
noMetasCount float64
corruptedMetasCount float64
markedForDeletionCount float64
}

func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool) (interface{}, error) {
var (
resp = response{
metas: make(map[ulid.ULID]*Meta),
@@ -271,6 +286,19 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
mtx sync.Mutex
)
level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency)

// Get the list of blocks marked for deletion so that we can exclude them (if required).
var markedForDeletion map[ulid.ULID]struct{}
if excludeMarkedForDeletion {
var err error

markedForDeletion, err = ListBlockDeletionMarks(ctx, f.bkt)
if err != nil {
return nil, err
}
}

// Run workers.
for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
for id := range ch {
@@ -284,11 +312,11 @@

if errors.Is(errors.Cause(err), ErrorSyncMetaNotFound) {
mtx.Lock()
resp.noMetas++
resp.noMetasCount++
mtx.Unlock()
} else if errors.Is(errors.Cause(err), ErrorSyncMetaCorrupted) {
mtx.Lock()
resp.corruptedMetas++
resp.corruptedMetasCount++
mtx.Unlock()
} else {
mtx.Lock()
@@ -314,6 +342,12 @@
return nil
}

// If requested, skip any block marked for deletion.
if _, marked := markedForDeletion[id]; excludeMarkedForDeletion && marked {
resp.markedForDeletionCount++
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
@@ -378,7 +412,22 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
// It's the caller's responsibility not to change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
metas, partials, err = f.fetch(ctx, false)
return
}

// FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
// This function excludes all blocks marked for deletion (no deletion delay is applied).
// It's the caller's responsibility not to change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) FetchWithoutMarkedForDeletion(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
metas, partials, err = f.fetch(ctx, true)
return
}

func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
start := time.Now()
defer func() {
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
@@ -392,7 +441,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[u
// Run this in thread safe run group.
v, err := f.g.Do("", func() (i interface{}, err error) {
// NOTE: First go routine context will go through.
return f.fetchMetadata(ctx)
return f.fetchMetadata(ctx, excludeMarkedForDeletion)
})
if err != nil {
return nil, nil, err
@@ -406,8 +455,11 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[u
}

f.metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)
f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetasCount)
f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetasCount)
if excludeMarkedForDeletion {
f.metrics.Synced.WithLabelValues(MarkedForDeletionMeta).Set(resp.markedForDeletionCount)
}

for _, filter := range f.filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
@@ -434,7 +486,7 @@ func (f *MetaFetcher) countCached() int {
return len(f.cached)
}

// Special label that will have an ULID of the meta.json being referenced to.
// BlockIDLabel is a special label that will have the ULID of the meta.json being referenced.
const BlockIDLabel = "__block_id"

// IgnoreDeletionMarkFilter is a filter that filters out the blocks that are marked for deletion after a given delay.
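For context on where deletion markers now come from: the removed ExcludeMarkedForDeletionFilter listed markers inline during filtering, whereas fetchMetadata() above fetches them once via ListBlockDeletionMarks before starting the workers. Below is a hedged sketch of such a listing helper, mirroring the Iter logic of the removed filter; the exact signature and import paths used by Mimir may differ.

package markersketch

import (
	"context"
	"path"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/thanos-io/objstore"

	"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

// listBlockDeletionMarks returns the IDs of all blocks that have a deletion marker,
// using a single LIST of the markers path instead of per-block existence checks.
func listBlockDeletionMarks(ctx context.Context, bkt objstore.BucketReader) (map[ulid.ULID]struct{}, error) {
	marked := make(map[ulid.ULID]struct{})

	err := bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Every deletion marker lives under the markers path and encodes the block ID
		// in its file name, so one iteration discovers all marked blocks.
		if blockID, ok := block.IsDeletionMarkFilename(path.Base(name)); ok {
			marked[blockID] = struct{}{}
		}
		return nil
	})
	if err != nil {
		return nil, errors.Wrap(err, "list block deletion marks")
	}

	return marked, nil
}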
