Skip to content

Commit

Permalink
Added tests to reproduce #2459.
Browse files Browse the repository at this point in the history
Related to: #2459

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Apr 18, 2020
1 parent c983b0b commit bff732c
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 18 deletions.
197 changes: 180 additions & 17 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package e2e_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
Expand All @@ -17,9 +19,12 @@ import (
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
Expand All @@ -29,18 +34,26 @@ import (
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

// blockDesc describes a synthetic TSDB block to be generated for the test:
// which series it holds, its external labels, and its time range.
type blockDesc struct {
	series  []labels.Labels // Label sets of the series to populate the block with.
	extLset labels.Labels   // External labels written into the block's meta (e.g. replica/case).
	mint    int64           // Block start time (min timestamp), in milliseconds — TODO confirm unit against e2eutil.CreateBlock.
	maxt    int64           // Block end time (max timestamp), in milliseconds — TODO confirm unit against e2eutil.CreateBlock.
}

// Create materializes the block described by b under dir and returns its ULID.
//
// With a zero delay the block is created as-is; with a non-zero delay it is
// created via CreateBlockWithBlockDelay, which (presumably — confirm against
// e2eutil) backdates the block so it appears older than it is, simulating
// blocks that are past the consistency-delay or delete-delay thresholds.
func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) {
	// A time.Duration compares idiomatically against plain 0,
	// not against the redundant `0 * time.Second`.
	if delay == 0 {
		return e2eutil.CreateBlock(ctx, dir, b.series, 120, b.mint, b.maxt, b.extLset, 0)
	}
	return e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0)
}

func TestCompactWithStoreGateway(t *testing.T) {
t.Parallel()

l := log.NewLogfmtLogger(os.Stdout)
type blockDesc struct {
series []labels.Labels
extLset labels.Labels
mint int64
maxt int64
}
logger := log.NewLogfmtLogger(os.Stdout)

delay := 30 * time.Minute
justAfterConsistencyDelay := 30 * time.Minute
// Make sure to take realistic timestamp for start. This is to align blocks as if they would be aligned on Prometheus.
// To have deterministic compaction, let's have fixed date:
now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z")
Expand Down Expand Up @@ -266,7 +279,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
m := e2edb.NewMinio(8080, bucket)
testutil.Ok(t, s.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(l, s3.Config{
bkt, err := s3.NewBucketWithConfig(logger, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Expand All @@ -280,11 +293,73 @@ func TestCompactWithStoreGateway(t *testing.T) {

rawBlockIDs := map[ulid.ULID]struct{}{}
for _, b := range blocks {
id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0)
id, err := b.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String()))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))
rawBlockIDs[id] = struct{}{}
}
{
// On top of that, add couple of other tricky cases with different meta.
malformedBase := blockDesc{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("case", "malformed-things", "replica", "101"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
}

// New Partial block.
id, err := malformedBase.Create(ctx, dir, 0*time.Second)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// New Partial block + deletion mark.
id, err = malformedBase.Create(ctx, dir, 0*time.Second)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay.
id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay + deletion mark.
id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay + old deletion mark ready to be deleted.
id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
// Deletion threshold is usually 2 days.
DeletionTime: time.Now().Add(-50 * time.Hour).Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewBuffer(deletionMark)))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after delete threshold.
id, err = malformedBase.Create(ctx, dir, 50*time.Hour)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after delete threshold + deletion mark.
id, err = malformedBase.Create(ctx, dir, 50*time.Hour)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))
}

svcConfig := client.BucketConfig{
Type: client.S3,
Expand All @@ -299,7 +374,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
str, err := e2ethanos.NewStoreGW(s.SharedDir(), "1", svcConfig)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(str))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs))), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

Expand Down Expand Up @@ -354,7 +429,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs))), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

Expand All @@ -365,6 +440,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
Expand All @@ -380,18 +456,100 @@ func TestCompactWithStoreGateway(t *testing.T) {

testutil.Ok(t, s.Stop(c))
})
t.Run("native vertical deduplication should kick in", func(t *testing.T) {
t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))

// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) // This should be 17.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(17), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_completed_total"))

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total"))

// We had 8 deletions based on 3 compactions, so 3 new blocks.
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(
len(rawBlockIDs)+7+ // (32)
// 2x 4-block compaction, 2-block compaction, 3-block compaction, 2x 3-block vertical compaction.
-(2*4+2+3+2*3)+ // Deletions (18)
6, // Newly compacted blocks.
)), "thanos_blocks_meta_synced")) // Should be +6 not +7 as one was rdy to be removed.
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted"))
// Make sure compactor does not modify anything else over time.
testutil.Ok(t, s.Stop(c))

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

// Check if query detects new blocks.
queryAndAssert(t, ctx, q.HTTPEndpoint(),
fmt.Sprintf(`count_over_time({a="1"}[13h] offset %ds)`, int64(time.Since(now.Add(12*time.Hour)).Seconds())),
promclient.QueryOptions{
Deduplicate: false, // This should be false, so that we can be sure deduplication was offline.
},
model.Vector{
// NOTE(bwplotka): Even after deduplication some series has still replica labels. This is because those blocks did not overlap yet with anything.
// This is fine as querier deduplication will remove it if needed.
{Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "no-compaction", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-after-dedup", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "a-partial-overlap-dedup-ready"}},
{Value: 360, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "a-partial-overlap-dedup-ready", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "a-partial-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "a-partial-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "a-partial-overlap-dedup-ready", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "partial-multi-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}},
{Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "partial-multi-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "partial-multi-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "partial-multi-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "partial-multi-replica-overlap-dedup-ready", "replica": "1", "rule_replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "1", "case": "full-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready"}},
{Value: 240, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "full-replica-overlap-dedup-ready"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
},
)

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_marked_for_deletion_total"))
// Store view:
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)-16+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))
})

t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s")
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))

// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) // This should be 17.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(17), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
Expand All @@ -402,7 +560,12 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total"))

// We had 8 deletions based on 3 compactions, so 3 new blocks.
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)-16+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(
len(rawBlockIDs)+7+ // (32)
// 2x 4-block compaction, 2-block compaction, 3-block compaction, 2x 3-block vertical compaction.
-(2*4+2+3+2*3)+ // Deletions (18)
6, // Newly compacted blocks.
)), "thanos_blocks_meta_synced")) // Should be +6 not +7 as one was rdy to be removed.
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted"))
Expand Down
1 change: 0 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ func NewCompactor(sharedDir string, name string, bucketConfig client.BucketConfi
"--data-dir": container,
"--objstore.config": string(bktConfigBytes),
"--http-address": ":80",
"--delete-delay": "0s",
"--block-sync-concurrency": "20",
"--selector.relabel-config": string(relabelConfigBytes),
"--wait": "",
Expand Down

0 comments on commit bff732c

Please sign in to comment.