Merge cockroachdb#139150 cockroachdb#139280 cockroachdb#139333 cockroachdb#139475

139150: kvserver: remove StoreBenignError r=tbg a=tbg

Before commit 3f0b37a, StoreBenignError was used to handle pebble.ErrSnapshotExcised. Since the latter has been removed from Pebble, StoreBenignError is no longer needed.

This commit does the following:

- Remove the StoreBenignError type.
- Remove the corresponding increment of the storeFailures counter.
- Update the related test, TestBaseQueueRequeue.

Fixes: cockroachdb#129941

Closes: cockroachdb#130308

Release note: None
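
For context on what remains after this removal: queues still classify some errors as benign via the benignerror package (note the IsBenign call left in queue.go further down). Below is a minimal sketch of that pattern using only the standard library — an illustration, not the actual pkg/kv/kvserver/benignerror implementation:

```go
package main

import (
	"errors"
	"fmt"
)

// BenignError marks an error that is expected during normal operation and
// should not be treated as a hard queue-processing failure.
type BenignError struct{ cause error }

func New(err error) *BenignError      { return &BenignError{cause: err} }
func (be *BenignError) Error() string { return be.cause.Error() }
func (be *BenignError) Unwrap() error { return be.cause }

// IsBenign reports whether any error in err's chain is benign.
func IsBenign(err error) bool {
	var be *BenignError
	return errors.As(err, &be)
}

func main() {
	err := fmt.Errorf("queue: %w", New(errors.New("replica raced with a split")))
	fmt.Println(IsBenign(err)) // true
}
```

With the store-specific variant gone, finishProcessingReplica only consults IsBenign, and the storeFailures counter (and its requeue side effect) disappears with it.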

139280: roachtest: adding backup/restore tests for minio r=sravotto a=sravotto

This introduces a test verifying that we can back up to and restore from a MinIO object store cluster using the S3 API.

Fixes: cockroachdb#139272

Release note: None
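
Under the hood, the test points CockroachDB's stock S3 client at MinIO purely through query parameters on the backup URI (see getBackupURI in the new s3_minio.go below). Assuming the pkg/cloud/amazon constants carry their conventional AWS_* names, the encoded URI comes out roughly like this (endpoint, bucket, and credentials invented for illustration):

```go
package example

// exampleBackupURI is a hypothetical example of the final backup URI. Note
// the path-style flag, which MinIO needs because buckets live under the URL
// path rather than in a virtual-host subdomain, and the placeholder region
// that MinIO ignores.
const exampleBackupURI = "s3://backup-testing/minio-backup?" +
	"AWS_ACCESS_KEY_ID=EXAMPLEKEY" +
	"&AWS_ENDPOINT=https%3A%2F%2F10.142.0.5" +
	"&AWS_REGION=dummy" +
	"&AWS_SECRET_ACCESS_KEY=EXAMPLESECRET" +
	"&AWS_USE_PATH_STYLE=true"
```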

139333: roachtest: only run 30 node backfill tests in full ac mode r=andrewbaptist a=andrewbaptist

In the non-full AC modes, a node can OOM during the fill period and the test will fail. This impacts the perturbation/metamorphic/backfill test.

Fixes: cockroachdb#139302
Informs: cockroachdb#139319

Release note: None

139475: rangefeed: fix test logging r=tbg a=stevendanna

The logging didn't actually print the value it seemed intended to print.

Informs cockroachdb#119340

Release note: None
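
The root cause is a testify subtlety: require.Fail takes a plain failure message (never interpreted as a format string) plus optional trailing message args, whereas require.Failf takes an explicit format string. A self-contained, deliberately failing illustration:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestFailVsFailf(t *testing.T) {
	v := struct{ Key, Val string }{"k", "v"}

	// Buggy pattern: "%s" is printed literally, and v is merely appended as
	// an extra message, so the output never shows a formatted value.
	require.Fail(t, "repl3 emitted premature value %s", v)

	// Fixed pattern: Failf formats its second message with the args.
	require.Failf(t, "repl3 emitted premature value", "value: %#+v", v)
}
```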

Co-authored-by: XiaochenCui <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Silvano Ravotto <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
6 people committed Jan 21, 2025
5 parents 693af11 + aecc555 + 65e8130 + 50439e0 + 9e11323 commit 210d1f0
Showing 22 changed files with 169 additions and 103 deletions.
1 change: 0 additions & 1 deletion docs/generated/metrics/metrics.html
@@ -815,7 +815,6 @@
<tr><td>STORAGE</td><td>storage.l6-level-score</td><td>Compaction score of level 6</td><td>Score</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.l6-level-size</td><td>Size of the SSTables in level 6</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.marked-for-compaction-files</td><td>Count of SSTables marked for compaction</td><td>SSTables</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
- <tr><td>STORAGE</td><td>storage.queue.store-failures</td><td>Number of replicas which failed processing in replica queues due to retryable store errors</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>storage.secondary-cache.count</td><td>The count of cache blocks in the secondary cache (not sstable blocks)</td><td>Cache items</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.secondary-cache.evictions</td><td>The number of times a cache block was evicted from the secondary cache</td><td>Num evictions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>storage.secondary-cache.reads-full-hit</td><td>The number of reads where all data returned was read from the secondary cache</td><td>Num reads</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -168,6 +168,7 @@ go_library(
"rust_postgres_blocklist.go",
"s3_clone_backup_restore.go",
"s3_microceph.go",
"s3_minio.go",
"schemachange.go",
"schemachange_random_load.go",
"scrub.go",
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/tests/perturbation/index_backfill.go
@@ -42,6 +42,13 @@ func (b backfill) setupMetamorphic(rng *rand.Rand) variations {
if v.mem == spec.Low {
v.mem = spec.Standard
}

// TODO(#139319): This can be removed once we stop testing the non "full"
// mode. Without full AC, these tests can OOM.
if v.numNodes >= 30 && (v.acMode == elasticOnlyBoth || v.acMode == fullNormalElasticRepl) {
v.acMode = fullBoth
}

// TODO(#136848): The backfill test will cause WAL failover resulting in
// OOMs even with high memory configurations. This test passes without WAL
// failover enabled or with more vCPUs per node.
64 changes: 64 additions & 0 deletions pkg/cmd/roachtest/tests/s3_clone_backup_restore.go
@@ -10,6 +10,8 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"os"
"path"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -65,6 +67,42 @@ func registerBackupS3Clones(r registry.Registry) {
},
})
}

r.Add(registry.TestSpec{
Name: "backup/minio",
Owner: registry.OwnerFieldEng,
Cluster: r.MakeClusterSpec(4, spec.WorkloadNodeCount(1)),
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.Clouds(spec.GCE),
Suites: registry.Suites(registry.Nightly),
TestSelectionOptOutSuites: registry.Suites(registry.Nightly),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
v := s3BackupRestoreValidator{
t: t,
c: c,
crdbNodes: c.CRDBNodes(),
csvPort: 8081,
importNode: c.Node(1),
rows: 1000,
workloadNode: c.WorkloadNode(),
}
v.startCluster(ctx)
mgr := minioManager{
t: t,
c: c,
bucket: backupTestingBucket,
// For now, we use the workload node as the minio cluster
minioNodes: c.Node(c.Spec().NodeCount),
key: randomString(32),
secret: randomString(64),
}
mgr.install(ctx)
defer mgr.cleanup(ctx)
v.validateBackupRestore(ctx, mgr)
},
})

}

// s3Provider defines the methods that the S3 object store has to provide
@@ -229,6 +267,32 @@ func (v *s3BackupRestoreValidator) runWorload(ctx context.Context, duration time
return v.c.RunE(ctx, option.WithNodes(v.workloadNode), cmd)
}

func installCa(ctx context.Context, t test.Test, c cluster.Cluster) error {
localCertsDir, err := os.MkdirTemp("", "roachtest-certs")
if err != nil {
return err
}
// get the ca file from one of the nodes.
caFile := path.Join(localCertsDir, "ca.crt")
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
if err := c.Get(ctx, t.L(), "certs/ca.crt", caFile, c.Node(1)); err != nil {
return err
}
caCert, err := os.ReadFile(caFile)
if err != nil {
return err
}
// Disabling caching for Custom CA, see https://github.com/cockroachdb/cockroach/issues/125051.
if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.s3.session_reuse.enabled = false"); err != nil {
return err
}
if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.http.custom_ca=$1", caCert); err != nil {
return err
}
return nil
}

// randomString returns a random string with the given size.
func randomString(size int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
28 changes: 2 additions & 26 deletions pkg/cmd/roachtest/tests/s3_microceph.go
@@ -9,8 +9,6 @@
"context"
"fmt"
"net/url"
"os"
"path"
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/cloud/amazon"
@@ -38,7 +36,7 @@ for l in a b c; do
sudo microceph disk add --wipe "/dev/sdi${l}"
done`

- // cephCleanup remove microceph and the loop devices.
+ // cephCleanup removes microceph and the loop devices.
const cephCleanup = `
#!/bin/bash
sudo microceph disable rgw
@@ -147,29 +145,7 @@ func (m cephManager) maybeInstallCa(ctx context.Context) error {
if !m.secure {
return nil
}
- localCertsDir, err := os.MkdirTemp("", "roachtest-certs")
- if err != nil {
- return err
- }
- // get the ca file from one of the nodes.
- caFile := path.Join(localCertsDir, "ca.crt")
- conn := m.c.Conn(ctx, m.t.L(), 1)
- defer conn.Close()
- if err := m.c.Get(ctx, m.t.L(), "certs/ca.crt", caFile, m.c.Node(1)); err != nil {
- return err
- }
- caCert, err := os.ReadFile(caFile)
- if err != nil {
- return err
- }
- // Disabling caching for Custom CA, see https://github.com/cockroachdb/cockroach/issues/125051.
- if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.s3.session_reuse.enabled = false"); err != nil {
- return err
- }
- if _, err := conn.ExecContext(ctx, "set cluster setting cloudstorage.http.custom_ca=$1", caCert); err != nil {
- return err
- }
- return nil
+ return installCa(ctx, m.t, m.c)
}

// put creates a file in the ceph node with the given content.
93 changes: 93 additions & 0 deletions pkg/cmd/roachtest/tests/s3_minio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package tests

import (
"context"
"fmt"
"net/url"
"path"

"github.com/cockroachdb/cockroach/pkg/cloud/amazon"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
)

// minioDir is the directory for supporting files.
var minioDir = "/tmp/minio"

// minioManager manages a single node minio cluster, used to
// validate the backup and restore functionality.
type minioManager struct {
t test.Test
c cluster.Cluster
bucket string
minioNodes option.NodeListOption // The nodes within the cluster used by Minio.
key string
secret string
}

// minioManager implements s3Provider
var _ s3Provider = &minioManager{}

// getBackupURI implements s3Provider.
func (m minioManager) getBackupURI(ctx context.Context, dest string) (string, error) {
addr, err := m.c.InternalIP(ctx, m.t.L(), m.minioNodes)
if err != nil {
return "", err
}
m.t.Status("minio: ", addr)
endpointURL := `https://` + addr[0]

q := make(url.Values)
q.Add(amazon.AWSAccessKeyParam, m.key)
q.Add(amazon.AWSSecretParam, m.secret)
q.Add(amazon.AWSUsePathStyle, "true")
// Region is required in the URL, but not used in Minio.
q.Add(amazon.S3RegionParam, "dummy")
q.Add(amazon.AWSEndpointParam, endpointURL)
uri := fmt.Sprintf("s3://%s/%s?%s", m.bucket, dest, q.Encode())
return uri, nil
}

func (m minioManager) cleanup(ctx context.Context) {
m.run(ctx, "removing minio", "sudo docker rm -f minio")
m.run(ctx, "removing minio dir", fmt.Sprintf(`rm -rf %s`, minioDir))
}

// install a single node minio cluster within a docker container.
// It is fatal on errors.
func (m minioManager) install(ctx context.Context) {
if err := m.c.Install(ctx, m.t.L(), m.minioNodes, "docker"); err != nil {
m.t.Fatalf("failed to install docker: %v", err)
}
certsDir := path.Join(minioDir, "certs")
m.run(ctx, `copy CA`,
fmt.Sprintf(`mkdir -p %[1]s/CAs ; cp certs/ca.crt %[1]s/CAs/ca.crt; `, certsDir))
m.run(ctx, `copy certs/key`,
fmt.Sprintf(`cp certs/node.crt %[1]s/public.crt; cp certs/node.key %[1]s/private.key; `,
certsDir))
m.run(ctx, `installing minio`,
fmt.Sprintf(`sudo docker run --name minio -d -p 443:9000 -e "MINIO_ROOT_USER=%s" -e "MINIO_ROOT_PASSWORD=%s" --privileged -v %s:/root/.minio minio/minio server /data`,
m.key, m.secret, minioDir))

m.run(ctx, `install s3cmd`, `sudo apt install -y s3cmd`)
m.run(ctx, `creating bucket`,
fmt.Sprintf(s3cmdSsl, m.key, m.secret, "mb s3://"+m.bucket))

if err := installCa(ctx, m.t, m.c); err != nil {
m.t.Fatal(err)
}
}

// run the given command on the minio node.
func (m minioManager) run(ctx context.Context, msg string, cmd ...string) {
m.t.Status(msg, "...")
m.t.Status(cmd)
m.c.Run(ctx, option.WithNodes(m.minioNodes), cmd...)
m.t.Status(msg, " done")
}
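
Downstream, the validator exercises this provider by turning getBackupURI's result into ordinary BACKUP/RESTORE statements. A minimal sketch of that flow, assuming a trimmed-down s3Provider interface and an invented helper and database name — illustrative, not the validator's exact code:

```go
package example

import (
	"context"
	gosql "database/sql"
)

// s3Provider is a minimal stand-in for the interface defined in
// s3_clone_backup_restore.go (the real one has more methods).
type s3Provider interface {
	getBackupURI(ctx context.Context, dest string) (string, error)
}

// runBackupViaProvider is a hypothetical helper showing how a provider's
// URI plugs into a BACKUP statement over a regular SQL connection.
func runBackupViaProvider(ctx context.Context, db *gosql.DB, p s3Provider) error {
	uri, err := p.getBackupURI(ctx, "minio-backup")
	if err != nil {
		return err
	}
	// The URI already carries the MinIO endpoint, credentials, placeholder
	// region, and path-style flag, so the cluster needs no extra S3
	// configuration beyond the custom CA installed by installCa.
	_, err = db.ExecContext(ctx, `BACKUP DATABASE bank INTO $1`, uri)
	return err
}
```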
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -1585,7 +1585,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
require.False(t, commitTS.LessEq(c.ResolvedTS),
"repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS)
case v := <-valueC:
- require.Fail(t, "repl3 emitted premature value %s", v)
+ require.Failf(t, "repl3 emitted premature value", "value: %#+v", v)
case <-waitC:
done = true
}
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/benignerror/BUILD.bazel
@@ -2,10 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "benignerror",
- srcs = [
-     "benign_error.go",
-     "store_benign_error.go",
- ],
+ srcs = ["benign_error.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror",
visibility = ["//visibility:public"],
deps = ["@com_github_cockroachdb_errors//:errors"],
27 changes: 0 additions & 27 deletions pkg/kv/kvserver/benignerror/store_benign_error.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/kv/kvserver/consistency_queue.go
@@ -104,7 +104,6 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
acceptsUnsplitRanges: true,
successes: store.metrics.ConsistencyQueueSuccesses,
failures: store.metrics.ConsistencyQueueFailures,
- storeFailures: store.metrics.StoreFailures,
pending: store.metrics.ConsistencyQueuePending,
processingNanos: store.metrics.ConsistencyQueueProcessingNanos,
processTimeoutFunc: makeRateLimitedTimeoutFunc(consistencyCheckRate),
1 change: 0 additions & 1 deletion pkg/kv/kvserver/merge_queue.go
@@ -125,7 +125,6 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
acceptsUnsplitRanges: false,
successes: store.metrics.MergeQueueSuccesses,
failures: store.metrics.MergeQueueFailures,
- storeFailures: store.metrics.StoreFailures,
pending: store.metrics.MergeQueuePending,
processingNanos: store.metrics.MergeQueueProcessingNanos,
purgatory: store.metrics.MergeQueuePurgatory,
9 changes: 0 additions & 9 deletions pkg/kv/kvserver/metrics.go
@@ -1815,13 +1815,6 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Unit: metric.Unit_PERCENT,
}

- // Replica queue metrics.
- metaStoreFailures = metric.Metadata{
-     Name:        "storage.queue.store-failures",
-     Help:        "Number of replicas which failed processing in replica queues due to retryable store errors",
-     Measurement: "Replicas",
-     Unit:        metric.Unit_COUNT,
- }
metaMVCCGCQueueSuccesses = metric.Metadata{
Name: "queue.gc.process.success",
Help: "Number of replicas successfully processed by the MVCC GC queue",
@@ -2875,7 +2868,6 @@ type StoreMetrics struct {
RaftCoalescedHeartbeatsPending *metric.Gauge

// Replica queue metrics.
- StoreFailures *metric.Counter
MVCCGCQueueSuccesses *metric.Counter
MVCCGCQueueFailures *metric.Counter
MVCCGCQueuePending *metric.Gauge
@@ -3645,7 +3637,6 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftCoalescedHeartbeatsPending: metric.NewGauge(metaRaftCoalescedHeartbeatsPending),

// Replica queue metrics.
- StoreFailures: metric.NewCounter(metaStoreFailures),
MVCCGCQueueSuccesses: metric.NewCounter(metaMVCCGCQueueSuccesses),
MVCCGCQueueFailures: metric.NewCounter(metaMVCCGCQueueFailures),
MVCCGCQueuePending: metric.NewGauge(metaMVCCGCQueuePending),
1 change: 0 additions & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
@@ -199,7 +199,6 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
},
successes: store.metrics.MVCCGCQueueSuccesses,
failures: store.metrics.MVCCGCQueueFailures,
- storeFailures: store.metrics.StoreFailures,
pending: store.metrics.MVCCGCQueuePending,
processingNanos: store.metrics.MVCCGCQueueProcessingNanos,
disabledConfig: kvserverbase.MVCCGCQueueEnabled,
9 changes: 0 additions & 9 deletions pkg/kv/kvserver/queue.go
@@ -329,10 +329,6 @@ type queueConfig struct {
successes *metric.Counter
// failures is a counter of replicas which failed processing.
failures *metric.Counter
- // storeFailures is a counter of replicas that failed processing due to a
- // StoreBenignError. These errors must be counted independently of the above
- // failures metric.
- storeFailures *metric.Counter
// pending is a gauge measuring current replica count pending.
pending *metric.Gauge
// processingNanos is a counter measuring total nanoseconds spent processing
@@ -1170,17 +1166,12 @@ func (bq *baseQueue) finishProcessingReplica(
// Handle failures.
if err != nil {
benign := benignerror.IsBenign(err)
- storeBenign := benignerror.IsStoreBenign(err)

// Increment failures metric.
//
// TODO(tschottdorf): once we start asserting zero failures in tests
// (and production), move benign failures into a dedicated category.
bq.failures.Inc(1)
- if storeBenign {
-     bq.storeFailures.Inc(1)
-     requeue = true
- }

// Determine whether a failure is a purgatory error. If it is, add
// the failing replica to purgatory. Note that even if the item was
1 change: 0 additions & 1 deletion pkg/kv/kvserver/queue_concurrency_test.go
@@ -58,7 +58,6 @@ func TestBaseQueueConcurrent(t *testing.T) {
// We don't care about these, but we don't want to crash.
successes: metric.NewCounter(metric.Metadata{Name: "processed"}),
failures: metric.NewCounter(metric.Metadata{Name: "failures"}),
- storeFailures: metric.NewCounter(metric.Metadata{Name: "store_failures"}),
pending: metric.NewGauge(metric.Metadata{Name: "pending"}),
processingNanos: metric.NewCounter(metric.Metadata{Name: "processingnanos"}),
purgatory: metric.NewGauge(metric.Metadata{Name: "purgatory"}),
