Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: do not issue object storage HEAD API calls to check if cached meta.json files still exist #5063

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [ENHANCEMENT] Store-gateway: record index header loading time separately in `cortex_bucket_store_series_request_stage_duration_seconds{stage="load_index_header"}`. Now index header loading will be visible in the "Mimir / Queries" dashboard in the "Series request p99/average latency" panels. #5011 #5062
* [ENHANCEMENT] Querier and ingester: add experimental support for streaming chunks from ingesters to queriers while evaluating queries. This can be enabled with `-querier.prefer-streaming-chunks=true`. #4886
* [ENHANCEMENT] Update Docker base images from `alpine:3.17.3` to `alpine:3.18.0`. #5065
* [ENHANCEMENT] Compactor: reduced the number of "object exists" API calls issued by the compactor to the object storage when syncing block's `meta.json` files. #5063
* [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782
* [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770
* [BUGFIX] Ruler: Support the `type=alert|record` query parameter for the API endpoint `<prometheus-http-prefix>/api/v1/rules`. #4302
Expand Down
60 changes: 6 additions & 54 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type DeduplicateFilter interface {
type Syncer struct {
logger log.Logger
bkt objstore.Bucket
fetcher block.MetadataFetcher
fetcher *block.MetaFetcher
mtx sync.Mutex
blocks map[ulid.ULID]*block.Meta
metrics *syncerMetrics
Expand Down Expand Up @@ -83,7 +83,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometh

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) {
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -102,7 +102,9 @@ func (s *Syncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

metas, _, err := s.fetcher.Fetch(ctx)
// While fetching blocks, we filter out blocks that were marked for deletion.
// No deletion delay is used -- all blocks with deletion marker are ignored, and not considered for compaction.
metas, _, err := s.fetcher.FetchWithoutMarkedForDeletion(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -777,7 +779,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
// Blocks that were compacted are garbage collected after each Compaction.
// However if compactor crashes we need to resolve those on startup.
if err := c.sy.GarbageCollect(ctx); err != nil {
return errors.Wrap(err, "garbage")
return errors.Wrap(err, "blocks garbage collect")
}

jobs, err := c.grouper.Groups(c.sy.Metas())
Expand Down Expand Up @@ -971,53 +973,3 @@ func hasNonZeroULIDs(ids []ulid.ULID) bool {

return false
}

// ExcludeMarkedForDeletionFilter is a filter that filters out the blocks that are marked for deletion.
// Compared to IgnoreDeletionMarkFilter filter from Thanos, this implementation doesn't use any deletion delay,
// and only uses marker files under bucketindex.MarkersPathname.
type ExcludeMarkedForDeletionFilter struct {
bkt objstore.InstrumentedBucketReader

deletionMarkMap map[ulid.ULID]struct{}
}

func NewExcludeMarkedForDeletionFilter(bkt objstore.InstrumentedBucketReader) *ExcludeMarkedForDeletionFilter {
return &ExcludeMarkedForDeletionFilter{
bkt: bkt,
}
}

// DeletionMarkBlocks returns block ids that were marked for deletion.
// It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.
func (f *ExcludeMarkedForDeletionFilter) DeletionMarkBlocks() map[ulid.ULID]struct{} {
return f.deletionMarkMap
}

// Filter filters out blocks that are marked for deletion.
// It also builds the map returned by DeletionMarkBlocks() method.
func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error {
deletionMarkMap := make(map[ulid.ULID]struct{})

// Find all markers in the storage.
err := f.bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error {
if err := ctx.Err(); err != nil {
return err
}

if blockID, ok := block.IsDeletionMarkFilename(path.Base(name)); ok {
_, exists := metas[blockID]
if exists {
deletionMarkMap[blockID] = struct{}{}
synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
delete(metas, blockID)
}
}
return nil
})
if err != nil {
return errors.Wrap(err, "list block deletion marks")
}

f.deletionMarkMap = deletionMarkMap
return nil
}
4 changes: 0 additions & 4 deletions pkg/compactor/bucket_compactor_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,9 @@ func TestGroupCompactE2E(t *testing.T) {

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))
duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
})
Expand Down Expand Up @@ -520,11 +518,9 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
}

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))

duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
})
require.NoError(t, err)
Expand Down
4 changes: 0 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e

userLogger := util_log.WithUserID(userID, c.logger)

// While fetching blocks, we filter out blocks that were marked for deletion by using ExcludeMarkedForDeletionFilter.
// No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction.
excludeMarkedForDeletionFilter := NewExcludeMarkedForDeletionFilter(userBucket)
// Filters out duplicate blocks that can be formed from two or more overlapping
// blocks that fully submatches the source blocks of the older blocks.
deduplicateBlocksFilter := NewShardAwareDeduplicateFilter()
Expand All @@ -714,7 +711,6 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
mimir_tsdb.DeprecatedTenantIDExternalLabel,
mimir_tsdb.DeprecatedIngesterIDExternalLabel,
}),
excludeMarkedForDeletionFilter,
deduplicateBlocksFilter,
// removes blocks that should not be compacted due to being marked so.
NewNoCompactionMarkFilter(userBucket, true),
Expand Down
8 changes: 4 additions & 4 deletions pkg/compactor/split_merge_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,10 +721,10 @@ func TestMultitenantCompactor_ShouldSupportSplitAndMergeCompactor(t *testing.T)
userBucket,
fetcherDir,
reg,
[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
nil,
)
require.NoError(t, err)
metas, partials, err := fetcher.Fetch(ctx)
metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
require.NoError(t, err)
require.Empty(t, partials)

Expand Down Expand Up @@ -812,10 +812,10 @@ func TestMultitenantCompactor_ShouldGuaranteeSeriesShardingConsistencyOverTheTim
userBucket,
fetcherDir,
reg,
[]block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)},
nil,
)
require.NoError(t, err)
metas, partials, err := fetcher.Fetch(ctx)
metas, partials, err := fetcher.FetchWithoutMarkedForDeletion(ctx)
require.NoError(t, err)
require.Empty(t, partials)

Expand Down
96 changes: 74 additions & 22 deletions pkg/storage/tsdb/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type GaugeVec interface {
WithLabelValues(lvs ...string) prometheus.Gauge
}

// Filter allows filtering or modifying metas from the provided map or returns error.
// MetadataFilter allows filtering or modifying metas from the provided map or returns error.
type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*Meta, synced GaugeVec) error
}
Expand Down Expand Up @@ -176,23 +176,35 @@ var (
)

// loadMeta returns metadata from object storage or error.
// It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases.
// It returns ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors in those cases.
func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error) {
var (
metaFile = path.Join(id.String(), MetaFilename)
cachedBlockDir = filepath.Join(f.cacheDir, id.String())
)

// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
ok, err := f.bkt.Exists(ctx, metaFile)
if err != nil {
return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile)
}
if !ok {
return nil, ErrorSyncMetaNotFound
}

// Block meta.json file is immutable, so we lookup the cache as first thing without issuing
// any API call to the object storage. This significantly reduce the pressure on the object
// storage.
//
// Details of all possible cases:
//
// - The block upload is in progress: the meta.json file is guaranteed to be uploaded at last.
// When we'll try to read it from object storage (later on), it will fail with ErrorSyncMetaNotFound
// which is correctly handled by the caller (partial block).
//
// - The block upload is completed: this is the normal case. meta.json file still exists in the
// object storage and it's expected to match the locally cached one (because it's immutable by design).
// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
// the meta.json file) is still in the object storage. This case is not different than the previous one.
//
// - The block deletion is in progress: loadMeta() function may return the cached meta.json while it should
// return ErrorSyncMetaNotFound. This is a race condition that could happen even if we check the meta.json
// file in the storage, because the deletion could start right after we check it but before the MetaFetcher
// completes its sync.
//
// - The block has been deleted: the loadMeta() function will not be called at all, because the block
// was not discovered while iterating the bucket since all its files were already deleted.
if m, seen := f.cached[id]; seen {
return m, nil
}
Expand Down Expand Up @@ -253,14 +265,17 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
type response struct {
metas map[ulid.ULID]*Meta
partial map[ulid.ULID]error

// If metaErr > 0 it means incomplete view, so some metas, failed to be loaded.
metaErrs multierror.MultiError

noMetas float64
corruptedMetas float64
// Track the number of blocks not returned because of various reasons.
noMetasCount float64
corruptedMetasCount float64
markedForDeletionCount float64
}

func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool) (interface{}, error) {
var (
resp = response{
metas: make(map[ulid.ULID]*Meta),
Expand All @@ -271,6 +286,19 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
mtx sync.Mutex
)
level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency)

// Get the list of blocks marked for deletion so that we'll exclude them (if required).
var markedForDeletion map[ulid.ULID]struct{}
if excludeMarkedForDeletion {
var err error

markedForDeletion, err = ListBlockDeletionMarks(ctx, f.bkt)
if err != nil {
return nil, err
}
}

// Run workers.
for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
for id := range ch {
Expand All @@ -284,11 +312,11 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {

if errors.Is(errors.Cause(err), ErrorSyncMetaNotFound) {
mtx.Lock()
resp.noMetas++
resp.noMetasCount++
mtx.Unlock()
} else if errors.Is(errors.Cause(err), ErrorSyncMetaCorrupted) {
mtx.Lock()
resp.corruptedMetas++
resp.corruptedMetasCount++
mtx.Unlock()
} else {
mtx.Lock()
Expand All @@ -314,6 +342,12 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
return nil
}

// If requested, skip any block marked for deletion.
if _, marked := markedForDeletion[id]; excludeMarkedForDeletion && marked {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need to check for excludeMarkedForDeletion again. If it's false then markedForDeletion will be nil (ie. empty map).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I added the check here as well is to make the code more obvious when reading it. Given it doesn't add extra complexity, I would keep it.

resp.markedForDeletionCount++
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -378,7 +412,22 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
metas, partials, err = f.fetch(ctx, false)
return
}

// FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
// This function excludes all blocks for deletion (no deletion delay applied).
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) FetchWithoutMarkedForDeletion(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
metas, partials, err = f.fetch(ctx, true)
return
}

func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
start := time.Now()
defer func() {
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
Expand All @@ -392,7 +441,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[u
// Run this in thread safe run group.
v, err := f.g.Do("", func() (i interface{}, err error) {
// NOTE: First go routine context will go through.
return f.fetchMetadata(ctx)
return f.fetchMetadata(ctx, excludeMarkedForDeletion)
})
if err != nil {
return nil, nil, err
Expand All @@ -406,8 +455,11 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*Meta, _ map[u
}

f.metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)
f.metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetasCount)
f.metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetasCount)
if excludeMarkedForDeletion {
f.metrics.Synced.WithLabelValues(MarkedForDeletionMeta).Set(resp.markedForDeletionCount)
}

for _, filter := range f.filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
Expand All @@ -434,7 +486,7 @@ func (f *MetaFetcher) countCached() int {
return len(f.cached)
}

// Special label that will have an ULID of the meta.json being referenced to.
// BlockIDLabel is a special label that will have an ULID of the meta.json being referenced to.
const BlockIDLabel = "__block_id"

// IgnoreDeletionMarkFilter is a filter that filters out the blocks that are marked for deletion after a given delay.
Expand Down
Loading