From 99fb33e91e9d7e5feb97c80b700b000ed6405cb0 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 14:48:59 -0400 Subject: [PATCH 1/7] [dbnode] Add ability to force repair regardless namespace has option set and allow compare only repair type --- src/cmd/services/m3dbnode/config/config.go | 20 +++++-- src/dbnode/integration/integration.go | 7 +++ src/dbnode/server/server.go | 4 +- src/dbnode/storage/namespace.go | 20 ++++++- src/dbnode/storage/repair.go | 46 +++++++++++++-- src/dbnode/storage/repair/options.go | 22 ++++++++ src/dbnode/storage/repair/types.go | 65 ++++++++++++++++++++++ src/dbnode/storage/types.go | 7 ++- 8 files changed, 177 insertions(+), 14 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index d5f3cc797f..6380a16370 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -29,21 +29,22 @@ import ( "strings" "time" + "github.com/m3dbx/vellum/regexp" + "go.etcd.io/etcd/embed" + "go.etcd.io/etcd/pkg/transport" + "go.etcd.io/etcd/pkg/types" + coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/discovery" "github.com/m3db/m3/src/dbnode/environment" + "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/x/config/hostid" "github.com/m3db/m3/src/x/debug/config" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" "github.com/m3db/m3/src/x/opentracing" - - "github.com/m3dbx/vellum/regexp" - "go.etcd.io/etcd/embed" - "go.etcd.io/etcd/pkg/transport" - "go.etcd.io/etcd/pkg/types" ) const ( @@ -528,11 +529,20 @@ type CommitLogQueuePolicy struct { Size int `yaml:"size" validate:"nonzero"` } +// RepairPolicyMode is the repair policy mode. +type RepairPolicyMode uint + // RepairPolicy is the repair policy. type RepairPolicy struct { // Enabled or disabled. Enabled bool `yaml:"enabled"` + // Type is the type of repair to run. + Type repair.Type `yaml:"type"` + + // Force the repair to run regardless of whether namespaces have repair enabled or not. + Force bool `yaml:"force"` + // The repair throttle. Throttle time.Duration `yaml:"throttle"` diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index e5704279bf..797da83eef 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -22,6 +22,7 @@ package integration import ( "fmt" + "github.com/m3db/m3/src/dbnode/storage/repair" "sync" "testing" "time" @@ -131,6 +132,8 @@ type BootstrappableTestSetupOptions struct { DisablePeersBootstrapper bool UseTChannelClientForWriting bool EnableRepairs bool + ForceRepairs bool + RepairType repair.Type AdminClientCustomOpts []client.CustomAdminOption } @@ -180,6 +183,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo topologyInitializer = setupOpts[i].TopologyInitializer testStatsReporter = setupOpts[i].TestStatsReporter enableRepairs = setupOpts[i].EnableRepairs + forceRepairs = setupOpts[i].ForceRepairs + repairType = setupOpts[i].RepairType origin topology.Host instanceOpts = newMultiAddrTestOptions(opts, instance) adminClientCustomOpts = setupOpts[i].AdminClientCustomOpts @@ -346,6 +351,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo SetRepairEnabled(true). SetRepairOptions( setup.StorageOpts().RepairOptions(). + SetType(repairType). + SetForce(forceRepairs). SetRepairThrottle(time.Millisecond). SetRepairCheckInterval(time.Millisecond). SetAdminClients([]client.AdminClient{adminClient}). diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 609c95fd6a..461ad6ae23 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -921,8 +921,10 @@ func Run(runOpts RunOptions) { repairOpts := opts.RepairOptions(). SetAdminClients(repairClients) - if cfg.Repair != nil { + if repairCfg := cfg.Repair; repairCfg != nil { repairOpts = repairOpts. + SetType(repairCfg.Type). + SetForce(repairCfg.Force). SetResultOptions(rsOpts). SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled) if cfg.Repair.Throttle > 0 { diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index f94f6e4b01..227e8050e2 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -120,6 +120,7 @@ type dbNamespace struct { snapshotFilesFn snapshotFilesFn log *zap.Logger bootstrapState BootstrapState + repairsAny bool // schemaDescr caches the latest schema for the namespace. // schemaDescr is updated whenever schema registry is updated. @@ -1272,11 +1273,12 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error { return errNamespaceNotBootstrapped } nsCtx := n.nsContextWithRLock() + repairsAny := n.repairsAny n.RUnlock() - // If repair is enabled we still need cold flush regardless of whether cold writes is + // If repair has run we still need cold flush regardless of whether cold writes is // enabled since repairs are dependent on the cold flushing logic. - enabled := n.nopts.ColdWritesEnabled() || n.nopts.RepairEnabled() + enabled := n.nopts.ColdWritesEnabled() || repairsAny if n.ReadOnly() || !enabled { n.metrics.flushColdData.ReportSuccess(n.nowFn().Sub(callStart)) return nil @@ -1499,11 +1501,23 @@ func (n *dbNamespace) Truncate() (int64, error) { func (n *dbNamespace) Repair( repairer databaseShardRepairer, tr xtime.Range, + opts NamespaceRepairOptions, ) error { - if !n.nopts.RepairEnabled() { + shouldRun := opts.Force || n.nopts.RepairEnabled() + if !shouldRun { return nil } + n.RLock() + repairsAny := n.repairsAny + n.RUnlock() + if !repairsAny { + // Only acquire write lock if required. + n.Lock() + n.repairsAny = true + n.Unlock() + } + var ( wg sync.WaitGroup mutex sync.Mutex diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index e8339ed5cd..bb0b895e40 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -68,9 +68,26 @@ type shardRepairer struct { rpopts repair.Options clients []client.AdminClient recordFn recordFn + nowFn clock.NowFn logger *zap.Logger scope tally.Scope - nowFn clock.NowFn + metrics shardRepairerMetrics +} + +type shardRepairerMetrics struct { + runDefault tally.Counter + runOnlyCompare tally.Counter +} + +func newShardRepairerMetrics(scope tally.Scope) shardRepairerMetrics { + return shardRepairerMetrics{ + runDefault: scope.Tagged(map[string]string{ + "repair_type": "default", + }).Counter("run"), + runOnlyCompare: scope.Tagged(map[string]string{ + "repair_type": "only_compare", + }).Counter("run"), + } } func newShardRepairer(opts Options, rpopts repair.Options) databaseShardRepairer { @@ -81,9 +98,10 @@ func newShardRepairer(opts Options, rpopts repair.Options) databaseShardRepairer opts: opts, rpopts: rpopts, clients: rpopts.AdminClients(), + nowFn: opts.ClockOptions().NowFn(), logger: iopts.Logger(), scope: scope, - nowFn: opts.ClockOptions().NowFn(), + metrics: newShardRepairerMetrics(scope), } r.recordFn = r.recordDifferences @@ -101,6 +119,20 @@ func (r shardRepairer) Repair( tr xtime.Range, shard databaseShard, ) (repair.MetadataComparisonResult, error) { + repairType := r.rpopts.Type() + compareOnly := false + switch repairType { + case repair.DefaultRepair: + defer r.metrics.runDefault.Inc(1) + case repair.OnlyCompareRepair: + compareOnly = true + defer r.metrics.runOnlyCompare.Inc(1) + default: + // Unknown repair type. + err := fmt.Errorf("unknown repair type: %v", repairType) + return repair.MetadataComparisonResult{}, err + } + var sessions []sessionAndTopo for _, c := range r.clients { session, err := c.DefaultAdminSession() @@ -108,6 +140,7 @@ func (r shardRepairer) Repair( fmtErr := fmt.Errorf("error obtaining default admin session: %v", err) return repair.MetadataComparisonResult{}, fmtErr } + topo, err := session.TopologyMap() if err != nil { fmtErr := fmt.Errorf("error obtaining topology map: %v", err) @@ -213,6 +246,10 @@ func (r shardRepairer) Repair( // Shard repair can fail due to transient network errors due to the significant amount of data fetched from peers. // So collect and emit metadata comparison metrics before fetching blocks from peer to repair. r.recordFn(origin, nsCtx.ID, shard, metadataRes) + if compareOnly { + // Early return if repair type doesn't require executing repairing the data step. + return metadataRes, nil + } originID := origin.ID() for _, e := range seriesWithChecksumMismatches.Iter() { @@ -730,10 +767,11 @@ func (r *dbRepairer) repairNamespaceBlockstart(n databaseNamespace, blockStart x } func (r *dbRepairer) repairNamespaceWithTimeRange(n databaseNamespace, tr xtime.Range) error { - if err := n.Repair(r.shardRepairer, tr); err != nil { + if err := n.Repair(r.shardRepairer, tr, NamespaceRepairOptions{ + Force: r.ropts.Force(), + }); err != nil { return fmt.Errorf("namespace %s failed to repair time range %v: %v", n.ID().String(), tr, err) } - return nil } diff --git a/src/dbnode/storage/repair/options.go b/src/dbnode/storage/repair/options.go index 3555f6eabf..f1f62bf5f5 100644 --- a/src/dbnode/storage/repair/options.go +++ b/src/dbnode/storage/repair/options.go @@ -51,6 +51,8 @@ var ( ) type options struct { + repairType Type + force bool adminClients []client.AdminClient repairConsistencyLevel topology.ReadConsistencyLevel repairShardConcurrency int @@ -76,6 +78,26 @@ func NewOptions() Options { } } +func (o *options) SetType(value Type) Options { + opts := *o + opts.repairType = value + return &opts +} + +func (o *options) Type() Type { + return o.repairType +} + +func (o *options) SetForce(value bool) Options { + opts := *o + opts.force = value + return &opts +} + +func (o *options) Force() bool { + return o.force +} + func (o *options) SetAdminClients(value []client.AdminClient) Options { opts := *o opts.adminClients = value diff --git a/src/dbnode/storage/repair/types.go b/src/dbnode/storage/repair/types.go index 49519ba158..57f8c29e5c 100644 --- a/src/dbnode/storage/repair/types.go +++ b/src/dbnode/storage/repair/types.go @@ -21,6 +21,7 @@ package repair import ( + "fmt" "time" "github.com/m3db/m3/src/dbnode/client" @@ -31,6 +32,58 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) +// Type defines the type of repair to run. +type Type uint + +const ( + // DefaultRepair will compare node's integrity to other replicas and then repair blocks as required. + DefaultRepair Type = iota + // OnlyCompareRepair will compare node's integrity to other replicas without repairing blocks, + // this is useful for looking at the metrics emitted by the comparison. + OnlyCompareRepair +) + +var ( + validTypes = []Type{ + DefaultRepair, + OnlyCompareRepair, + } +) + +// UnmarshalYAML unmarshals an Type into a valid type from string. +func (t *Type) UnmarshalYAML(unmarshal func(interface{}) error) error { + var str string + if err := unmarshal(&str); err != nil { + return err + } + + // If unspecified, use default mode. + if str == "" { + *t = DefaultRepair + return nil + } + + for _, valid := range validTypes { + if str == valid.String() { + *t = valid + return nil + } + } + return fmt.Errorf("invalid repair Type '%s' valid types are: %s", + str, validTypes) +} + +// String returns the bootstrap mode as a string +func (t Type) String() string { + switch t { + case DefaultRepair: + return "default" + case OnlyCompareRepair: + return "only_compare" + } + return "unknown" +} + // ReplicaMetadataSlice captures a slice of block.ReplicaMetadata. type ReplicaMetadataSlice interface { // Add adds the metadata to the slice. @@ -145,6 +198,18 @@ type MetadataComparisonResult struct { // Options are the repair options. type Options interface { + // SetType sets the type of repair to run. + SetType(value Type) Options + + // Type returns the type of repair to run. + Type() Type + + // SetForce sets whether to force repairs to run for all namespaces. + SetForce(value bool) Options + + // Force returns whether to force repairs to run for all namespaces. + Force() bool + // SetAdminClient sets the admin client. SetAdminClients(value []client.AdminClient) Options diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 63da1fd924..346af118a8 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -449,7 +449,7 @@ type databaseNamespace interface { Truncate() (int64, error) // Repair repairs the namespace data for a given time range - Repair(repairer databaseShardRepairer, tr xtime.Range) error + Repair(repairer databaseShardRepairer, tr xtime.Range, opts NamespaceRepairOptions) error // BootstrapState returns namespaces' bootstrap state. BootstrapState() BootstrapState @@ -480,6 +480,11 @@ type databaseNamespace interface { ) (int64, error) } +// NamespaceRepairOptions is a set of repair options for repairing a namespace. +type NamespaceRepairOptions struct { + Force bool +} + // Shard is a time series database shard. type Shard interface { // ID returns the ID of the shard. From eb0f0197ce2520c41338784cd91864fa8d8ede2e Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 14:52:26 -0400 Subject: [PATCH 2/7] Add integration test --- .../repair_force_only_compare_test.go | 210 ++++++++++++++++++ src/dbnode/storage/repair/types_test.go | 45 ++++ 2 files changed, 255 insertions(+) create mode 100644 src/dbnode/integration/repair_force_only_compare_test.go create mode 100644 src/dbnode/storage/repair/types_test.go diff --git a/src/dbnode/integration/repair_force_only_compare_test.go b/src/dbnode/integration/repair_force_only_compare_test.go new file mode 100644 index 0000000000..802013044d --- /dev/null +++ b/src/dbnode/integration/repair_force_only_compare_test.go @@ -0,0 +1,210 @@ +// +build integration + +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage/repair" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" +) + +func TestRepairForceAndOnlyCompare(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + var ( + // Test both disjoint and shared series repair. + genRepairData = genRepairDatafn(func(now xtime.UnixNano, blockSize time.Duration) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart, + ) { + currBlockStart := now.Truncate(blockSize) + node0Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, + }) + node1Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, + }) + + allData = make(map[xtime.UnixNano]generate.SeriesBlock) + for start, data := range node0Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + for start, data := range node1Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + for start, data := range node2Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + + return node0Data, node1Data, node2Data, allData + }) + ) + + // Test setups. + log := xtest.NewLogger(t) + retentionOpts := retention.NewOptions(). + SetRetentionPeriod(20 * time.Hour). + SetBlockSize(2 * time.Hour). + SetBufferPast(10 * time.Minute). + SetBufferFuture(2 * time.Minute) + nsOpts := namespace.NewOptions(). + // Test needing to force enable repairs. + SetRepairEnabled(false). + SetRetentionOptions(retentionOpts) + namesp, err := namespace.NewMetadata(testNamespaces[0], nsOpts) + require.NoError(t, err) + opts := NewTestOptions(t). + SetNamespaces([]namespace.Metadata{namesp}). + // Use TChannel clients for writing / reading because we want to target individual nodes at a time + // and not write/read all nodes in the cluster. + SetUseTChannelClientForWriting(true). + SetUseTChannelClientForReading(true) + + setupOpts := []BootstrappableTestSetupOptions{ + { + DisablePeersBootstrapper: true, + EnableRepairs: true, + // Test forcing repair of type compare only repair. + ForceRepairs: true, + RepairType: repair.OnlyCompareRepair, + }, + { + DisablePeersBootstrapper: true, + EnableRepairs: true, + // Test forcing repair of type compare only repair. + ForceRepairs: true, + RepairType: repair.OnlyCompareRepair, + }, + { + DisablePeersBootstrapper: true, + EnableRepairs: true, + // Test forcing repair of type compare only repair. + ForceRepairs: true, + RepairType: repair.OnlyCompareRepair, + }, + } + setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts) + defer closeFn() + + // Ensure that the current time is set such that the previous block is flushable. + blockSize := retentionOpts.BlockSize() + now := setups[0].NowFn()().Truncate(blockSize).Add(retentionOpts.BufferPast()).Add(time.Second) + for _, setup := range setups { + setup.SetNowFn(now) + } + + node0Data, node1Data, node2Data, _ := genRepairData(now, blockSize) + if node0Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[0], node0Data, 0)) + } + if node1Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[1], node1Data, 0)) + } + if node2Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[2], node2Data, 0)) + } + + // Start the servers with filesystem bootstrappers. + setups.parallel(func(s TestSetup) { + if err := s.StartServer(); err != nil { + panic(err) + } + }) + log.Debug("servers are now up") + + // Stop the servers. + defer func() { + setups.parallel(func(s TestSetup) { + require.NoError(t, s.StopServer()) + }) + log.Debug("servers are now down") + }() + + // Wait for repairs to occur at least once per node. + log.Debug("waiting for repairs to run") + var runSuccessPerNodeCounters []tally.CounterSnapshot + require.True(t, waitUntil(func() bool { + var successCounters []tally.CounterSnapshot + for _, setup := range setups { + scope := setup.Scope() + for _, v := range scope.Snapshot().Counters() { + if v.Name() != "repair.run" { + continue + } + repairType, ok := v.Tags()["repair_type"] + if !ok || repairType != "only_compare" { + continue + } + if v.Value() > 0 { + successCounters = append(successCounters, v) + break + } + } + } + + // Check if all counters are success. + successAll := len(successCounters) == len(setups) + if successAll { + runSuccessPerNodeCounters = successCounters + return true + } + return false + }, 60*time.Second)) + + // Verify that the repair runs only ran comparisons without repairing data. + log.Debug("verifying repairs that ran") + require.Equal(t, len(setups), len(runSuccessPerNodeCounters), + "unexpected number of successful nodes ran repairs") + for _, counter := range runSuccessPerNodeCounters { + repairType, ok := counter.Tags()["repair_type"] + require.True(t, ok) + require.Equal(t, "only_compare", repairType) + require.True(t, counter.Value() > 0) + } + + // Verify data did not change (repair type is compare only). + verifySeriesMaps(t, setups[0], namesp.ID(), node0Data) + verifySeriesMaps(t, setups[1], namesp.ID(), node1Data) + verifySeriesMaps(t, setups[2], namesp.ID(), node2Data) +} diff --git a/src/dbnode/storage/repair/types_test.go b/src/dbnode/storage/repair/types_test.go new file mode 100644 index 0000000000..15c4bc902f --- /dev/null +++ b/src/dbnode/storage/repair/types_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package repair + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" +) + +func TestUnmarshalYAML(t *testing.T) { + type config struct { + Type Type `yaml:"type"` + } + + for _, value := range validTypes { + str := fmt.Sprintf("type: %s\n", value.String()) + var cfg config + require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) + require.Equal(t, value, cfg.Type) + } + + var cfg config + require.Error(t, yaml.Unmarshal([]byte("type: abc"), &cfg)) +} From 4e834a32dfae743439a19ce7a49431fce49fc642 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 14:57:17 -0400 Subject: [PATCH 3/7] Fix license header --- src/dbnode/storage/repair/types_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/repair/types_test.go b/src/dbnode/storage/repair/types_test.go index 15c4bc902f..6e93368075 100644 --- a/src/dbnode/storage/repair/types_test.go +++ b/src/dbnode/storage/repair/types_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal From aa71b60ddc1191b8a702cc7e989e08a2359b3ce5 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 17:30:23 -0400 Subject: [PATCH 4/7] Fix mockgen and comment without fullstop --- src/dbnode/storage/storage_mock.go | 8 ++++---- src/dbnode/storage/types.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 715821fdba..256e576658 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1629,17 +1629,17 @@ func (mr *MockdatabaseNamespaceMockRecorder) ReadableShardAt(shardID interface{} } // Repair mocks base method. -func (m *MockdatabaseNamespace) Repair(repairer databaseShardRepairer, tr time0.Range) error { +func (m *MockdatabaseNamespace) Repair(repairer databaseShardRepairer, tr time0.Range, opts NamespaceRepairOptions) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Repair", repairer, tr) + ret := m.ctrl.Call(m, "Repair", repairer, tr, opts) ret0, _ := ret[0].(error) return ret0 } // Repair indicates an expected call of Repair. -func (mr *MockdatabaseNamespaceMockRecorder) Repair(repairer, tr interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) Repair(repairer, tr, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseNamespace)(nil).Repair), repairer, tr) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseNamespace)(nil).Repair), repairer, tr, opts) } // Schema mocks base method. diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 346af118a8..61317be6e4 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -448,7 +448,7 @@ type databaseNamespace interface { // Truncate truncates the in-memory data for this namespace. Truncate() (int64, error) - // Repair repairs the namespace data for a given time range + // Repair repairs the namespace data for a given time range. Repair(repairer databaseShardRepairer, tr xtime.Range, opts NamespaceRepairOptions) error // BootstrapState returns namespaces' bootstrap state. From e30984185aa5025eb9a10300b2b4a42359be17ff Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 18:03:19 -0400 Subject: [PATCH 5/7] Fix tests --- src/dbnode/storage/namespace_test.go | 2 +- src/dbnode/storage/repair_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index d000381a25..74abfcc679 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -866,7 +866,7 @@ func TestNamespaceRepair(t *testing.T) { ns.shards[testShardIDs[i].ID()] = shard } - require.Equal(t, "foo", ns.Repair(repairer, repairTimeRange).Error()) + require.Equal(t, "foo", ns.Repair(repairer, repairTimeRange, NamespaceRepairOptions{}).Error()) } func TestNamespaceShardAt(t *testing.T) { diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go index 693d328006..3b72f5235c 100644 --- a/src/dbnode/storage/repair_test.go +++ b/src/dbnode/storage/repair_test.go @@ -706,8 +706,8 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) { ns1.EXPECT().ID().Return(ident.StringID("ns1")).AnyTimes() ns2.EXPECT().ID().Return(ident.StringID("ns2")).AnyTimes() - ns1.EXPECT().Repair(gomock.Any(), tc.expectedNS1Repair.expectedRepairRange) - ns2.EXPECT().Repair(gomock.Any(), tc.expectedNS2Repair.expectedRepairRange) + ns1.EXPECT().Repair(gomock.Any(), tc.expectedNS1Repair.expectedRepairRange, NamespaceRepairOptions{}) + ns2.EXPECT().Repair(gomock.Any(), tc.expectedNS2Repair.expectedRepairRange, NamespaceRepairOptions{}) mockDatabase.EXPECT().OwnedNamespaces().Return(namespaces, nil) require.Nil(t, repairer.Repair()) @@ -815,7 +815,7 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { var ns1RepairExpectations = make([]*gomock.Call, len(tc.expectedNS1Repairs)) for i, ns1Repair := range tc.expectedNS1Repairs { ns1RepairExpectations[i] = ns1.EXPECT(). - Repair(gomock.Any(), ns1Repair.expectedRepairRange). + Repair(gomock.Any(), ns1Repair.expectedRepairRange, NamespaceRepairOptions{}). Return(ns1Repair.mockRepairResult) } gomock.InOrder(ns1RepairExpectations...) @@ -824,7 +824,7 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { var ns2RepairExpectations = make([]*gomock.Call, len(tc.expectedNS2Repairs)) for i, ns2Repair := range tc.expectedNS2Repairs { ns2RepairExpectations[i] = ns2.EXPECT(). - Repair(gomock.Any(), ns2Repair.expectedRepairRange). + Repair(gomock.Any(), ns2Repair.expectedRepairRange, NamespaceRepairOptions{}). Return(ns2Repair.mockRepairResult) } gomock.InOrder(ns2RepairExpectations...) From 91ad7f38ffe8afc5d4a32838a818a4e3fcbd6ed0 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 18:55:46 -0400 Subject: [PATCH 6/7] Fix lint --- src/dbnode/integration/integration.go | 10 +-- .../repair_force_only_compare_test.go | 68 +++++++++---------- src/dbnode/storage/repair.go | 4 +- src/dbnode/storage/repair/types.go | 10 ++- src/dbnode/storage/repair_test.go | 1 + 5 files changed, 42 insertions(+), 51 deletions(-) diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 797da83eef..a7cadb744e 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -22,11 +22,14 @@ package integration import ( "fmt" - "github.com/m3db/m3/src/dbnode/storage/repair" "sync" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/zap" + "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/integration/generate" @@ -44,6 +47,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/topology/testutil" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" @@ -54,10 +58,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xretry "github.com/m3db/m3/src/x/retry" xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" - "go.uber.org/zap" ) const ( diff --git a/src/dbnode/integration/repair_force_only_compare_test.go b/src/dbnode/integration/repair_force_only_compare_test.go index 802013044d..835244044c 100644 --- a/src/dbnode/integration/repair_force_only_compare_test.go +++ b/src/dbnode/integration/repair_force_only_compare_test.go @@ -42,44 +42,36 @@ func TestRepairForceAndOnlyCompare(t *testing.T) { t.SkipNow() } - var ( - // Test both disjoint and shared series repair. - genRepairData = genRepairDatafn(func(now xtime.UnixNano, blockSize time.Duration) ( - node0Data generate.SeriesBlocksByStart, - node1Data generate.SeriesBlocksByStart, - node2Data generate.SeriesBlocksByStart, - allData generate.SeriesBlocksByStart, - ) { - currBlockStart := now.Truncate(blockSize) - node0Data = generate.BlocksByStart([]generate.BlockConfig{ - {IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, - {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, - }) - node1Data = generate.BlocksByStart([]generate.BlockConfig{ - {IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, - {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, - }) - - allData = make(map[xtime.UnixNano]generate.SeriesBlock) - for start, data := range node0Data { - for _, series := range data { - allData[start] = append(allData[start], series) - } - } - for start, data := range node1Data { - for _, series := range data { - allData[start] = append(allData[start], series) - } - } - for start, data := range node2Data { - for _, series := range data { - allData[start] = append(allData[start], series) - } - } - - return node0Data, node1Data, node2Data, allData + // Test both disjoint and shared series repair. + genRepairData := genRepairDatafn(func(now xtime.UnixNano, blockSize time.Duration) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart, + ) { + currBlockStart := now.Truncate(blockSize) + node0Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, + }) + node1Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, }) - ) + + allData = make(map[xtime.UnixNano]generate.SeriesBlock) + for start, data := range node0Data { + allData[start] = append(allData[start], data...) + } + for start, data := range node1Data { + allData[start] = append(allData[start], data...) + } + for start, data := range node2Data { + allData[start] = append(allData[start], data...) + } + + return node0Data, node1Data, node2Data, allData + }) // Test setups. log := xtest.NewLogger(t) @@ -124,6 +116,8 @@ func TestRepairForceAndOnlyCompare(t *testing.T) { RepairType: repair.OnlyCompareRepair, }, } + + // nolint: govet setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts) defer closeFn() diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index bb0b895e40..f662bd0873 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -120,12 +120,10 @@ func (r shardRepairer) Repair( shard databaseShard, ) (repair.MetadataComparisonResult, error) { repairType := r.rpopts.Type() - compareOnly := false switch repairType { case repair.DefaultRepair: defer r.metrics.runDefault.Inc(1) case repair.OnlyCompareRepair: - compareOnly = true defer r.metrics.runOnlyCompare.Inc(1) default: // Unknown repair type. @@ -246,7 +244,7 @@ func (r shardRepairer) Repair( // Shard repair can fail due to transient network errors due to the significant amount of data fetched from peers. // So collect and emit metadata comparison metrics before fetching blocks from peer to repair. r.recordFn(origin, nsCtx.ID, shard, metadataRes) - if compareOnly { + if repairType == repair.OnlyCompareRepair { // Early return if repair type doesn't require executing repairing the data step. return metadataRes, nil } diff --git a/src/dbnode/storage/repair/types.go b/src/dbnode/storage/repair/types.go index 57f8c29e5c..bcd397f0bc 100644 --- a/src/dbnode/storage/repair/types.go +++ b/src/dbnode/storage/repair/types.go @@ -43,12 +43,10 @@ const ( OnlyCompareRepair ) -var ( - validTypes = []Type{ - DefaultRepair, - OnlyCompareRepair, - } -) +var validTypes = []Type{ + DefaultRepair, + OnlyCompareRepair, +} // UnmarshalYAML unmarshals an Type into a valid type from string. func (t *Type) UnmarshalYAML(unmarshal func(interface{}) error) error { diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go index 3b72f5235c..803a416fc7 100644 --- a/src/dbnode/storage/repair_test.go +++ b/src/dbnode/storage/repair_test.go @@ -678,6 +678,7 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) { } for _, tc := range testCases { + tc := tc t.Run(tc.title, func(t *testing.T) { opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) mockDatabase := NewMockdatabase(ctrl) From 843dd777ec3ed2bd94d9b887960eea44ceab87da Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 9 Jun 2021 23:16:03 -0400 Subject: [PATCH 7/7] Fix final unit test --- src/cmd/services/m3dbnode/config/config_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 2ce0ad573f..f8c2192d4b 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -453,6 +453,8 @@ func TestConfiguration(t *testing.T) { queueChannel: null repair: enabled: false + type: 0 + force: false throttle: 2m0s checkInterval: 1m0s debugShadowComparisonsEnabled: false