diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 5e4b601dcf..9524b960dd 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -4,7 +4,9 @@ package e2e_test import ( + "bytes" "context" + "encoding/json" "fmt" "net/http" "os" @@ -17,9 +19,13 @@ import ( e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/s3" @@ -29,18 +35,26 @@ import ( "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) +type blockDesc struct { + series []labels.Labels + extLset labels.Labels + mint int64 + maxt int64 +} + +func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) { + if delay == 0*time.Second { + return e2eutil.CreateBlock(ctx, dir, b.series, 120, b.mint, b.maxt, b.extLset, 0) + } + return e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0) +} + func TestCompactWithStoreGateway(t *testing.T) { t.Parallel() - l := log.NewLogfmtLogger(os.Stdout) - type blockDesc struct { - series []labels.Labels - extLset labels.Labels - mint int64 - maxt int64 - } + logger := log.NewLogfmtLogger(os.Stdout) - delay := 30 * time.Minute + justAfterConsistencyDelay := 30 * time.Minute // Make sure to take realistic timestamp for start. This is to align blocks as if they would be aligned on Prometheus. // To have deterministic compaction, let's have fixed date: now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z") @@ -266,7 +280,7 @@ func TestCompactWithStoreGateway(t *testing.T) { m := e2edb.NewMinio(8080, bucket) testutil.Ok(t, s.StartAndWaitReady(m)) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ + bkt, err := s3.NewBucketWithConfig(logger, s3.Config{ Bucket: bucket, AccessKey: e2edb.MinioAccessKey, SecretKey: e2edb.MinioSecretKey, @@ -280,11 +294,73 @@ func TestCompactWithStoreGateway(t *testing.T) { rawBlockIDs := map[ulid.ULID]struct{}{} for _, b := range blocks { - id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0) + id, err := b.Create(ctx, dir, justAfterConsistencyDelay) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) rawBlockIDs[id] = struct{}{} } + { + // On top of that, add couple of other tricky cases with different meta. + malformedBase := blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("case", "malformed-things", "replica", "101"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + } + + // New Partial block. + id, err := malformedBase.Create(ctx, dir, 0*time.Second) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // New Partial block + deletion mark. + id, err = malformedBase.Create(ctx, dir, 0*time.Second) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay + deletion mark. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay + old deletion mark ready to be deleted. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: id, + // Deletion threshold is usually 2 days. + DeletionTime: time.Now().Add(-50 * time.Hour).Unix(), + Version: metadata.DeletionMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewBuffer(deletionMark))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after delete threshold. + id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after delete threshold + deletion mark. + id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + } svcConfig := client.BucketConfig{ Type: client.S3, @@ -299,7 +375,7 @@ func TestCompactWithStoreGateway(t *testing.T) { str, err := e2ethanos.NewStoreGW(s.SharedDir(), "1", svcConfig) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(str)) - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs))), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) @@ -349,12 +425,50 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, }, ) + // Store view: + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) + + expectedEndVector := model.Vector{ + // NOTE(bwplotka): Even after deduplication some series has still replica labels. This is because those blocks did not overlap yet with anything. + // This is fine as querier deduplication will remove it if needed. + {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "no-compaction", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-after-dedup"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-after-dedup", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "a-partial-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "partial-multi-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "full-replica-overlap-dedup-ready"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + } t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) { c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs))), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) @@ -365,6 +479,7 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total")) @@ -380,7 +495,8 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, s.Stop(c)) }) - t.Run("native vertical deduplication should kick in", func(t *testing.T) { + t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) { + // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction. c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) @@ -388,11 +504,11 @@ func TestCompactWithStoreGateway(t *testing.T) { // NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many // compaction groups. Wait for at least first compaction iteration (next is in 5m). testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) - - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_marked_for_deletion_total")) - + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) // This should be 1 [BUG no 1]. TODO: fix. + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3+1), "thanos_compactor_blocks_marked_for_deletion_total")) // 17. + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1. testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total")) + // TODO(bwplotka): This is confusing, should be either normal compaction or vertical not both (?) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total")) @@ -401,8 +517,10 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) - // We had 8 deletions based on 3 compactios, so 3 new blocks. - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)-16+5)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64( + len(rawBlockIDs)+7+ + 5, // 5 compactions, 5 newly added blocks. + )), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted")) @@ -418,43 +536,55 @@ func TestCompactWithStoreGateway(t *testing.T) { promclient.QueryOptions{ Deduplicate: false, // This should be false, so that we can be sure deduplication was offline. }, - model.Vector{ - // NOTE(bwplotka): Even after deduplication some series has still replica labels. This is because those blocks did not overlap yet with anything. - // This is fine as querier deduplication will remove it if needed. - {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "no-compaction", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-after-dedup"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-after-dedup", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "a-partial-overlap-dedup-ready"}}, - {Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "a-partial-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "a-partial-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "a-partial-overlap-dedup-ready", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "partial-multi-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, - {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "partial-multi-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "partial-multi-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "full-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready"}}, - {Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "full-replica-overlap-dedup-ready"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, - {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, + expectedEndVector, + ) + // Store view: + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) + }) + + t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) { + c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(c)) + + // NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many + // compaction groups. Wait for at least first compaction iteration (next is in 5m). + testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1. + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_completed_total")) + + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) + + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-16)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) + + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted")) + // Make sure compactor does not modify anything else over time. + testutil.Ok(t, s.Stop(c)) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + // Check if query detects new blocks. + queryAndAssert(t, ctx, q.HTTPEndpoint(), + fmt.Sprintf(`count_over_time({a="1"}[13h] offset %ds)`, int64(time.Since(now.Add(12*time.Hour)).Seconds())), + promclient.QueryOptions{ + Deduplicate: false, // This should be false, so that we can be sure deduplication was offline. }, + expectedEndVector, ) // Store view: - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)-16+5)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-16+5)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) }) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c014a36d7b..c0d4bc7f35 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -373,7 +373,6 @@ func NewCompactor(sharedDir string, name string, bucketConfig client.BucketConfi "--data-dir": container, "--objstore.config": string(bktConfigBytes), "--http-address": ":80", - "--delete-delay": "0s", "--block-sync-concurrency": "20", "--selector.relabel-config": string(relabelConfigBytes), "--wait": "",