Skip to content

Commit

Permalink
Fix races triggered by TestRoundtrip in flushManager and bootstrapMan…
Browse files Browse the repository at this point in the history
…ager (#2586)

Fix one of the few races that are always triggering on my env with go 1.14
Updates #2540
  • Loading branch information
vdarulis authored Sep 3, 2020
1 parent 929af23 commit b7addc3
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 47 deletions.
15 changes: 10 additions & 5 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,7 +86,7 @@ type bootstrapManager struct {
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
lastBootstrapCompletionTime time.Time
lastBootstrapCompletionTime xtime.UnixNano
}

func newBootstrapManager(
Expand Down Expand Up @@ -117,8 +118,11 @@ func (m *bootstrapManager) IsBootstrapped() bool {
return state == Bootstrapped
}

func (m *bootstrapManager) LastBootstrapCompletionTime() (time.Time, bool) {
return m.lastBootstrapCompletionTime, !m.lastBootstrapCompletionTime.IsZero()
func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) {
m.RLock()
bsTime := m.lastBootstrapCompletionTime
m.RUnlock()
return bsTime, bsTime > 0
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
Expand Down Expand Up @@ -191,8 +195,9 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// load to the cluster. It turns out to be better to let ticking happen naturally
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.

m.lastBootstrapCompletionTime = m.nowFn()
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.Unlock()
return result, nil
}

Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,25 +974,26 @@ func (d *db) IsBootstrappedAndDurable() bool {
return false
}

lastBootstrapCompletionTime, ok := d.mediator.LastBootstrapCompletionTime()
lastBootstrapCompletionTimeNano, ok := d.mediator.LastBootstrapCompletionTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last bootstrap completion time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime))
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()))

return false
}

lastSnapshotStartTime, ok := d.mediator.LastSuccessfulSnapshotStartTime()
if !ok {
d.log.Debug("not bootstrapped and durable because: no last snapshot start time",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTimeNano.ToTime()),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
)
return false
}

var (
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTime)
lastBootstrapCompletionTime = lastBootstrapCompletionTimeNano.ToTime()
hasSnapshottedPostBootstrap = lastSnapshotStartTime.After(lastBootstrapCompletionTimeNano)
hasBootstrappedSinceReceivingNewShards = lastBootstrapCompletionTime.After(d.lastReceivedNewShards) ||
lastBootstrapCompletionTime.Equal(d.lastReceivedNewShards)
isBootstrappedAndDurable = hasSnapshottedPostBootstrap &&
Expand All @@ -1003,7 +1004,7 @@ func (d *db) IsBootstrappedAndDurable() bool {
d.log.Debug(
"not bootstrapped and durable because: has not snapshotted post bootstrap and/or has not bootstrapped since receiving new shards",
zap.Time("lastBootstrapCompletionTime", lastBootstrapCompletionTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime),
zap.Time("lastSnapshotStartTime", lastSnapshotStartTime.ToTime()),
zap.Time("lastReceivedNewShards", d.lastReceivedNewShards),
)
return false
Expand Down
30 changes: 16 additions & 14 deletions src/dbnode/storage/database_bootstrapped_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
xtime "github.com/m3db/m3/src/x/time"
"github.com/stretchr/testify/assert"
)

Expand All @@ -34,16 +35,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

var (
validIsBootstrapped = true
validShardSetAssignedAt = time.Now()
validLastBootstrapCompletionTime = validShardSetAssignedAt.Add(time.Second)
validLastSuccessfulSnapshotStartTime = validLastBootstrapCompletionTime.Add(time.Second)
validShardSetAssignedAt = xtime.ToUnixNano(time.Now())
validLastBootstrapCompletionTime = xtime.ToUnixNano(validShardSetAssignedAt.ToTime().Add(time.Second))
validLastSuccessfulSnapshotStartTime = xtime.ToUnixNano(validLastBootstrapCompletionTime.ToTime().Add(time.Second))
zeroTime xtime.UnixNano
)
testCases := []struct {
title string
isBootstrapped bool
lastBootstrapCompletionTime time.Time
lastSuccessfulSnapshotStartTime time.Time
shardSetAssignedAt time.Time
lastBootstrapCompletionTime xtime.UnixNano
lastSuccessfulSnapshotStartTime xtime.UnixNano
shardSetAssignedAt xtime.UnixNano
expectedResult bool
}{
{
Expand All @@ -57,7 +59,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
{
title: "False if no last bootstrap completion time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: time.Time{},
lastBootstrapCompletionTime: zeroTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
Expand All @@ -66,7 +68,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
title: "False if no last successful snapshot start time",
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: time.Time{},
lastSuccessfulSnapshotStartTime: zeroTime,
shardSetAssignedAt: validShardSetAssignedAt,
expectedResult: false,
},
Expand All @@ -91,7 +93,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
isBootstrapped: validIsBootstrapped,
lastBootstrapCompletionTime: validLastBootstrapCompletionTime,
lastSuccessfulSnapshotStartTime: validLastSuccessfulSnapshotStartTime,
shardSetAssignedAt: validLastBootstrapCompletionTime.Add(time.Second),
shardSetAssignedAt: validLastBootstrapCompletionTime + xtime.UnixNano(xtime.Second),
expectedResult: false,
},
{
Expand All @@ -113,7 +115,7 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {

mediator := NewMockdatabaseMediator(ctrl)
d.mediator = mediator
d.lastReceivedNewShards = tc.shardSetAssignedAt
d.lastReceivedNewShards = tc.shardSetAssignedAt.ToTime()

mediator.EXPECT().IsBootstrapped().Return(tc.isBootstrapped)
if !tc.isBootstrapped {
Expand All @@ -122,17 +124,17 @@ func TestDatabaseIsBootstrappedAndDurable(t *testing.T) {
return
}

if tc.lastBootstrapCompletionTime.IsZero() {
mediator.EXPECT().LastBootstrapCompletionTime().Return(time.Time{}, false)
if tc.lastBootstrapCompletionTime == 0 {
mediator.EXPECT().LastBootstrapCompletionTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
}

mediator.EXPECT().LastBootstrapCompletionTime().Return(tc.lastBootstrapCompletionTime, true)

if tc.lastSuccessfulSnapshotStartTime.IsZero() {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(time.Time{}, false)
if tc.lastSuccessfulSnapshotStartTime == 0 {
mediator.EXPECT().LastSuccessfulSnapshotStartTime().Return(zeroTime, false)
assert.Equal(t, tc.expectedResult, d.IsBootstrappedAndDurable())
// Early return because other mock calls will not get called.
return
Expand Down
16 changes: 10 additions & 6 deletions src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/retention"
xerrors "github.com/m3db/m3/src/x/errors"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -90,9 +92,10 @@ type flushManager struct {
state flushManagerState
metrics flushManagerMetrics

lastSuccessfulSnapshotStartTime time.Time
logger *zap.Logger
nowFn clock.NowFn
lastSuccessfulSnapshotStartTime atomic.Int64 // == xtime.UnixNano

logger *zap.Logger
nowFn clock.NowFn
}

func newFlushManager(
Expand Down Expand Up @@ -247,7 +250,7 @@ func (m *flushManager) dataSnapshot(

finalErr := multiErr.FinalError()
if finalErr == nil {
m.lastSuccessfulSnapshotStartTime = startTime
m.lastSuccessfulSnapshotStartTime.Store(int64(xtime.ToUnixNano(startTime)))
}
m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start))
return finalErr
Expand Down Expand Up @@ -378,6 +381,7 @@ func (m *flushManager) flushNamespaceWithTimes(
return multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (time.Time, bool) {
return m.lastSuccessfulSnapshotStartTime, !m.lastSuccessfulSnapshotStartTime.IsZero()
func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
snapTime := xtime.UnixNano(m.lastSuccessfulSnapshotStartTime.Load())
return snapTime, snapTime > 0
}
3 changes: 2 additions & 1 deletion src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -554,7 +555,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {

lastSuccessfulSnapshot, ok := fm.LastSuccessfulSnapshotStartTime()
require.True(t, ok)
require.Equal(t, now, lastSuccessfulSnapshot)
require.Equal(t, xtime.ToUnixNano(now), lastSuccessfulSnapshot)
}

type timesInOrder []time.Time
Expand Down
20 changes: 10 additions & 10 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ type databaseBootstrapManager interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap performs bootstrapping for all namespaces and shards owned.
Bootstrap() (BootstrapResult, error)
Expand All @@ -749,7 +749,7 @@ type databaseFlushManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)

// Report reports runtime information.
Report()
Expand Down Expand Up @@ -797,7 +797,7 @@ type databaseFileSystemManager interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// databaseColdFlushManager manages the database related cold flush activities.
Expand Down Expand Up @@ -867,7 +867,7 @@ type databaseMediator interface {

// LastBootstrapCompletionTime returns the last bootstrap completion time,
// if any.
LastBootstrapCompletionTime() (time.Time, bool)
LastBootstrapCompletionTime() (xtime.UnixNano, bool)

// Bootstrap bootstraps the database with file operations performed at the end.
Bootstrap() (BootstrapResult, error)
Expand All @@ -892,7 +892,7 @@ type databaseMediator interface {

// LastSuccessfulSnapshotStartTime returns the start time of the last
// successful snapshot, if any.
LastSuccessfulSnapshotStartTime() (time.Time, bool)
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}

// OnColdFlush can perform work each time a series is flushed.
Expand Down

0 comments on commit b7addc3

Please sign in to comment.