From dbc712a29b60897927d634728737906a55fc8b1a Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 15 Jun 2020 12:19:51 +0100 Subject: [PATCH] shipper: Be strict about upload order unless it's specified so. Signed-off-by: Bartlomiej Plotka --- CHANGELOG.md | 1 + cmd/thanos/config.go | 14 +++- cmd/thanos/receive.go | 9 +++ cmd/thanos/rule.go | 10 ++- cmd/thanos/sidecar.go | 4 +- docs/components/rule.md | 10 +++ docs/components/sidecar.md | 12 ++- docs/operating/troubleshooting.md | 3 +- pkg/receive/multitsdb.go | 26 ++++--- pkg/receive/multitsdb_test.go | 2 + pkg/shipper/shipper.go | 123 +++++++++++++++--------------- pkg/shipper/shipper_e2e_test.go | 4 +- pkg/shipper/shipper_test.go | 24 ++---- 13 files changed, 143 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4506d56778b..8b6eee16729 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2416](https://github.com/thanos-io/thanos/pull/2416) Bucket: Fixed issue #2416 bug in `inspect --sort-by` doesn't work correctly in all cases. - [#2719](https://github.com/thanos-io/thanos/pull/2719) Query: `irate` and `resets` use now counter downsampling aggregations. - [#2705](https://github.com/thanos-io/thanos/pull/2705) minio-go: Added support for `af-south-1` and `eu-south-1` regions. +- [#2753](https://github.com/thanos-io/thanos/issues/2753) Sidecar,Receive,Rule: Fixed cause for compactor overlapping blocks in upload error cases. ### Added diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 1b46311adcb..20fd80a4425 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -112,17 +112,23 @@ func (rc *reloaderConfig) registerFlag(cmd *kingpin.CmdClause) *reloaderConfig { } type shipperConfig struct { - uploadCompacted bool - ignoreBlockSize bool + uploadCompacted bool + ignoreBlockSize bool + allowOutOfOrderUpload bool } func (sc *shipperConfig) registerFlag(cmd *kingpin.CmdClause) *shipperConfig { cmd.Flag("shipper.upload-compacted", - "If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done."). + "If true shipper will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done."). Default("false").BoolVar(&sc.uploadCompacted) cmd.Flag("shipper.ignore-unequal-block-size", - "If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage."). + "If true shipper will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage."). Default("false").Hidden().BoolVar(&sc.ignoreBlockSize) + cmd.Flag("shipper.allow-out-of-order-uploads", + "If true shipper will skip failed block uploads in given iteration and retry later. This means that some newer blocks might uploaded sooner than older."+ + "This will trigger compaction without those blocks, as a resulted create 'valid overlap situation'. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ + "about order."). + Default("false").BoolVar(&sc.allowOutOfOrderUpload) return sc } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 972e02f2504..129c6f2f282 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -89,6 +89,12 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() + allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads", + "If true shipper will skip failed block uploads in given iteration and retry later. This means that some newer blocks might uploaded sooner than older."+ + "This will trigger compaction without those blocks, as a resulted create 'valid overlap situation'. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ + "about order."). + Default("false").Bool() + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -157,6 +163,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *replicationFactor, time.Duration(*forwardTimeout), comp, + *allowOutOfOrderUpload, ) } } @@ -195,6 +202,7 @@ func runReceive( replicationFactor uint64, forwardTimeout time.Duration, comp component.SourceStoreAPI, + allowOutOfOrderUpload bool, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -246,6 +254,7 @@ func runReceive( lset, tenantLabelName, bkt, + allowOutOfOrderUpload, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2789178f899..4bae7c771cf 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -114,6 +114,12 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]"). Default("golang").Hidden().String() + allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads", + "If true shipper will skip failed block uploads in given iteration and retry later. This means that some newer blocks might uploaded sooner than older."+ + "This will trigger compaction without those blocks, as a resulted create 'valid overlap situation'. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ + "about order."). + Default("false").Bool() + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -197,6 +203,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { time.Duration(*dnsSDInterval), *dnsSDResolver, comp, + *allowOutOfOrderUpload, ) } } @@ -283,6 +290,7 @@ func runRule( dnsSDInterval time.Duration, dnsSDResolver string, comp component.Component, + allowOutOfOrderUpload bool, ) error { metrics := newRuleMetrics(reg) @@ -615,7 +623,7 @@ func runRule( } }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource) + s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index e16f7534bc9..83cb5e3081d 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -273,9 +273,9 @@ func runSidecar( var s *shipper.Shipper if conf.shipper.uploadCompacted { - s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) + s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) } else { - s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) + s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) } return runutil.Repeat(30*time.Second, ctx.Done(), func() error { diff --git a/docs/components/rule.md b/docs/components/rule.md index e85c2d45abd..388002e1a4c 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -382,6 +382,16 @@ Flags: (used as a fallback) --query.sd-dns-interval=30s Interval between DNS resolutions. + --shipper.allow-out-of-order-uploads + If true shipper will skip failed block uploads + in given iteration and retry later. This means + that some newer blocks might uploaded sooner + than older.This will trigger compaction without + those blocks, as a resulted create 'valid + overlap situation'. Set it to true if you have + vertical compaction enabled and wish to upload + blocks as soon as possible without caringabout + order. ``` diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 1eb2a0eb2fd..27338bc3536 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -151,11 +151,21 @@ Flags: details: https://thanos.io/storage.md/#configuration --shipper.upload-compacted - If true sidecar will try to upload compacted + If true shipper will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done. + --shipper.allow-out-of-order-uploads + If true shipper will skip failed block uploads + in given iteration and retry later. This means + that some newer blocks might uploaded sooner + than older.This will trigger compaction without + those blocks, as a resulted create 'valid + overlap situation'. Set it to true if you have + vertical compaction enabled and wish to upload + blocks as soon as possible without caringabout + order. --min-time=0000-01-01T00:00:00Z Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened diff --git a/docs/operating/troubleshooting.md b/docs/operating/troubleshooting.md index 5e2b9fa3bb3..bb53fe5c51c 100644 --- a/docs/operating/troubleshooting.md +++ b/docs/operating/troubleshooting.md @@ -7,7 +7,6 @@ slug: /troubleshooting.md # Troubleshooting; Common cases - ## Overlaps **Block overlap**: Set of blocks with exactly the same external labels in meta.json and for the same time or overlapping time period. @@ -29,6 +28,7 @@ Checking producers log for such ULID, and checking meta.json (e.g if sample stat ### Reasons +- You are running Thanos (sidecar, ruler or receive) older than 0.13.0. During transient upload errors there was possibility to have overlaps caused by compactor not being aware of all blocks See: [this](https://github.com/thanos-io/thanos/issues/2753) - Misconfiguraiton of sidecar/ruler: Same external labels or no external labels across many block producers. - Running multiple compactors for single block "stream", even for short duration. - Manually uploading blocks to the bucket. @@ -36,6 +36,7 @@ Checking producers log for such ULID, and checking meta.json (e.g if sample stat ### Solutions +- Upgrade sidecar, ruler and receive to 0.13.0+ - Compactor can be blocked for some time, but if it is urgent. Mitigate by removing overlap or better: Backing up somewhere else (you can rename block ULID to non-ulid). - Who uploaded the block? Search for logs with this ULID across all sidecars/rulers. Check access logs to object storage. Check debug/metas or meta.json of problematic block to see how blocks looks like and what is the `source`. - Determine what you misconfigured. diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index a9ba831f3d3..672a8b0fa9c 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -38,8 +38,9 @@ type MultiTSDB struct { labels labels.Labels bucket objstore.Bucket - mtx *sync.RWMutex - tenants map[string]*tenant + mtx *sync.RWMutex + tenants map[string]*tenant + allowOutOfOrderUpload bool } func NewMultiTSDB( @@ -50,21 +51,23 @@ func NewMultiTSDB( labels labels.Labels, tenantLabelName string, bucket objstore.Bucket, + allowOutOfOrderUpload bool, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() } return &MultiTSDB{ - dataDir: dataDir, - logger: l, - reg: reg, - tsdbOpts: tsdbOpts, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tenantLabelName: tenantLabelName, - bucket: bucket, + dataDir: dataDir, + logger: l, + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tenantLabelName: tenantLabelName, + bucket: bucket, + allowOutOfOrderUpload: allowOutOfOrderUpload, } } @@ -256,6 +259,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan t.bucket, func() labels.Labels { return lbls }, metadata.ReceiveSource, + t.allowOutOfOrderUpload, ) } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index f6e7a012dde..a05244cf040 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -43,6 +43,7 @@ func TestMultiTSDB(t *testing.T) { labels.FromStrings("replica", "01"), "tenant_id", nil, + false, ) defer testutil.Ok(t, m.Flush()) @@ -109,6 +110,7 @@ func TestMultiTSDB(t *testing.T) { labels.FromStrings("replica", "01"), "tenant_id", nil, + false, ) defer testutil.Ok(t, m.Flush()) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 0139a643825..6a79689ea07 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -72,13 +72,15 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { // Shipper watches a directory for matching files and directories and uploads // them to a remote data store. type Shipper struct { - logger log.Logger - dir string - metrics *metrics - bucket objstore.Bucket - labels func() labels.Labels - source metadata.SourceType - uploadCompacted bool + logger log.Logger + dir string + metrics *metrics + bucket objstore.Bucket + labels func() labels.Labels + source metadata.SourceType + + uploadCompacted bool + allowOutOfOrderUploads bool } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -90,6 +92,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -99,12 +102,13 @@ func New( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, false), - source: source, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, false), + source: source, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -118,6 +122,7 @@ func NewWithCompacted( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -127,13 +132,14 @@ func NewWithCompacted( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, true), - source: source, - uploadCompacted: true, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, true), + source: source, + uploadCompacted: true, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -153,23 +159,23 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { minTime = math.MaxInt64 maxSyncTime = math.MinInt64 - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, 0, err + } + for _, m := range metas { if m.MinTime < minTime { minTime = m.MinTime } if _, ok := hasUploaded[m.ULID]; ok && m.MaxTime > maxSyncTime { maxSyncTime = m.MaxTime } - return nil - }); err != nil { - return 0, 0, errors.Wrap(err, "iter Block metas for timestamp") } if minTime == math.MaxInt64 { // No block yet found. We cannot assume any min block size so propagate 0 minTime. minTime = 0 } - return minTime, maxSyncTime, nil } @@ -272,70 +278,76 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { checker = newLazyOverlapChecker(s.logger, s.bucket, s.labels) uploadErrs int ) - // Sync non compacted blocks first. - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, err + } + for _, m := range metas { // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket, // it was generally removed by the compaction process. if _, uploaded := hasUploaded[m.ULID]; uploaded { meta.Uploaded = append(meta.Uploaded, m.ULID) - return nil + continue } if m.Stats.NumSamples == 0 { // Ignore empty blocks. level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID) - return nil + continue } // Check against bucket if the meta file for this block exists. ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) if err != nil { - return errors.Wrap(err, "check exists") + return 0, errors.Wrap(err, "check exists") } if ok { - return nil + continue } // We only ship of the first compacted block level as normal flow. if m.Compaction.Level > 1 { if !s.uploadCompacted { - return nil + continue } if err := checker.IsOverlapping(ctx, m.BlockMeta); err != nil { + if !s.allowOutOfOrderUploads { + return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err) + } level.Error(s.logger).Log("msg", "found overlap or error during sync, cannot upload compacted block", "err", err) uploadErrs++ - return nil + continue } } if err := s.upload(ctx, m); err != nil { - level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) + if !s.allowOutOfOrderUploads { + return 0, errors.Wrapf(err, "upload %v", m.ULID) + } + // No error returned, just log line. This is because we want other blocks to be uploaded even // though this one failed. It will be retried on second Sync iteration. + level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) uploadErrs++ - return nil + continue } meta.Uploaded = append(meta.Uploaded, m.ULID) - uploaded++ s.metrics.uploads.Inc() - return nil - }); err != nil { - s.metrics.dirSyncFailures.Inc() - return uploaded, errors.Wrap(err, "iter local block metas") } - if err := WriteMetaFile(s.logger, s.dir, meta); err != nil { level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err) } s.metrics.dirSyncs.Inc() - if uploadErrs > 0 { s.metrics.uploadFailures.Add(float64(uploadErrs)) return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs) - } else if s.uploadCompacted { + } + + if s.uploadCompacted { s.metrics.uploadedCompacted.Set(1) } return uploaded, nil @@ -378,15 +390,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return block.Upload(ctx, s.logger, s.bucket, updir) } -// iterBlockMetas calls f with the block meta for each block found in dir -// sorted by minTime asc. It logs an error and continues if it cannot access a -// meta.json file. -// If f returns an error, the function returns with the same error. -func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { - var metas []*metadata.Meta +// blockMetasFromOldest returns the block meta of each block found in dir +// sorted by minTime asc. +func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) { fis, err := ioutil.ReadDir(s.dir) if err != nil { - return errors.Wrap(err, "read dir") + return nil, errors.Wrap(err, "read dir") } names := make([]string, 0, len(fis)) for _, fi := range fis { @@ -400,29 +409,21 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { fi, err := os.Stat(dir) if err != nil { - level.Warn(s.logger).Log("msg", "open file failed", "err", err) - continue + return nil, errors.Wrapf(err, "stat block %v", dir) } if !fi.IsDir() { continue } m, err := metadata.Read(dir) if err != nil { - level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) - continue + return nil, errors.Wrapf(err, "read metadata for block %v", dir) } metas = append(metas, m) } sort.Slice(metas, func(i, j int) bool { return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime }) - for _, m := range metas { - - if err := f(m); err != nil { - return err - } - } - return nil + return metas, nil } func hardlinkBlock(src, dst string) error { diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index b307b165328..00df38104dd 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -38,7 +38,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -199,7 +199,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) + shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index fd9b75706fb..32cdeedf3b7 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, metadata.TestSource) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, false) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -84,7 +84,6 @@ func TestShipperTimestamps(t *testing.T) { } func TestIterBlockMetas(t *testing.T) { - var metas []*metadata.Meta dir, err := ioutil.TempDir("", "shipper-test") testutil.Ok(t, err) defer func() { @@ -124,13 +123,9 @@ func TestIterBlockMetas(t *testing.T) { }, })) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource) - if err := shipper.iterBlockMetas(func(m *metadata.Meta) error { - metas = append(metas, m) - return nil - }); err != nil { - testutil.Ok(t, err) - } + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) + metas, err := shipper.blockMetasFromOldest() + testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime }), true) @@ -167,11 +162,8 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource) - if err := shipper.iterBlockMetas(func(m *metadata.Meta) error { - metas = append(metas, m) - return nil - }); err != nil { - testutil.Ok(b, err) - } + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) + + _, err = shipper.blockMetasFromOldest() + testutil.Ok(b, err) }