Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring 0.19.0 rc fixes to main. #3978

Merged
merged 6 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ config:
trace:
enable: false
list_objects_version: ""
part_size: 134217728
part_size: 67108864
sse_config:
type: ""
kms_key_id: ""
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ replace (
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1

// TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967.
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6 h1:h9SZ0jmAKjtrZF6iZ77/jdXdHr+Usn29itI669SVRp4=
github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
Expand Down Expand Up @@ -888,9 +890,6 @@ github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v6 v6.0.44/go.mod h1:qD0lajrGW49lKZLtXKtCB4X/qkMf0a5tBvN2PaZg7Gg=
github.com/minio/minio-go/v6 v6.0.56/go.mod h1:KQMM+/44DSlSGSQWSfRrAZ12FVMmpWNuX37i2AX0jfI=
github.com/minio/minio-go/v7 v7.0.2/go.mod h1:dJ80Mv2HeGkYLH1sqS/ksz07ON6csH3S6JUMSQ2zAns=
github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94=
github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down
4 changes: 3 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {

ignoreDirs := []string{}
for _, gr := range groups {
ignoreDirs = append(ignoreDirs, gr.Key())
for _, grID := range gr.IDs() {
ignoreDirs = append(ignoreDirs, filepath.Join(gr.Key(), grID.String()))
}
}

if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil {
Expand Down
91 changes: 91 additions & 0 deletions pkg/compactv2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,97 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
NumChunks: 2,
},
},
{
name: "1 blocks + delete modifier. For deletion request, full match is required. Delete the first two series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "foo", Value: "bar"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "b", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "c", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithDeletionModifier(
metadata.DeletionRequest{
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "1"),
labels.MustNewMatcher(labels.MatchEqual, "b", "2"),
},
})},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "b", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "c", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\", b=\"2\"} [{0 20}]\nDeleted {a=\"1\", b=\"2\", foo=\"bar\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 18,
NumSeries: 3,
NumChunks: 3,
},
},
{
name: "1 blocks + delete modifier. Deletion request contains non-equal matchers.",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "bar"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}, {Name: "foo", Value: "baz"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "foo", Value: "bat"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},

// Label a is present but with an empty value.
{lset: labels.Labels{{Name: "a", Value: ""}, {Name: "foo", Value: "bat"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
// Series with unrelated labels.
{lset: labels.Labels{{Name: "c", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithDeletionModifier(
metadata.DeletionRequest{
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "a", "1"),
labels.MustNewMatcher(labels.MatchRegexp, "foo", "^ba.$"),
},
})},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: ""}, {Name: "foo", Value: "bat"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "foo", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "c", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "foo", Value: "bat"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"2\", foo=\"bar\"} [{0 20}]\nDeleted {a=\"3\", foo=\"baz\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 36,
NumSeries: 6,
NumChunks: 12,
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-series-writer")
Expand Down
5 changes: 1 addition & 4 deletions pkg/compactv2/modifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ SeriesLoop:
for _, deletions := range d.d.deletions {
for _, m := range deletions.Matchers {
v := lbls.Get(m.Name)
if v == "" {
continue
}

// Only if all matchers in the deletion request are matched can we proceed to deletion.
if !m.Matches(v) {
if v == "" || !m.Matches(v) {
continue DeletionsLoop
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ var DefaultConfig = Config{
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 0,
},
// Minimum file size after which an HTTP multipart request should be used to upload objects to storage.
// Set to 128 MiB as in the minio client.
PartSize: 1024 * 1024 * 128,
PartSize: 1024 * 1024 * 64, // 64MB.
}

// Config stores the configuration for s3 bucket.
Expand All @@ -85,6 +83,7 @@ type Config struct {
TraceConfig TraceConfig `yaml:"trace"`
ListObjectsVersion string `yaml:"list_objects_version"`
// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
// NOTE we need to make sure this number does not produce more parts than 10 000.
PartSize uint64 `yaml:"part_size"`
SSEConfig SSEConfig `yaml:"sse_config"`
}
Expand Down Expand Up @@ -449,7 +448,6 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
size = -1
}

// partSize cannot be larger than object size.
partSize := b.partSize
if size < int64(partSize) {
partSize = 0
Expand Down
55 changes: 55 additions & 0 deletions pkg/objstore/s3/s3_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package s3_test

import (
"bytes"
"context"
"strings"
"testing"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"

"github.com/thanos-io/thanos/pkg/testutil"
)

// Regression benchmark for https://github.com/thanos-io/thanos/issues/3917.
func BenchmarkUpload(b *testing.B) {
b.ReportAllocs()
ctx := context.Background()

s, err := e2e.NewScenario("e2e_bench_mino_client")
testutil.Ok(b, err)
b.Cleanup(e2ethanos.CleanScenario(b, s))

const bucket = "test"
m := e2edb.NewMinio(8080, bucket)
testutil.Ok(b, s.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(),
Insecure: true,
}, "test-feed")
testutil.Ok(b, err)

buf := bytes.Buffer{}
buf.Grow(1028 * 1028 * 100) // 100MB.
word := "abcdefghij"
for i := 0; i < buf.Cap()/len(word); i++ {
_, _ = buf.WriteString(word)
}
str := buf.String()

b.ResetTimer()
for i := 0; i < b.N; i++ {
testutil.Ok(b, bkt.Upload(ctx, "test", strings.NewReader(str)))
}
}
2 changes: 1 addition & 1 deletion pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ http_config:

cfg, err := parseConfig(input)
testutil.Ok(t, err)
testutil.Assert(t, cfg.PartSize == 1024*1024*128, "when part size not set it should default to 128MiB")
testutil.Assert(t, cfg.PartSize == 1024*1024*64, "when part size not set it should default to 128MiB")

input2 := []byte(`bucket: "bucket-name"
endpoint: "s3-endpoint"
Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -293,6 +294,9 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
var wreq prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -313,7 +317,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

// exit early if the request contained no data
// Exit early if the request contained no data.
if len(wreq.Timeseries) == 0 {
level.Info(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
return
Expand Down
Loading