Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix races triggered by TestRoundtrip in flushManager and bootstrapManager #2586

Merged
merged 4 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,7 +86,7 @@ type bootstrapManager struct {
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
lastBootstrapCompletionTime time.Time
lastBootstrapCompletionTime xtime.UnixNano
}

func newBootstrapManager(
Expand Down Expand Up @@ -117,8 +118,11 @@ func (m *bootstrapManager) IsBootstrapped() bool {
return state == Bootstrapped
}

func (m *bootstrapManager) LastBootstrapCompletionTime() (time.Time, bool) {
return m.lastBootstrapCompletionTime, !m.lastBootstrapCompletionTime.IsZero()
func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) {
m.RLock()
bsTime := m.lastBootstrapCompletionTime
m.RUnlock()
return bsTime, bsTime > 0
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
Expand Down Expand Up @@ -191,8 +195,9 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// load to the cluster. It turns out to be better to let ticking happen naturally
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.

m.lastBootstrapCompletionTime = m.nowFn()
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.Unlock()
return result, nil
}

Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,25 +974,26 @@ func (d *db) IsBootstrappedAndDurable() bool {
return false
}

lastBootstrapCompletionTime, ok := d.mediator.LastBootstrapCompletionTime()
lastBootstrapCompletionTimeNano, ok := d.mediator.LastBootstrapCompletionTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last bootstrap completion time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime))
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()))

return false
}

lastSnapshotStartTime, ok := d.mediator.LastSuccessfulSnapshotStartTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last snapshot start time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
)
return false
}

var (
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTime)
lastBootstrapCompletionTime = lastBootstrapCompletionTimeNano.ToTime()
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTimeNano)
hasBootstrappedSinceReceivingNewShards = lastBootstrapCompletionTime.After(d.lastReceivedNewShards) ||
lastBootstrapCompletionTime.Equal(d.lastReceivedNewShards)
isBootstrappedAndDurable = hasSnapshottedPostBootstrap &&
Expand All @@ -1003,7 +1004,7 @@ func (d *db) IsBootstrappedAndDurable() bool {
d.log.Debug(
"not bootstrapped and durable because: has not snapshotted post bootstrap and/or has not bootstrapped since receiving new shards",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
zap.Time("lastReceivedNewShards", d.lastReceivedNewShards),
)
return false
Expand Down
30 changes: 16 additions & 14 deletions src/dbnode/storage/database_bootstrapped_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
xtime "github.com/m3db/m3/src/x/time"
"github.com/stretchr/testify/assert"
)

Expand All @@ -34,16 +35,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

var (
validIsBootstrapped = true
validShardSetAssignedAt = time.Now()
validLastBootstrapCompletionTime = validShardSetAssignedAt.Add(time.Second)
validLastSuccessfulSnapshotStartTime = validLastBootstrapCompletionTime.Add(time.Second)
validShardSetAssignedAt = xtime.ToUnixNano(time.Now())
validLastBootstrapCompletionTime = xtime.ToUnixNano(validShardSetAssignedAt.ToTime().Add(time.Second))
validLastSuccessfulSnapshotStartTime = xtime.ToUnixNano(validLastBootstrapCompletionTime.ToTime().Add(time.Second))
zeroTime xtime.UnixNano
)
testCases := []struct {
title string
isBootstrapped bool
lastBootstrapCompletionTime time.Time
lastSuccessfulSnapshotStartTime time.Time
shardSetAssignedAt time.Time
lastBootstrapCompletionTime xtime.UnixNano
lastSuccessfulSnapshotStartTime xtime.UnixNano
shardSetAssignedAt xtime.UnixNano
expectedResult bool
}{
{
Expand All @@ -57,7 +59,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
{
title: "False if no last bootstrap completion time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: time.Time{},
lastBootstrapCompletionTime: zeroTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
Expand All @@ -66,7 +68,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
title: "False if no last successful snapshot start time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: time.Time{},
lastSuccessfulSnapshotStartTime: zeroTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
},
Expand All @@ -91,7 +93,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validLastBootstrapCompletionTime.Add(time.Second),
shardSetAssignedAt: validLastBootstrapCompletionTime + xtime.UnixNano(xtime.Second),
expectedResult: false,
},
{
Expand All @@ -113,7 +115,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

mediator := NewMockdatabaseMediator(ctrl)
d.mediator = mediator
d.lastReceivedNewShards = tc.shardSetAssignedAt
d.lastReceivedNewShards = tc.shardSetAssignedAt.ToTime()

mediator.EXPECT().IsBootstrapped().Return(tc.isBootstrapped)
if !tc.isBootstrapped {
Expand All @@ -122,17 +124,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
return
}

if tc.lastBootstrapCompletionTime.IsZero() {
mediator.EXPECT().LastBootstrapCompletionTime().Return(time.Time{}, false)
if tc.lastBootstrapCompletionTime == 0 {
mediator.EXPECT().LastBootstrapCompletionTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
}

mediator.EXPECT().LastBootstrapCompletionTime().Return(tc.lastBootstrapCompletionTime, true)

if tc.lastSuccessfulSnapshotStartTime.IsZero() {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(time.Time{}, false)
if tc.lastSuccessfulSnapshotStartTime == 0 {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
Expand Down
16 changes: 10 additions & 6 deletions src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/retention"
xerrors "github.com/m3db/m3/src/x/errors"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -90,9 +92,10 @@ type flushManager struct {
state flushManagerState
metrics flushManagerMetrics

lastSuccessfulSnapshotStartTime time.Time
logger *zap.Logger
nowFn clock.NowFn
lastSuccessfulSnapshotStartTime atomic.Int64 // == xtime.UnixNano

logger *zap.Logger
nowFn clock.NowFn
}

func newFlushManager(
Expand Down Expand Up @@ -247,7 +250,7 @@ func (m *flushManager) dataSnapshot(

finalErr := multiErr.FinalError()
if finalErr == nil {
m.lastSuccessfulSnapshotStartTime = startTime
m.lastSuccessfulSnapshotStartTime.Store(int64(xtime.ToUnixNano(startTime)))
}
m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start))
return finalErr
Expand Down Expand Up @@ -378,6 +381,7 @@ func (m *flushManager) flushNamespaceWithTimes(
return multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (time.Time, bool) {
return m.lastSuccessfulSnapshotStartTime, !m.lastSuccessfulSnapshotStartTime.IsZero()
func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
snapTime := xtime.UnixNano(m.lastSuccessfulSnapshotStartTime.Load())
return snapTime, snapTime > 0
}
3 changes: 2 additions & 1 deletion src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -554,7 +555,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {

lastSuccessfulSnapshot, ok := fm.LastSuccessfulSnapshotStartTime()
require.True(t, ok)
require.Equal(t, now, lastSuccessfulSnapshot)
require.Equal(t, xtime.ToUnixNano(now), lastSuccessfulSnapshot)
}

type timesInOrder []time.Time
Expand Down
20 changes: 10 additions & 10 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ type databaseBootstrapManager interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap performs bootstrapping for all namespaces and shards owned.
Bootstrap() (BootstrapResult, error)
Expand All @@ -749,7 +749,7 @@ type databaseFlushManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)

// Report reports runtime information.
Report()
Expand Down Expand Up @@ -797,7 +797,7 @@ type databaseFileSystemManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// databaseColdFlushManager manages the database related cold flush activities.
Expand Down Expand Up @@ -867,7 +867,7 @@ type databaseMediator interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap bootstraps the database with file operations performed at the end.
Bootstrap() (BootstrapResult, error)
Expand All @@ -892,7 +892,7 @@ type databaseMediator interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// OnColdFlush can perform work each time a series is flushed.
Expand Down