Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix races triggered by TestRoundtrip in flushManager and bootstrapManager #2586

Merged
merged 4 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ github.com/jcmturner/rpc/v2 v2.0.2/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJk
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/protoreflect v1.6.1 h1:4/2yi5LyDPP7nN+Hiird1SAJ6YoxUm13/oxHGRnbPd8=
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
github.com/jhump/protoreflect v1.7.0 h1:qJ7piXPrjP3mDrfHf5ATkxfLix8ANs226vpo0aACOn0=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
11 changes: 7 additions & 4 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -85,7 +87,7 @@ type bootstrapManager struct {
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
lastBootstrapCompletionTime time.Time
lastBootstrapCompletionTime atomic.Int64 // == xtime.UnixNano
}

func newBootstrapManager(
Expand Down Expand Up @@ -117,8 +119,9 @@ func (m *bootstrapManager) IsBootstrapped() bool {
return state == Bootstrapped
}

func (m *bootstrapManager) LastBootstrapCompletionTime() (time.Time, bool) {
return m.lastBootstrapCompletionTime, !m.lastBootstrapCompletionTime.IsZero()
func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) {
bsTime := xtime.UnixNano(m.lastBootstrapCompletionTime.Load())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably could just use the existing sync.RWMutex to protect this state yeah?

Here it seems fine, but just in general for not highly contended code paths would prefer to stick to simple solutions, otherwise folks start to do lots of tricky hand offs and CompareAndSwaps... etc when they could have just as easily used a mutex.

Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably could just use the existing sync.RWMutex to protect this state yeah?
+1, actually this is how I was fixing it myself.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing! I just assumed it was as gnarly as flushManager, so used the same apporach.

return bsTime, bsTime > 0
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
Expand Down Expand Up @@ -192,7 +195,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.

m.lastBootstrapCompletionTime = m.nowFn()
m.lastBootstrapCompletionTime.Store(int64(xtime.ToUnixNano(m.nowFn())))
return result, nil
}

Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,25 +974,26 @@ func (d *db) IsBootstrappedAndDurable() bool {
return false
}

lastBootstrapCompletionTime, ok := d.mediator.LastBootstrapCompletionTime()
lastBootstrapCompletionTimeNano, ok := d.mediator.LastBootstrapCompletionTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last bootstrap completion time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime))
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()))

return false
}

lastSnapshotStartTime, ok := d.mediator.LastSuccessfulSnapshotStartTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last snapshot start time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
)
return false
}

var (
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTime)
lastBootstrapCompletionTime = lastBootstrapCompletionTimeNano.ToTime()
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTimeNano)
hasBootstrappedSinceReceivingNewShards = lastBootstrapCompletionTime.After(d.lastReceivedNewShards) ||
lastBootstrapCompletionTime.Equal(d.lastReceivedNewShards)
isBootstrappedAndDurable = hasSnapshottedPostBootstrap &&
Expand All @@ -1003,7 +1004,7 @@ func (d *db) IsBootstrappedAndDurable() bool {
d.log.Debug(
"not bootstrapped and durable because: has not snapshotted post bootstrap and/or has not bootstrapped since receiving new shards",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
zap.Time("lastReceivedNewShards", d.lastReceivedNewShards),
)
return false
Expand Down
30 changes: 16 additions & 14 deletions src/dbnode/storage/database_bootstrapped_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
xtime "github.com/m3db/m3/src/x/time"
"github.com/stretchr/testify/assert"
)

Expand All @@ -34,16 +35,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

var (
validIsBootstrapped = true
validShardSetAssignedAt = time.Now()
validLastBootstrapCompletionTime = validShardSetAssignedAt.Add(time.Second)
validLastSuccessfulSnapshotStartTime = validLastBootstrapCompletionTime.Add(time.Second)
validShardSetAssignedAt = xtime.ToUnixNano(time.Now())
validLastBootstrapCompletionTime = xtime.ToUnixNano(validShardSetAssignedAt.ToTime().Add(time.Second))
validLastSuccessfulSnapshotStartTime = xtime.ToUnixNano(validLastBootstrapCompletionTime.ToTime().Add(time.Second))
zeroTime = xtime.UnixNano(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This can just be zeroTime xtime.UnixNano and will get the zero value by default.

)
testCases := []struct {
title string
isBootstrapped bool
lastBootstrapCompletionTime time.Time
lastSuccessfulSnapshotStartTime time.Time
shardSetAssignedAt time.Time
lastBootstrapCompletionTime xtime.UnixNano
lastSuccessfulSnapshotStartTime xtime.UnixNano
shardSetAssignedAt xtime.UnixNano
expectedResult bool
}{
{
Expand All @@ -57,7 +59,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
{
title: "False if no last bootstrap completion time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: time.Time{},
lastBootstrapCompletionTime: zeroTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
Expand All @@ -66,7 +68,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
title: "False if no last successful snapshot start time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: time.Time{},
lastSuccessfulSnapshotStartTime: zeroTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
},
Expand All @@ -91,7 +93,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validLastBootstrapCompletionTime.Add(time.Second),
shardSetAssignedAt: validLastBootstrapCompletionTime + xtime.UnixNano(xtime.Second),
expectedResult: false,
},
{
Expand All @@ -113,7 +115,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

mediator := NewMockdatabaseMediator(ctrl)
d.mediator = mediator
d.lastReceivedNewShards = tc.shardSetAssignedAt
d.lastReceivedNewShards = tc.shardSetAssignedAt.ToTime()

mediator.EXPECT().IsBootstrapped().Return(tc.isBootstrapped)
if !tc.isBootstrapped {
Expand All @@ -122,17 +124,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
return
}

if tc.lastBootstrapCompletionTime.IsZero() {
mediator.EXPECT().LastBootstrapCompletionTime().Return(time.Time{}, false)
if tc.lastBootstrapCompletionTime == 0 {
mediator.EXPECT().LastBootstrapCompletionTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
}

mediator.EXPECT().LastBootstrapCompletionTime().Return(tc.lastBootstrapCompletionTime, true)

if tc.lastSuccessfulSnapshotStartTime.IsZero() {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(time.Time{}, false)
if tc.lastSuccessfulSnapshotStartTime == 0 {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
Expand Down
16 changes: 10 additions & 6 deletions src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/retention"
xerrors "github.com/m3db/m3/src/x/errors"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -90,9 +92,10 @@ type flushManager struct {
state flushManagerState
metrics flushManagerMetrics

lastSuccessfulSnapshotStartTime time.Time
logger *zap.Logger
nowFn clock.NowFn
lastSuccessfulSnapshotStartTime atomic.Int64 // == xtime.UnixNano

logger *zap.Logger
nowFn clock.NowFn
}

func newFlushManager(
Expand Down Expand Up @@ -247,7 +250,7 @@ func (m *flushManager) dataSnapshot(

finalErr := multiErr.FinalError()
if finalErr == nil {
m.lastSuccessfulSnapshotStartTime = startTime
m.lastSuccessfulSnapshotStartTime.Store(int64(xtime.ToUnixNano(startTime)))
}
m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start))
return finalErr
Expand Down Expand Up @@ -378,6 +381,7 @@ func (m *flushManager) flushNamespaceWithTimes(
return multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (time.Time, bool) {
return m.lastSuccessfulSnapshotStartTime, !m.lastSuccessfulSnapshotStartTime.IsZero()
func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
snapTime := xtime.UnixNano(m.lastSuccessfulSnapshotStartTime.Load())
return snapTime, snapTime > 0
}
3 changes: 2 additions & 1 deletion src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -554,7 +555,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {

lastSuccessfulSnapshot, ok := fm.LastSuccessfulSnapshotStartTime()
require.True(t, ok)
require.Equal(t, now, lastSuccessfulSnapshot)
require.Equal(t, xtime.ToUnixNano(now), lastSuccessfulSnapshot)
}

type timesInOrder []time.Time
Expand Down
20 changes: 10 additions & 10 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ type databaseBootstrapManager interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap performs bootstrapping for all namespaces and shards owned.
Bootstrap() (BootstrapResult, error)
Expand All @@ -749,7 +749,7 @@ type databaseFlushManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)

// Report reports runtime information.
Report()
Expand Down Expand Up @@ -797,7 +797,7 @@ type databaseFileSystemManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// databaseColdFlushManager manages the database related cold flush activities.
Expand Down Expand Up @@ -867,7 +867,7 @@ type databaseMediator interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap bootstraps the database with file operations performed at the end.
Bootstrap() (BootstrapResult, error)
Expand All @@ -892,7 +892,7 @@ type databaseMediator interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// OnColdFlush can perform work each time a series is flushed.
Expand Down