Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Add ability to force repair regardless namespace has option set and add compare only repair type #3550

Merged
merged 8 commits into from
Jun 10, 2021
20 changes: 15 additions & 5 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ import (
"strings"
"time"

"github.com/m3dbx/vellum/regexp"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/transport"
"go.etcd.io/etcd/pkg/types"

coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/discovery"
"github.com/m3db/m3/src/dbnode/environment"
"github.com/m3db/m3/src/dbnode/storage/repair"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/x/config/hostid"
"github.com/m3db/m3/src/x/debug/config"
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
"github.com/m3db/m3/src/x/opentracing"

"github.com/m3dbx/vellum/regexp"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/transport"
"go.etcd.io/etcd/pkg/types"
)

const (
Expand Down Expand Up @@ -528,11 +529,20 @@ type CommitLogQueuePolicy struct {
Size int `yaml:"size" validate:"nonzero"`
}

// RepairPolicyMode is the repair policy mode.
type RepairPolicyMode uint

// RepairPolicy is the repair policy.
type RepairPolicy struct {
// Enabled or disabled.
Enabled bool `yaml:"enabled"`

// Type is the type of repair to run.
Type repair.Type `yaml:"type"`

// Force the repair to run regardless of whether namespaces have repair enabled or not.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: change to ForceEnabled for context?

Force bool `yaml:"force"`

// The repair throttle.
Throttle time.Duration `yaml:"throttle"`

Expand Down
7 changes: 7 additions & 0 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package integration

import (
"fmt"
"github.com/m3db/m3/src/dbnode/storage/repair"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -131,6 +132,8 @@ type BootstrappableTestSetupOptions struct {
DisablePeersBootstrapper bool
UseTChannelClientForWriting bool
EnableRepairs bool
ForceRepairs bool
RepairType repair.Type
AdminClientCustomOpts []client.CustomAdminOption
}

Expand Down Expand Up @@ -180,6 +183,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
topologyInitializer = setupOpts[i].TopologyInitializer
testStatsReporter = setupOpts[i].TestStatsReporter
enableRepairs = setupOpts[i].EnableRepairs
forceRepairs = setupOpts[i].ForceRepairs
repairType = setupOpts[i].RepairType
origin topology.Host
instanceOpts = newMultiAddrTestOptions(opts, instance)
adminClientCustomOpts = setupOpts[i].AdminClientCustomOpts
Expand Down Expand Up @@ -346,6 +351,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
SetRepairEnabled(true).
SetRepairOptions(
setup.StorageOpts().RepairOptions().
SetType(repairType).
SetForce(forceRepairs).
SetRepairThrottle(time.Millisecond).
SetRepairCheckInterval(time.Millisecond).
SetAdminClients([]client.AdminClient{adminClient}).
Expand Down
210 changes: 210 additions & 0 deletions src/dbnode/integration/repair_force_only_compare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// +build integration

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/repair"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
)

func TestRepairForceAndOnlyCompare(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

var (
// Test both disjoint and shared series repair.
genRepairData = genRepairDatafn(func(now xtime.UnixNano, blockSize time.Duration) (
node0Data generate.SeriesBlocksByStart,
node1Data generate.SeriesBlocksByStart,
node2Data generate.SeriesBlocksByStart,
allData generate.SeriesBlocksByStart,
) {
currBlockStart := now.Truncate(blockSize)
node0Data = generate.BlocksByStart([]generate.BlockConfig{
{IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)},
{IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)},
})
node1Data = generate.BlocksByStart([]generate.BlockConfig{
{IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)},
{IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)},
})

allData = make(map[xtime.UnixNano]generate.SeriesBlock)
for start, data := range node0Data {
for _, series := range data {
allData[start] = append(allData[start], series)
}
}
for start, data := range node1Data {
for _, series := range data {
allData[start] = append(allData[start], series)
}
}
for start, data := range node2Data {
for _, series := range data {
allData[start] = append(allData[start], series)
}
}

return node0Data, node1Data, node2Data, allData
})
)

// Test setups.
log := xtest.NewLogger(t)
retentionOpts := retention.NewOptions().
SetRetentionPeriod(20 * time.Hour).
SetBlockSize(2 * time.Hour).
SetBufferPast(10 * time.Minute).
SetBufferFuture(2 * time.Minute)
nsOpts := namespace.NewOptions().
// Test needing to force enable repairs.
SetRepairEnabled(false).
SetRetentionOptions(retentionOpts)
namesp, err := namespace.NewMetadata(testNamespaces[0], nsOpts)
require.NoError(t, err)
opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{namesp}).
// Use TChannel clients for writing / reading because we want to target individual nodes at a time
// and not write/read all nodes in the cluster.
SetUseTChannelClientForWriting(true).
SetUseTChannelClientForReading(true)

setupOpts := []BootstrappableTestSetupOptions{
{
DisablePeersBootstrapper: true,
EnableRepairs: true,
// Test forcing repair of type compare only repair.
ForceRepairs: true,
RepairType: repair.OnlyCompareRepair,
},
{
DisablePeersBootstrapper: true,
EnableRepairs: true,
// Test forcing repair of type compare only repair.
ForceRepairs: true,
RepairType: repair.OnlyCompareRepair,
},
{
DisablePeersBootstrapper: true,
EnableRepairs: true,
// Test forcing repair of type compare only repair.
ForceRepairs: true,
RepairType: repair.OnlyCompareRepair,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()

// Ensure that the current time is set such that the previous block is flushable.
blockSize := retentionOpts.BlockSize()
now := setups[0].NowFn()().Truncate(blockSize).Add(retentionOpts.BufferPast()).Add(time.Second)
for _, setup := range setups {
setup.SetNowFn(now)
}

node0Data, node1Data, node2Data, _ := genRepairData(now, blockSize)
if node0Data != nil {
require.NoError(t, writeTestDataToDisk(namesp, setups[0], node0Data, 0))
}
if node1Data != nil {
require.NoError(t, writeTestDataToDisk(namesp, setups[1], node1Data, 0))
}
if node2Data != nil {
require.NoError(t, writeTestDataToDisk(namesp, setups[2], node2Data, 0))
}

// Start the servers with filesystem bootstrappers.
setups.parallel(func(s TestSetup) {
if err := s.StartServer(); err != nil {
panic(err)
}
})
log.Debug("servers are now up")

// Stop the servers.
defer func() {
setups.parallel(func(s TestSetup) {
require.NoError(t, s.StopServer())
})
log.Debug("servers are now down")
}()

// Wait for repairs to occur at least once per node.
log.Debug("waiting for repairs to run")
var runSuccessPerNodeCounters []tally.CounterSnapshot
require.True(t, waitUntil(func() bool {
var successCounters []tally.CounterSnapshot
for _, setup := range setups {
scope := setup.Scope()
for _, v := range scope.Snapshot().Counters() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can also use tallytest.AssertCounterValue but it can be a little finicky

if v.Name() != "repair.run" {
continue
}
repairType, ok := v.Tags()["repair_type"]
if !ok || repairType != "only_compare" {
continue
}
if v.Value() > 0 {
successCounters = append(successCounters, v)
break
}
}
}

// Check if all counters are success.
successAll := len(successCounters) == len(setups)
if successAll {
runSuccessPerNodeCounters = successCounters
return true
}
return false
}, 60*time.Second))

// Verify that the repair runs only ran comparisons without repairing data.
log.Debug("verifying repairs that ran")
require.Equal(t, len(setups), len(runSuccessPerNodeCounters),
"unexpected number of successful nodes ran repairs")
for _, counter := range runSuccessPerNodeCounters {
repairType, ok := counter.Tags()["repair_type"]
require.True(t, ok)
require.Equal(t, "only_compare", repairType)
require.True(t, counter.Value() > 0)
}

// Verify data did not change (repair type is compare only).
verifySeriesMaps(t, setups[0], namesp.ID(), node0Data)
verifySeriesMaps(t, setups[1], namesp.ID(), node1Data)
verifySeriesMaps(t, setups[2], namesp.ID(), node2Data)
}
4 changes: 3 additions & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,10 @@ func Run(runOpts RunOptions) {
repairOpts := opts.RepairOptions().
SetAdminClients(repairClients)

if cfg.Repair != nil {
if repairCfg := cfg.Repair; repairCfg != nil {
repairOpts = repairOpts.
SetType(repairCfg.Type).
SetForce(repairCfg.Force).
SetResultOptions(rsOpts).
SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled)
if cfg.Repair.Throttle > 0 {
Expand Down
20 changes: 17 additions & 3 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type dbNamespace struct {
snapshotFilesFn snapshotFilesFn
log *zap.Logger
bootstrapState BootstrapState
repairsAny bool

// schemaDescr caches the latest schema for the namespace.
// schemaDescr is updated whenever schema registry is updated.
Expand Down Expand Up @@ -1272,11 +1273,12 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
return errNamespaceNotBootstrapped
}
nsCtx := n.nsContextWithRLock()
repairsAny := n.repairsAny
n.RUnlock()

// If repair is enabled we still need cold flush regardless of whether cold writes is
// If repair has run we still need cold flush regardless of whether cold writes is
// enabled since repairs are dependent on the cold flushing logic.
enabled := n.nopts.ColdWritesEnabled() || n.nopts.RepairEnabled()
enabled := n.nopts.ColdWritesEnabled() || repairsAny
if n.ReadOnly() || !enabled {
n.metrics.flushColdData.ReportSuccess(n.nowFn().Sub(callStart))
return nil
Expand Down Expand Up @@ -1499,11 +1501,23 @@ func (n *dbNamespace) Truncate() (int64, error) {
func (n *dbNamespace) Repair(
repairer databaseShardRepairer,
tr xtime.Range,
opts NamespaceRepairOptions,
) error {
if !n.nopts.RepairEnabled() {
shouldRun := opts.Force || n.nopts.RepairEnabled()
if !shouldRun {
return nil
}

n.RLock()
repairsAny := n.repairsAny
n.RUnlock()
if !repairsAny {
// Only acquire write lock if required.
n.Lock()
n.repairsAny = true
n.Unlock()
}

var (
wg sync.WaitGroup
mutex sync.Mutex
Expand Down
Loading